如何使用 Multiprocessing 订阅多个 Websocket 流
How to Subscribe to multiple Websocket streams using Muiltiprocessing
我在 python.
中不熟悉处理多处理、多线程等。
我正在尝试使用 multiprocessing
从我的加密货币交易所 (API Docs Here) 订阅多个 Websocket 流。
但是,当我 运行 下面的代码时,我只收到 ticker information
,而不是 order book updates
。
如何修改代码以获取这两个信息?
multiprocessing
上的 运行 时只有一个 websocket 似乎在工作的原因是什么?
(当我 运行 函数 ws_orderBookUpdates()
和 ws_tickerInfo()
分开时,没有使用 multiprocessing
,它单独工作正常,所以这不是交易所的问题。)
import websocket
import json
import pprint
from datetime import datetime
import time
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Multiprocessing two functions
if __name__ == '__main__':
import multiprocessing as mp
mp.Process(target=ws_tickerInfo(), daemon=True).start()
mp.Process(target=ws_orderBookUpdates(), daemon=True).start()
更新
您已经创建了两个 守护进程 进程。当所有 非守护进程 进程终止时,它们将终止,在本例中是主进程,它在创建守护进程 后立即终止 。你很幸运,即使其中一个过程有机会产生输出,但为什么要冒险呢? 不要使用 dameon 进程。相反:
if __name__ == '__main__':
import multiprocessing as mp
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion
p2.join() # wait for completion
但真正的问题摆在我们面前,我们都错过了!你有:
p1 = mp.Process(target=ws_tickerInfo(), daemon=True)
p2 = mp.Process(target=ws_orderBookUpdates(), daemon=True)
应该是什么时候:
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
看出区别了吗?您实际上并没有将函数 ws_tickerInfo
传递给 Process
,而是 调用 ws_tickerInfo
并试图传递 return 值,这将有是荒谬的 None
如果函数曾经 returned(它没有)。所以你甚至都没有执行第二个进程创建语句。
尽管 Ctrl-C 中断处理程序可能不起作用(见下文),但您可能还使用了多线程而不是多处理。还应该有一种机制来终止程序。我添加了一些代码来检测 Ctrl-C 并在输入时终止。此外,您已将 self
用作函数参数,就好像该函数实际上是 class 方法一样,但事实并非如此。这不是好的编程风格。这是更新的来源:
import websocket
import json
import pprint
from datetime import datetime
import time
import sys
import signal
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
def handle_ctrl_c(signum, stack_frame):
sys.exit(0)
if __name__ == '__main__':
import multiprocessing as mp
signal.signal(signal.SIGINT, handle_ctrl_c) # terminate on ctrl-c
print('Enter Ctrl-C to terminate.')
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion (will never happen)
p2.join() # wait for completion (will never happen)
使用多线程
if __name__ == '__main__':
import threading
t1 = threading.Thread(target=ws_tickerInfo, daemon=True)
t2 = threading.Thread(target=ws_orderBookUpdates, daemon=True)
t1.start()
t2.start()
input('Hit enter to terminate...\n')
我在 python.
中不熟悉处理多处理、多线程等。我正在尝试使用 multiprocessing
从我的加密货币交易所 (API Docs Here) 订阅多个 Websocket 流。
但是,当我 运行 下面的代码时,我只收到 ticker information
,而不是 order book updates
。
如何修改代码以获取这两个信息?
multiprocessing
上的 运行 时只有一个 websocket 似乎在工作的原因是什么?
(当我 运行 函数 ws_orderBookUpdates()
和 ws_tickerInfo()
分开时,没有使用 multiprocessing
,它单独工作正常,所以这不是交易所的问题。)
import websocket
import json
import pprint
from datetime import datetime
import time
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Multiprocessing two functions
if __name__ == '__main__':
import multiprocessing as mp
mp.Process(target=ws_tickerInfo(), daemon=True).start()
mp.Process(target=ws_orderBookUpdates(), daemon=True).start()
更新
您已经创建了两个 守护进程 进程。当所有 非守护进程 进程终止时,它们将终止,在本例中是主进程,它在创建守护进程 后立即终止 。你很幸运,即使其中一个过程有机会产生输出,但为什么要冒险呢? 不要使用 dameon 进程。相反:
if __name__ == '__main__':
import multiprocessing as mp
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion
p2.join() # wait for completion
但真正的问题摆在我们面前,我们都错过了!你有:
p1 = mp.Process(target=ws_tickerInfo(), daemon=True)
p2 = mp.Process(target=ws_orderBookUpdates(), daemon=True)
应该是什么时候:
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
看出区别了吗?您实际上并没有将函数 ws_tickerInfo
传递给 Process
,而是 调用 ws_tickerInfo
并试图传递 return 值,这将有是荒谬的 None
如果函数曾经 returned(它没有)。所以你甚至都没有执行第二个进程创建语句。
尽管 Ctrl-C 中断处理程序可能不起作用(见下文),但您可能还使用了多线程而不是多处理。还应该有一种机制来终止程序。我添加了一些代码来检测 Ctrl-C 并在输入时终止。此外,您已将 self
用作函数参数,就好像该函数实际上是 class 方法一样,但事实并非如此。这不是好的编程风格。这是更新的来源:
import websocket
import json
import pprint
from datetime import datetime
import time
import sys
import signal
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
def handle_ctrl_c(signum, stack_frame):
sys.exit(0)
if __name__ == '__main__':
import multiprocessing as mp
signal.signal(signal.SIGINT, handle_ctrl_c) # terminate on ctrl-c
print('Enter Ctrl-C to terminate.')
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion (will never happen)
p2.join() # wait for completion (will never happen)
使用多线程
if __name__ == '__main__':
import threading
t1 = threading.Thread(target=ws_tickerInfo, daemon=True)
t2 = threading.Thread(target=ws_orderBookUpdates, daemon=True)
t1.start()
t2.start()
input('Hit enter to terminate...\n')