如何从 Rx Observable 中提取值?
How can I extract values from an Rx Observable?
我正在尝试将一些 ReactiveX 概念集成到现有项目中,认为这可能是一种很好的做法,也是一种使某些任务更清晰的方法。
我打开一个文件,从它的行创建一个 Observable,然后进行一些过滤,直到我得到我想要的行。现在,我想使用 re.search() 从其中两行中提取一些信息到 return 特定组。我一辈子都想不出如何从 Observable 中获取这些值(不将它们分配给全局变量)。
train = 'ChooChoo'
with open(some_file) as fd:
line_stream = Observable.from_(fd.readlines())
a_stream = line_stream.skip_while(
# Begin at dictionary
lambda x: 'config = {' not in x
).skip_while(
# Begin at train key
lambda x: "'" + train.lower() + "'" not in x
).take_while(
# End at closing brace of dict value
lambda x: '}' not in x
).filter(
# Filter sdk and clang lines only
lambda x: "'sdk'" in x or "'clang'" in x
).subscribe(lambda x: match_some_regex(x))
代替该流末尾的 .subscribe()
,我尝试使用 .to_list()
获取一个列表,我可以在该列表上迭代 "the normal way," 但它仅 returns 类型的值:
<class 'rx.anonymousobservable.AnonymousObservable'>
我做错了什么?
我见过的每个 Rx 示例除了打印结果外什么都不做。如果我希望它们在我可以同步使用的数据结构中怎么办?
短期内,我使用 itertools 实现了我想要的功能(正如@jonrsharpe 所建议的)。这个问题仍然困扰着我,所以我今天回过头来解决了。
这不是 Rx 的一个很好的例子,因为它只使用一个线程,但至少现在我知道如何在需要的时候跳出 "the monad"。
#!/usr/bin/env python
from __future__ import print_function
from rx import *
def my_on_next(item):
print(item, end="", flush=True)
def my_on_error(throwable):
print(throwable)
def my_on_completed():
print('Done')
pass
def main():
foo = []
# Create an observable from a list of numbers
a = Observable.from_([14, 9, 5, 2, 10, 13, 4])
# Keep only the even numbers
b = a.filter(lambda x: x % 2 == 0)
# For every item, call a function that appends the item to a local list
c = b.map(lambda x: foo.append(x))
c.subscribe(lambda x: x, my_on_error, my_on_completed)
# Use the list outside the monad!
print(foo)
if __name__ == "__main__":
main()
这个例子有点做作,所有的中间可观察量都不是必需的,但它表明您可以轻松地完成我最初描述的操作。
我正在尝试将一些 ReactiveX 概念集成到现有项目中,认为这可能是一种很好的做法,也是一种使某些任务更清晰的方法。
我打开一个文件,从它的行创建一个 Observable,然后进行一些过滤,直到我得到我想要的行。现在,我想使用 re.search() 从其中两行中提取一些信息到 return 特定组。我一辈子都想不出如何从 Observable 中获取这些值(不将它们分配给全局变量)。
train = 'ChooChoo'
with open(some_file) as fd:
line_stream = Observable.from_(fd.readlines())
a_stream = line_stream.skip_while(
# Begin at dictionary
lambda x: 'config = {' not in x
).skip_while(
# Begin at train key
lambda x: "'" + train.lower() + "'" not in x
).take_while(
# End at closing brace of dict value
lambda x: '}' not in x
).filter(
# Filter sdk and clang lines only
lambda x: "'sdk'" in x or "'clang'" in x
).subscribe(lambda x: match_some_regex(x))
代替该流末尾的 .subscribe()
,我尝试使用 .to_list()
获取一个列表,我可以在该列表上迭代 "the normal way," 但它仅 returns 类型的值:
<class 'rx.anonymousobservable.AnonymousObservable'>
我做错了什么?
我见过的每个 Rx 示例除了打印结果外什么都不做。如果我希望它们在我可以同步使用的数据结构中怎么办?
短期内,我使用 itertools 实现了我想要的功能(正如@jonrsharpe 所建议的)。这个问题仍然困扰着我,所以我今天回过头来解决了。
这不是 Rx 的一个很好的例子,因为它只使用一个线程,但至少现在我知道如何在需要的时候跳出 "the monad"。
#!/usr/bin/env python
from __future__ import print_function
from rx import *
def my_on_next(item):
print(item, end="", flush=True)
def my_on_error(throwable):
print(throwable)
def my_on_completed():
print('Done')
pass
def main():
foo = []
# Create an observable from a list of numbers
a = Observable.from_([14, 9, 5, 2, 10, 13, 4])
# Keep only the even numbers
b = a.filter(lambda x: x % 2 == 0)
# For every item, call a function that appends the item to a local list
c = b.map(lambda x: foo.append(x))
c.subscribe(lambda x: x, my_on_error, my_on_completed)
# Use the list outside the monad!
print(foo)
if __name__ == "__main__":
main()
这个例子有点做作,所有的中间可观察量都不是必需的,但它表明您可以轻松地完成我最初描述的操作。