Deferred 中未处理的错误?在 Twisted 中使用 UNIX 套接字

Unhandled error in Deferred? Using UNIX socket with Twisted

我对网络编程和事件驱动编程完全陌生。不过,我已经通过我的机器(服务器)与用于测试的客户端机器(命令行)之间的 TCP 连接,成功实现了发布-订阅方案。但实际上,我需要在 Twisted 中使用 UNIX 套接字。

我在运行代码时收到以下错误:

Unhandled error in Deferred

这是我的 pub_sub.py 代码:

"""
A networking implementation of PubSub using Twisted.
=============
PubSub Server
=============
A PubSub server listens for subscription requests and publish commands, and, when
published to, sends data to subscribers. All incoming and outgoing requests are
encoded in JSON.
A Subscribe request looks like this:
    {
        "command": "subscribe",
        "topic": "hello"
    }
A Publish request looks like this:
    {
        "command": "publish",
        "topic": "hello",
        "data": {
            "world": "WORLD"
        }
    }
When the server receives a Publish request, it will send the 'data' object to all
subscribers of 'topic'.
"""

import argparse
import json
import logging

from collections import defaultdict

from twisted.internet import reactor
from twisted.python import log
from twisted.python.filepath import FilePath
from twisted.internet.endpoints import UNIXClientEndpoint, UNIXServerEndpoint, \
                                       connectProtocol
from twisted.internet.protocol import Protocol, Factory


class PubSubProtocol(Protocol):
    """
    Server-side protocol for one PubSub connection.

    Each chunk handed to ``dataReceived`` is parsed as a single JSON request
    whose 'command' is 'subscribe' or 'publish'. All connections share the
    same 'topics' mapping (topic -> set of subscribed protocol instances).

    NOTE(review): a chunk is assumed to hold exactly one complete JSON
    document; no message framing is performed here.
    """

    def __init__(self, topics):
        """
        'topics' is the shared mapping of topic to the set of protocols
        subscribed to it.
        """
        self.topics = topics
        # Most recently subscribed topic (None until the first subscribe).
        self.subscribed_topic = None
        # Every topic this connection subscribed to, so that all of them can
        # be cleaned up on disconnect, not just the latest one.
        self.subscribed_topics = set()

    def connectionLost(self, reason):
        """
        Remove this connection from every topic it subscribed to.
        """
        print("Connection lost: {}".format(reason))
        for topic in self.subscribed_topics:
            subscribers = self.topics.get(topic)
            if subscribers is not None:
                # discard() tolerates the protocol already being absent.
                subscribers.discard(self)
        self.subscribed_topics.clear()

    def dataReceived(self, data):
        """
        Decode one JSON request and dispatch it; drop the connection when the
        request cannot be decoded or lacks the fields its command needs.
        Requests with an unrecognised 'command' are ignored.
        """
        print("Data received: {}".format(data))
        try:
            request = json.loads(data)
        except ValueError:
            logging.debug("ValueError on deconding incoming data. "
                          "Data: {}".format(data), exc_info=True)
            self.transport.loseConnection()
            return

        try:
            command = request['command']
            if command == 'subscribe':
                args = (request['topic'],)
            elif command == 'publish':
                args = (request['topic'], request['data'])
            else:
                return
        except (KeyError, TypeError):
            # KeyError: a required field is missing.
            # TypeError: the JSON document is not an object.
            self._reject(data)
            return

        if isinstance(args[0], (list, dict)):
            # Unhashable JSON values cannot be used as topic keys.
            self._reject(data)
            return

        if command == 'subscribe':
            self.handle_subscribe(*args)
        else:
            self.handle_publish(*args)

    def _reject(self, data):
        """
        Log a malformed request and close the connection.
        """
        logging.debug("Malformed request. Data: {}".format(data))
        self.transport.loseConnection()

    def handle_subscribe(self, topic):
        """
        Register this connection as a subscriber of 'topic'.
        """
        print("Subscribed to topic: {}".format(topic))
        self.topics[topic].add(self)
        self.subscribed_topic = topic
        self.subscribed_topics.add(topic)

    def handle_publish(self, topic, data):
        """
        Send 'data', JSON-encoded, to every subscriber of 'topic'.
        """
        payload = json.dumps(data)
        if not isinstance(payload, bytes):
            # Transports only accept bytes; json.dumps returns text on
            # Python 3.
            payload = payload.encode('utf-8')

        # .get() avoids creating an empty entry for a topic nobody has
        # subscribed to; the tuple snapshot keeps iteration safe should the
        # subscriber set change while writing.
        for protocol in tuple(self.topics.get(topic, ())):
            protocol.transport.write(payload)
        print("Publish sent for topic: {}".format(topic))


class PubSubFactory(Factory):
    """
    Factory whose protocols all share a single topic registry.
    """

    def __init__(self):
        # topic -> set of subscribers; an unseen topic starts as an empty set.
        registry = defaultdict(set)
        self.topics = registry

    def buildProtocol(self, addr):
        """
        Build the protocol for a new connection; 'addr' is not used.
        """
        protocol = PubSubProtocol(self.topics)
        return protocol


class PublisherProtocol(Protocol):
    """
    Publish protocol for sending data to client, i.e. front-end web GUI.

    On connect it sends one 'publish' request for 'topic' carrying the
    keyword arguments as 'data', then closes the connection.
    """
    def __init__(self, topic, **kwargs):
        self.topic = topic
        self.kwargs = kwargs

    def connectionMade(self):
        """
        Send the publish request and disconnect.
        """
        request = json.dumps({
            'command': 'publish',
            'topic': self.topic,
            'data': self.kwargs,
        })
        if not isinstance(request, bytes):
            # Transports only accept bytes; json.dumps returns text on
            # Python 3.
            request = request.encode('utf-8')

        self.transport.write(request)
        self.transport.loseConnection()


class SubscriberProtocol(Protocol):
    """
    Subscriber protocol for client sending a request to subscribe to a specific
    topic.

    Every chunk received afterwards is decoded as JSON and passed to
    'callback' as keyword arguments.
    """
    def __init__(self, topic, callback):
        self.topic = topic
        self.callback = callback

    def connectionMade(self):
        """
        Send the subscribe request for 'topic'.
        """
        request = json.dumps({
            'command': 'subscribe',
            'topic': self.topic,
        })
        if not isinstance(request, bytes):
            # Transports only accept bytes; json.dumps returns text on
            # Python 3.
            request = request.encode('utf-8')

        self.transport.write(request)

    def dataReceived(self, data):
        """
        Decode the published data and invoke the callback with it.
        """
        # NOTE(review): assumes each chunk is one complete JSON object;
        # confirm against what the publisher sends.
        kwargs = json.loads(data)

        self.callback(**kwargs)


class PubSub(object):
    """
    Client-side helper that connects to the PubSub server over a UNIX socket
    to subscribe to, or publish on, a topic.
    """

    def __init__(self, path='./.sock'):
        """
        'path' is the filesystem path of the server's UNIX socket.
        """
        self.path = FilePath(path)
        self.reactor = reactor

    def _make_connection(self, protocol):
        """
        Connect 'protocol' to the server socket, logging a failed connection
        attempt instead of leaving the Deferred's error unhandled.
        """
        # The endpoint expects the socket path itself, not a FilePath object.
        socket_path = self.path.path
        endpoint = UNIXClientEndpoint(self.reactor, socket_path)
        connecting = connectProtocol(endpoint, protocol)
        connecting.addErrback(
            log.err, "PubSub connection to {} failed".format(socket_path))

    def subscribe(self, topic, callback):
        """
        Subscribe 'callback' callable to 'topic'.
        """
        sub = SubscriberProtocol(topic, callback)
        self._make_connection(sub)

    def publish(self, topic, **kwargs):
        """
        Publish 'kwargs' to 'topic', calling all callables subscribed to 'topic'
        with the arguments specified in '**kwargs'.
        """
        pub = PublisherProtocol(topic, **kwargs)
        self._make_connection(pub)

    def run(self):
        """
        Convenience method to start the Twisted event loop.
        """
        self.reactor.run()


def main():
    """
    Start the PubSub server on the UNIX socket './.sock' and run the reactor.

    If the socket cannot be listened on, the failure is logged and the
    reactor is stopped so the process exits instead of idling with nothing
    bound.
    """
    path = FilePath("./.sock")
    # The endpoint expects the socket path itself, not a FilePath object.
    endpoint = UNIXServerEndpoint(reactor, path.path)
    listening = endpoint.listen(PubSubFactory())

    def listen_failed(failure):
        log.err(failure,
                "Could not listen on UNIX socket {}".format(path.path))
        # The failure may arrive before reactor.run(); callWhenRunning defers
        # the stop until the reactor is actually running.
        reactor.callWhenRunning(reactor.stop)

    listening.addErrback(listen_failed)

    reactor.run()


if __name__ == '__main__':
    main()

如果有人能指出我哪里做错了,我将不胜感激。

谢谢,

布莱恩

您似乎是在 Windows 上运行您的软件。遗憾的是,UNIX 套接字在 Windows 上不可用。如果要使用 UNIX 套接字,则需要使用更接近 POSIX 的环境——Linux、*BSD、OS X 等。