运行 并发线程中的加密提要:线程中没有当前事件循环 'ThreadPoolExecutor-0_0'
run crypto feed in a concurrent thread: There is no current event loop in thread 'ThreadPoolExecutor-0_0'
我正在尝试使用加密提要同时下载数据。
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
上面这段代码可以运行成功。但是,我试图在后台 运行 它。所以我正在使用并发期货来提供帮助。
executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(f.run)
但是,我得到了错误:
job2.result()
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-54-f96e35ee3c66> in <module>
----> 1 job2.result()
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
430 raise CancelledError()
431 elif self._state == FINISHED:
--> 432 return self.__get_result()
433
434 self._condition.wait(timeout)
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
386 def __get_result(self):
387 if self._exception:
--> 388 raise self._exception
389 else:
390 return self._result
~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
145 raise ValueError(txt)
146
--> 147 loop = asyncio.get_event_loop()
148 # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
149 # loop.set_debug(True)
~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
637
638 if self._local._loop is None:
--> 639 raise RuntimeError('There is no current event loop in thread %r.'
640 % threading.current_thread().name)
641
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
有人能帮帮我吗?非常感谢!
编辑:关注
def threadable():
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
job2.done()
job2.result()
我收到错误:关于事件循环,我似乎仍然遇到同样的错误...它可以解决吗?
RuntimeError Traceback (most recent call last)
<ipython-input-47-05c023dd326f> in <module>
11 job2.done()
12
---> 13 job2.result()
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
437 raise CancelledError()
438 elif self._state == FINISHED:
--> 439 return self.__get_result()
440 else:
441 raise TimeoutError()
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
386 def __get_result(self):
387 if self._exception:
--> 388 raise self._exception
389 else:
390 return self._result
~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
<ipython-input-47-05c023dd326f> in threadable()
2 f = FeedHandler()
3 f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
----> 4 f.run()
5
6
~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
145 raise ValueError(txt)
146
--> 147 loop = asyncio.get_event_loop()
148 # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
149 # loop.set_debug(True)
~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
637
638 if self._local._loop is None:
--> 639 raise RuntimeError('There is no current event loop in thread %r.'
640 % threading.current_thread().name)
641
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'.
在代码的单线程版本中,所有这三个语句以简单的顺序方式在同一个线程中执行:
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
在多线程版本中,您只将最后一行提交给执行器,因此它将 运行 在辅助线程中。但是这些语句,据我从你提供的代码中可以看出,仍然在主线程中执行:
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
你怎么知道这会奏效?一般来说,这将取决于 Gateio 和 Feedhandler 的实现细节。您需要非常小心地将程序分割成不同线程中的 运行 部分,尤其是在涉及第三方库调用时。那么,祝你好运。
你可以试试这个:
def threadable():
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
...
executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
那么,至少,您的整个步骤序列将在同一个线程中执行。
不过,我会担心那些回调。他们现在将 运行 在辅助线程中,您需要了解其后果。他们是否与用户界面程序交互?您的 UI 可能不支持多线程。
Executor 协议的使用在这里有点奇怪,因为您的函数没有 return 值。执行器在用于聚合 returned 值时最有用。您最好使用 threading
模块中的方法启动您需要的线程。
我正在尝试使用加密提要同时下载数据。
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
上面这段代码可以运行成功。但是,我试图在后台 运行 它。所以我正在使用并发期货来提供帮助。
executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(f.run)
但是,我得到了错误:
job2.result()
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-54-f96e35ee3c66> in <module>
----> 1 job2.result()
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
430 raise CancelledError()
431 elif self._state == FINISHED:
--> 432 return self.__get_result()
433
434 self._condition.wait(timeout)
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
386 def __get_result(self):
387 if self._exception:
--> 388 raise self._exception
389 else:
390 return self._result
~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
145 raise ValueError(txt)
146
--> 147 loop = asyncio.get_event_loop()
148 # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
149 # loop.set_debug(True)
~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
637
638 if self._local._loop is None:
--> 639 raise RuntimeError('There is no current event loop in thread %r.'
640 % threading.current_thread().name)
641
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
有人能帮帮我吗?非常感谢!
编辑:关注
def threadable():
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
job2.done()
job2.result()
我收到错误:关于事件循环,我似乎仍然遇到同样的错误...它可以解决吗?
RuntimeError Traceback (most recent call last)
<ipython-input-47-05c023dd326f> in <module>
11 job2.done()
12
---> 13 job2.result()
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
437 raise CancelledError()
438 elif self._state == FINISHED:
--> 439 return self.__get_result()
440 else:
441 raise TimeoutError()
~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
386 def __get_result(self):
387 if self._exception:
--> 388 raise self._exception
389 else:
390 return self._result
~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
<ipython-input-47-05c023dd326f> in threadable()
2 f = FeedHandler()
3 f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
----> 4 f.run()
5
6
~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
145 raise ValueError(txt)
146
--> 147 loop = asyncio.get_event_loop()
148 # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
149 # loop.set_debug(True)
~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
637
638 if self._local._loop is None:
--> 639 raise RuntimeError('There is no current event loop in thread %r.'
640 % threading.current_thread().name)
641
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'.
在代码的单线程版本中,所有这三个语句以简单的顺序方式在同一个线程中执行:
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
在多线程版本中,您只将最后一行提交给执行器,因此它将 运行 在辅助线程中。但是这些语句,据我从你提供的代码中可以看出,仍然在主线程中执行:
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
你怎么知道这会奏效?一般来说,这将取决于 Gateio 和 Feedhandler 的实现细节。您需要非常小心地将程序分割成不同线程中的 运行 部分,尤其是在涉及第三方库调用时。那么,祝你好运。
你可以试试这个:
def threadable():
f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()
...
executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
那么,至少,您的整个步骤序列将在同一个线程中执行。
不过,我会担心那些回调。他们现在将 运行 在辅助线程中,您需要了解其后果。他们是否与用户界面程序交互?您的 UI 可能不支持多线程。
Executor 协议的使用在这里有点奇怪,因为您的函数没有 return 值。执行器在用于聚合 returned 值时最有用。您最好使用 threading
模块中的方法启动您需要的线程。