在 ReactiveX 中,如何将其他参数传递给 Observer.create?
in ReactiveX, how do I pass other parameters to Observer.create?
使用 RxPY 进行说明。
我想从函数创建一个可观察对象,但该函数必须带有参数。这个特定示例必须 return,以随机间隔,我要发送给它的许多预定义代码之一。到目前为止,我的解决方案是使用闭包:
from __future__ import print_function
from rx import Observable
import random
import string
import time
def make_tickers(n = 300, s = 123):
""" generates up to n unique 3-letter strings geach makde up of uppsercase letters"""
random.seed(s)
tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3)) for y in range(n)]
tickers = list(set(tickers)) # unique
print(len(tickers))
return(tickers)
def spawn_prices_fn(tickers):
""" returns a function that will return a random element
out of tickers every 20-100 ms, and takes an observable parameter """
def spawner(observer):
while True:
next_tick = random.choice(tickers)
observer.on_next(next_tick)
time.sleep(random.randint(20, 100)/1000.0)
return(spawner)
if __name__ == "__main__":
spawned = spawn_prices_fn(make_tickers())
xx = Observable.create(spawned)
xx.subscribe(lambda s: print(s))
有没有更简单的方法?是否可以将更多参数发送到 Observable.create 的第一个参数函数,不需要闭包?什么是规范建议?
可以通过多种方式完成,这里是一种不会过多更改您的代码的解决方案。
请注意,代码生成也可以分解为生成单个字符串的函数,结合一些 rx
魔法,使其更 rx
-like
我也稍微调整了代码,让flake8
开心
from __future__ import print_function
import random
import string
import time
from rx import Observable
def make_tickers(n=300, s=123):
"""
Generates up to n unique 3-letter strings each made up of uppercase letters
"""
random.seed(s)
tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3))
for y in range(n)]
tickers = list(set(tickers)) # unique
print(len(tickers))
return(tickers)
def random_picker(tickers):
ticker = random.choice(tickers)
time.sleep(random.randint(20, 100) / 1000.0)
return ticker
if __name__ == "__main__":
xx = Observable\
.repeat(make_tickers())\
.map(random_picker)\
.subscribe(lambda s: print(s))
或没有make_tickers
的解决方案:
from __future__ import print_function
import random
import string
import time
from rx import Observable
def random_picker(tickers):
ticker = random.choice(tickers)
time.sleep(random.randint(20, 100) / 1000.0)
return ticker
if __name__ == "__main__":
random.seed(123)
Observable.range(1, 300)\
.map(lambda _: ''.join(random.choice(string.ascii_uppercase)
for _ in range(3)))\
.reduce(lambda x, y: x + [y], [])\
.do_while(lambda _: True)\
.map(random_picker)\
.subscribe(lambda s: print(s))
time.sleep
可以从 random_picker
移开,但代码会变得有点棘手
您还可以使用“partials”来包装您的 Subscription 方法。它允许您定义其他参数,但在仅等待 Observer 和 Scheduler 的方法上调用 rx.create:
def my_subscription_with_arguments(observer, scheduler, arg1):
observer.on_next(arg1)
my_subscription_wrapper = functools.partial(my_subscription_with_arguments, arg1='hello')
source = rx.create(my_subscription_wrapper)
使用 RxPY 进行说明。
我想从函数创建一个可观察对象,但该函数必须带有参数。这个特定示例必须 return,以随机间隔,我要发送给它的许多预定义代码之一。到目前为止,我的解决方案是使用闭包:
from __future__ import print_function
from rx import Observable
import random
import string
import time
def make_tickers(n = 300, s = 123):
""" generates up to n unique 3-letter strings geach makde up of uppsercase letters"""
random.seed(s)
tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3)) for y in range(n)]
tickers = list(set(tickers)) # unique
print(len(tickers))
return(tickers)
def spawn_prices_fn(tickers):
""" returns a function that will return a random element
out of tickers every 20-100 ms, and takes an observable parameter """
def spawner(observer):
while True:
next_tick = random.choice(tickers)
observer.on_next(next_tick)
time.sleep(random.randint(20, 100)/1000.0)
return(spawner)
if __name__ == "__main__":
spawned = spawn_prices_fn(make_tickers())
xx = Observable.create(spawned)
xx.subscribe(lambda s: print(s))
有没有更简单的方法?是否可以将更多参数发送到 Observable.create 的第一个参数函数,不需要闭包?什么是规范建议?
可以通过多种方式完成,这里是一种不会过多更改您的代码的解决方案。
请注意,代码生成也可以分解为生成单个字符串的函数,结合一些 rx
魔法,使其更 rx
-like
我也稍微调整了代码,让flake8
开心
from __future__ import print_function
import random
import string
import time
from rx import Observable
def make_tickers(n=300, s=123):
"""
Generates up to n unique 3-letter strings each made up of uppercase letters
"""
random.seed(s)
tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3))
for y in range(n)]
tickers = list(set(tickers)) # unique
print(len(tickers))
return(tickers)
def random_picker(tickers):
ticker = random.choice(tickers)
time.sleep(random.randint(20, 100) / 1000.0)
return ticker
if __name__ == "__main__":
xx = Observable\
.repeat(make_tickers())\
.map(random_picker)\
.subscribe(lambda s: print(s))
或没有make_tickers
的解决方案:
from __future__ import print_function
import random
import string
import time
from rx import Observable
def random_picker(tickers):
ticker = random.choice(tickers)
time.sleep(random.randint(20, 100) / 1000.0)
return ticker
if __name__ == "__main__":
random.seed(123)
Observable.range(1, 300)\
.map(lambda _: ''.join(random.choice(string.ascii_uppercase)
for _ in range(3)))\
.reduce(lambda x, y: x + [y], [])\
.do_while(lambda _: True)\
.map(random_picker)\
.subscribe(lambda s: print(s))
time.sleep
可以从 random_picker
移开,但代码会变得有点棘手
您还可以使用“partials”来包装您的 Subscription 方法。它允许您定义其他参数,但在仅等待 Observer 和 Scheduler 的方法上调用 rx.create:
def my_subscription_with_arguments(observer, scheduler, arg1):
observer.on_next(arg1)
my_subscription_wrapper = functools.partial(my_subscription_with_arguments, arg1='hello')
source = rx.create(my_subscription_wrapper)