Crossbar.io/Autobahn 服务器端会话存储

Crossbar.io/Autobahn server side session storage

我正在尝试设置一个可以处理单个客户端会话数据的 WAMP 服务器。然而,这似乎比最初想象的更麻烦。

交叉栏配置:

{
  "workers": [
    {
      "type": "router",
      "realms": [
        {
          "name": "default",
          "roles": [
            {
              "name": "anonymous",
              "permissions": [
                {
                  "uri": "*",
                  "call": true,
                  "register": true
                }
              ]
            }
          ]
        }
      ],
      "transports": [
        {
          "type": "websocket",
          "endpoint": {
            "type": "tcp",
            "port": 8080
          }
        }
      ],
      "components": [
        {
          "type": "class",
          "classname": "server.Server",
          "realm": "default",
          "role": "anonymous"
        }
      ]
    }
  ]
}

server.py:

服务器注册了两个RPC,一个用于追加数据,一个用于返回一串数据。数据存储为 self.data,但这是为所有会话存储数据,而不是基于每个客户端、每个会话。一旦会话结束,服务器应该清理会话数据。简单地清理列表不是解决方案,因为并发客户端可以访问彼此的数据。 客户端的 ID 在 append RPC 中可用(如果客户端公开自己),但是此时这似乎毫无用处。

from autobahn.twisted import wamp
from autobahn.wamp import types


class Server(wamp.ApplicationSession):
    def __init__(self, *args, **kwargs):
        wamp.ApplicationSession.__init__(self, *args, **kwargs)
        self.data = []

    def onJoin(self, details):
        def append(data, details):
            client_id = details.caller
            self.data.append(data)

        def get():
            return ''.join(self.data)

        options = types.RegisterOptions(details_arg='details')
        self.register(append, 'append', options=options)
        self.register(get, 'get')

client.py:

客户端连接到服务器,等待连接打开并执行RPC。客户端首先将'a''b'附加到服务器的数据,然后获取并打印数据。结果应该是 ab,因为数据应该按每个客户端、每个会话存储。一旦会话结束,数据应该被清理。

import asyncio

from autobahn.asyncio import wamp
from autobahn.asyncio import websocket
from autobahn.wamp import types


class Client(wamp.ApplicationSession):
    def onOpen(self, protocol):
        protocol.session = self
        wamp.ApplicationSession.onOpen(self, protocol)


if __name__ == '__main__':
    session_factory = wamp.ApplicationSessionFactory()
    session_factory.session = Client
    transport_factory = websocket.WampWebSocketClientFactory(session_factory)

    loop = asyncio.get_event_loop()
    transport, protocol = loop.run_until_complete(
        asyncio.async(loop.create_connection(
            transport_factory, 'localhost', '8080',)))

    connected = asyncio.Event()

    @asyncio.coroutine
    def poll():
        session = getattr(protocol, 'session', None)
        if not session:
            yield from asyncio.sleep(1)
            asyncio.ensure_future(poll())
        else:
            connected.set()

    # Wait for session to open.
    asyncio.ensure_future(poll())
    loop.run_until_complete(connected.wait())

    # Client is connected, call RPCs.
    options = types.CallOptions(disclose_me=True)
    protocol.session.call('append', 'a', options=options)
    protocol.session.call('append', 'b', options=options)
    f = protocol.session.call('get', options=options)
    # Get stored data and print it.
    print(loop.run_until_complete(f))

希望有人能告诉我如何将每个客户端、每个会话的数据存储在服务器的内存中。

我设法为此创建了一个 hack。它似乎不是完美的解决方案,但它现在有效。

from autobahn.twisted import wamp
from autobahn.wamp import types
from twisted.internet.defer import inlineCallbacks


class Server(wamp.ApplicationSession):
    def __init__(self, *args, **kwargs):
        wamp.ApplicationSession.__init__(self, *args, **kwargs)    
        self.sessions = {}

    def onJoin(self, details):
        def on_client_join(details):
            client_id = details['session']
            self.sessions[client_id] = {}

        def on_client_leave(client_id):
            self.sessions.pop(client_id)

        self.subscribe(on_client_join, 'wamp.session.on_join')
        self.subscribe(on_client_leave, 'wamp.session.on_leave')

        def get_session(details):
            return self.sessions[details.caller]

        @inlineCallbacks
        def append(data, details):
            session = yield self.call('get_session', details)
            d = session.setdefault('data', [])
            d.append(data)

        @inlineCallbacks
        def get(details):
            session = yield self.call('get_session', details)
            return ''.join(session['data'])

        reg_options = types.RegisterOptions(details_arg='details')
        self.register(get_session, 'get_session')
        self.register(append, 'append', options=reg_options)
        self.register(get, 'get', options=reg_options)

当客户端连接时创建会话 (on_client_join),当客户端断开连接时会话被销毁 (on_client_leave)。

服务器还需要订阅元事件的权限。 config.json:

...
"permissions": [
  {
    "uri": "*",
    "call": true,
    "register": true,
    "subscribe": true,
  }
]
....