如何使用 Multiprocessing 订阅多个 Websocket 流

How to Subscribe to multiple Websocket streams using Muiltiprocessing

我在 python.

中不熟悉处理多处理、多线程等。

我正在尝试使用 multiprocessing 从我的加密货币交易所 (API Docs Here) 订阅多个 Websocket 流。 但是,当我 运行 下面的代码时,我只收到 ticker information,而不是 order book updates

如何修改代码以获取这两个信息?
multiprocessing 上的 运行 时只有一个 websocket 似乎在工作的原因是什么?

(当我 运行 函数 ws_orderBookUpdates()ws_tickerInfo() 分开时,没有使用 multiprocessing,它单独工作正常,所以这不是交易所的问题。)

import websocket
import json
import pprint
from datetime import datetime
import time

# Function to subscribe to ticker information.
def ws_tickerInfo():
    def on_open(self):
        print("opened")
        subscribe_message = {
            "method": "subscribe",
            "params": {'channel': "lightning_ticker_BTC_JPY"}
        }
        ws.send(json.dumps(subscribe_message))

    def on_message(self, message, prev=None):
        print(f"Ticker Info, Received : {datetime.now()}")

        ###### full json payloads ######
        # pprint.pprint(json.loads(message))

    def on_close(self):
        print("closed connection")

    endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    ws = websocket.WebSocketApp(endpoint,
                                on_open=on_open,
                                on_message=on_message,
                                on_close=on_close)

    ws.run_forever()


# Function to subscribe to order book updates.
def ws_orderBookUpdates():
    def on_open(self):
        print("opened")
        subscribe_message = {
            "method": "subscribe",
            "params": {'channel': "lightning_board_BTC_JPY"}
        }
        ws.send(json.dumps(subscribe_message))

    def on_message(self, message):
        print(f"Order Book, Received : {datetime.now()}")

        ###### full json payloads ######
        # pprint.pprint(json.loads(message))

    def on_close(self):
        print("closed connection")

    endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    ws = websocket.WebSocketApp(endpoint,
                                on_open=on_open,
                                on_message=on_message,
                                on_close=on_close)
    ws.run_forever()


# Multiprocessing two functions
if __name__ == '__main__':
    import multiprocessing as mp

    mp.Process(target=ws_tickerInfo(), daemon=True).start()
    mp.Process(target=ws_orderBookUpdates(), daemon=True).start()

更新

您已经创建了两个 守护进程 进程。当所有 非守护进程 进程终止时,它们将终止,在本例中是主进程,它在创建守护进程 后立即终止 。你很幸运,即使其中一个过程有机会产生输出,但为什么要冒险呢? 不要使用 dameon 进程。相反:

if __name__ == '__main__':
    import multiprocessing as mp

    p1 = mp.Process(target=ws_tickerInfo)
    p2 = mp.Process(target=ws_orderBookUpdates)
    p1.start()
    p2.start()
    p1.join() # wait for completion
    p2.join() # wait for completion

但真正的问题摆在我们面前,我们都错过了!你有:

    p1 = mp.Process(target=ws_tickerInfo(), daemon=True)
    p2 = mp.Process(target=ws_orderBookUpdates(), daemon=True)

应该是什么时候:

    p1 = mp.Process(target=ws_tickerInfo)
    p2 = mp.Process(target=ws_orderBookUpdates)

看出区别了吗?您实际上并没有将函数 ws_tickerInfo 传递给 Process,而是 调用 ws_tickerInfo 并试图传递 return 值,这将有是荒谬的 None 如果函数曾经 returned(它没有)。所以你甚至都没有执行第二个进程创建语句。

尽管 Ctrl-C 中断处理程序可能不起作用(见下文),但您可能还使用了多线程而不是多处理。还应该有一种机制来终止程序。我添加了一些代码来检测 Ctrl-C 并在输入时终止。此外,您已将 self 用作函数参数,就好像该函数实际上是 class 方法一样,但事实并非如此。这不是好的编程风格。这是更新的来源:

import websocket
import json
import pprint
from datetime import datetime
import time
import sys
import signal

# Function to subscribe to ticker information.
def ws_tickerInfo():
    def on_open(wsapp):
        print("opened")
        subscribe_message = {
            "method": "subscribe",
            "params": {'channel': "lightning_ticker_BTC_JPY"}
        }
        wsapp.send(json.dumps(subscribe_message))

    def on_message(wsapp, message, prev=None):
        print(f"Ticker Info, Received : {datetime.now()}")

        ###### full json payloads ######
        # pprint.pprint(json.loads(message))

    def on_close(wsapp):
        print("closed connection")

    endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    ws = websocket.WebSocketApp(endpoint,
                                on_open=on_open,
                                on_message=on_message,
                                on_close=on_close)

    ws.run_forever()


# Function to subscribe to order book updates.
def ws_orderBookUpdates():
    def on_open(wsapp):
        print("opened")
        subscribe_message = {
            "method": "subscribe",
            "params": {'channel': "lightning_board_BTC_JPY"}
        }
        wsapp.send(json.dumps(subscribe_message))

    def on_message(wsapp, message):
        print(f"Order Book, Received : {datetime.now()}")

        ###### full json payloads ######
        # pprint.pprint(json.loads(message))

    def on_close(wsapp):
        print("closed connection")

    endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    ws = websocket.WebSocketApp(endpoint,
                                on_open=on_open,
                                on_message=on_message,
                                on_close=on_close)
    ws.run_forever()

def handle_ctrl_c(signum, stack_frame):
    sys.exit(0)

if __name__ == '__main__':
    import multiprocessing as mp

    signal.signal(signal.SIGINT, handle_ctrl_c) # terminate on ctrl-c
    print('Enter Ctrl-C to terminate.')
    p1 = mp.Process(target=ws_tickerInfo)
    p2 = mp.Process(target=ws_orderBookUpdates)
    p1.start()
    p2.start()
    p1.join() # wait for completion (will never happen)
    p2.join() # wait for completion (will never happen)

使用多线程

if __name__ == '__main__':
    import threading

    t1 = threading.Thread(target=ws_tickerInfo, daemon=True)
    t2 = threading.Thread(target=ws_orderBookUpdates, daemon=True)
    t1.start()
    t2.start()
    input('Hit enter to terminate...\n')