RxPy:如何从外部回调创建热可观察对象并订阅多个异步进程?
RxPy: How to create hot observable from external callback and subscribe multiple asynchronous processes?
我有一个外部服务 (ExternalDummyService),我在其中注册了一个回调。我想从该回调创建一个可观察对象并订阅多个异步进程。
pyfiddle 中的完整代码:https://pyfiddle.io/fiddle/da1e1d53-2e34-4742-a0b9-07838f2c13df
* 请注意,在 pyfiddle 版本中,"sleeps" 被替换为 "for i in range(10000): foo += i",因为 sleep 无法正常工作。
主要代码是这样的:
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs.subscribe(slow_process)
external_obs.subscribe(fast_process)
external_obs.connect()
thread.start()
class ExternalDummyService(Thread):
def __init__(self):
self.subject = Subject()
def run(self):
for i in range(5):
dummy_msg = { ... }
self.subject.on_next(dummy_msg)
def fast_process(msg):
print("FAST {0} {1}".format(msg["counter"], 1000*(time() - msg["timestamp"])))
sleep(0.1)
def slow_process(msg):
print("SLOW {0} {1}".format(msg["counter"], 1000*(time() - msg["timestamp"])))
sleep(1)
我得到的输出是这个,两个进程 运行 同步并且 ExternalDummyService 在两个进程完成每次执行之前不会发出新值:
emitting 0
STARTED
SLOW 0 1.0008811950683594
FAST 0 2.0122528076171875
emitting 1
SLOW 1 1.5070438385009766
FAST 1 1.5070438385009766
emitting 2
SLOW 2 0.5052089691162109
FAST 2 0.9891986846923828
emitting 3
SLOW 3 1.0006427764892578
FAST 3 1.0006427764892578
emitting 4
SLOW 4 1.0013580322265625
FAST 4 1.0013580322265625
FINISHED
我想要这样的东西,服务发出消息而不等待进程 运行 和进程 运行 异步:
STARTED
emitting 0
emitting 1
emitting 2
FAST 0 2.0122528076171875
FAST 1 1.5070438385009766
emitting 3
SLOW 0 1.0008811950683594
FAST 2 0.9891986846923828
emitting 4
FAST 3 1.0006427764892578
SLOW 1 1.5070438385009766
FAST 4 1.0013580322265625
SLOW 2 0.5052089691162109
SLOW 3 1.0006427764892578
SLOW 4 1.0013580322265625
FINISHED
我尝试过使用 share()、ThreadPoolScheduler 和其他我不知道我在做什么的东西。
谢谢!
使用这个问题的答案:RxJava concurrency with multiple subscribers and events
...我用这段代码达到了预期的结果:
optimal_thread_count = cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(pool_scheduler)) \
.subscribe(fast_process)
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(pool_scheduler)) \
.subscribe(slow_process)
external_obs.connect()
thread.start()
完整版:https://pyfiddle.io/fiddle/20f8871c-48d6-4d6b-b1a4-fdd0a4aa6f95/?m=Saved%20fiddle
输出为:
emitting 0
emitting 1
emitting 2
emitting 3
emitting 4
FAST 0 52.629709243774414
FAST 1 51.12814903259277
FAST 2 100.2051830291748
FAST 3 151.2434482574463
SLOW 0 503.0245780944824
SLOW 1 502.0263195037842
FAST 4 548.7725734710693
SLOW 2 551.4400005340576
SLOW 3 652.1098613739014
SLOW 4 1000.3445148468018
请随时提出任何改进建议。
我有一个外部服务 (ExternalDummyService),我在其中注册了一个回调。我想从该回调创建一个可观察对象并订阅多个异步进程。
pyfiddle 中的完整代码:https://pyfiddle.io/fiddle/da1e1d53-2e34-4742-a0b9-07838f2c13df * 请注意,在 pyfiddle 版本中,"sleeps" 被替换为 "for i in range(10000): foo += i",因为 sleep 无法正常工作。
主要代码是这样的:
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs.subscribe(slow_process)
external_obs.subscribe(fast_process)
external_obs.connect()
thread.start()
class ExternalDummyService(Thread):
def __init__(self):
self.subject = Subject()
def run(self):
for i in range(5):
dummy_msg = { ... }
self.subject.on_next(dummy_msg)
def fast_process(msg):
print("FAST {0} {1}".format(msg["counter"], 1000*(time() - msg["timestamp"])))
sleep(0.1)
def slow_process(msg):
print("SLOW {0} {1}".format(msg["counter"], 1000*(time() - msg["timestamp"])))
sleep(1)
我得到的输出是这个,两个进程 运行 同步并且 ExternalDummyService 在两个进程完成每次执行之前不会发出新值:
emitting 0
STARTED
SLOW 0 1.0008811950683594
FAST 0 2.0122528076171875
emitting 1
SLOW 1 1.5070438385009766
FAST 1 1.5070438385009766
emitting 2
SLOW 2 0.5052089691162109
FAST 2 0.9891986846923828
emitting 3
SLOW 3 1.0006427764892578
FAST 3 1.0006427764892578
emitting 4
SLOW 4 1.0013580322265625
FAST 4 1.0013580322265625
FINISHED
我想要这样的东西,服务发出消息而不等待进程 运行 和进程 运行 异步:
STARTED
emitting 0
emitting 1
emitting 2
FAST 0 2.0122528076171875
FAST 1 1.5070438385009766
emitting 3
SLOW 0 1.0008811950683594
FAST 2 0.9891986846923828
emitting 4
FAST 3 1.0006427764892578
SLOW 1 1.5070438385009766
FAST 4 1.0013580322265625
SLOW 2 0.5052089691162109
SLOW 3 1.0006427764892578
SLOW 4 1.0013580322265625
FINISHED
我尝试过使用 share()、ThreadPoolScheduler 和其他我不知道我在做什么的东西。
谢谢!
使用这个问题的答案:RxJava concurrency with multiple subscribers and events
...我用这段代码达到了预期的结果:
optimal_thread_count = cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(pool_scheduler)) \
.subscribe(fast_process)
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(pool_scheduler)) \
.subscribe(slow_process)
external_obs.connect()
thread.start()
完整版:https://pyfiddle.io/fiddle/20f8871c-48d6-4d6b-b1a4-fdd0a4aa6f95/?m=Saved%20fiddle
输出为:
emitting 0
emitting 1
emitting 2
emitting 3
emitting 4
FAST 0 52.629709243774414
FAST 1 51.12814903259277
FAST 2 100.2051830291748
FAST 3 151.2434482574463
SLOW 0 503.0245780944824
SLOW 1 502.0263195037842
FAST 4 548.7725734710693
SLOW 2 551.4400005340576
SLOW 3 652.1098613739014
SLOW 4 1000.3445148468018
请随时提出任何改进建议。