总结 reactx 中的 Observables python
summarizing Observables in reactivex python
在 Python 中使用 ReactiveX,我如何总结 Observable 流?
我有一个字典流,它们是 {"user": "...", "date": ...}。我想创建一个我可以应用的函数,它为每个用户累积具有最新日期的字典,然后在流结束时发出累积的可观察值(就像 max,但必须查看用户字段,并且会发出多个值)。
示例 - 输入流:
{ "user": "a", "date": "2017-02-14" }
{ "user": "b", "date": "2016-01-01" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "a", "date": "2017-01-01" }
{ "user": "b", "date": "2017-01-01" }
预期输出(顺序无关紧要)
{ "user": "a", "date": "2017-02-14" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "b", "date": "2017-01-01" }
我在 https://ninmesara.github.io/RxPY/api/operators/index.html 阅读了 "Filtering Observables"、"Transforming Observables"、"Combining Observables" 和 "Decision Tree of Observable Operators",并查看了 reduce/aggregate(仅发出单末尾的值)和 flat_map(不知道如何检测流的结尾)。 many_select 和 window(尤其是)看起来很有前途,但我很难理解它们。
我如何使用 rx 执行此操作(使用现有运算符之一,或创建自定义运算符 [我还不知道该怎么做]?)
我认为以下可能会满足您的要求。
import rx
rx.Observable.from_([
{ "user": "a", "date": "2017-02-14" },
{ "user": "b", "date": "2016-01-01" },
{ "user": "c", "date": "2015-01-01" },
{ "user": "a", "date": "2017-01-01" },
{ "user": "b", "date": "2017-01-01" }]) \
.group_by(lambda x: x['user']) \
.flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
.subscribe(print)
Hans 的答案很接近,只是需要调整一下。
我的观察员希望得到 { 'user': ..., 'date': }
字典:
import rx
def pp1(x):
print type(x), x
rx.Observable.from_([
{ "user": "a", "date": "2017-02-14" },
{ "user": "b", "date": "2016-01-01" },
{ "user": "c", "date": "2015-01-01" },
{ "user": "a", "date": "2017-01-01" },
{ "user": "b", "date": "2017-01-01" }]) \
.map(lambda x: x[0]) \
.subscribe(pp1)
产量
<type 'dict'> {'date': '2017-02-14', 'user': 'a'}
<type 'dict'> {'date': '2016-01-01', 'user': 'b'}
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}
<type 'dict'> {'date': '2017-01-01', 'user': 'a'}
<type 'dict'> {'date': '2017-01-01', 'user': 'b'}
执行 .group_by 和 .flat_map 会导致观察者最后得到包含摘要的长度为 1 的列表,而不仅仅是摘要。
import rx
def pp1(x):
print type(x), x
rx.Observable.from_([
{ "user": "a", "date": "2017-02-14" },
{ "user": "b", "date": "2016-01-01" },
{ "user": "c", "date": "2015-01-01" },
{ "user": "a", "date": "2017-01-01" },
{ "user": "b", "date": "2017-01-01" }]) \
.group_by(lambda x: x['user']) \
.flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
.subscribe(pp1)
产量
<type 'list'> [{'date': '2017-02-14', 'user': 'a'}]
<type 'list'> [{'date': '2017-01-01', 'user': 'b'}]
<type 'list'> [{'date': '2015-01-01', 'user': 'c'}]
需要添加地图:
import rx
def pp1(x):
print type(x), x
rx.Observable.from_([
{ "user": "a", "date": "2017-02-14" },
{ "user": "b", "date": "2016-01-01" },
{ "user": "c", "date": "2015-01-01" },
{ "user": "a", "date": "2017-01-01" },
{ "user": "b", "date": "2017-01-01" }]) \
.group_by(lambda x: x['user']) \
.flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
.map(lambda x: x[0]) \
.subscribe(pp1)
产生预期的
<type 'dict'> {'date': '2017-02-14', 'user': 'a'}
<type 'dict'> {'date': '2017-01-01', 'user': 'b'}
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}
在 Python 中使用 ReactiveX,我如何总结 Observable 流?
我有一个字典流,它们是 {"user": "...", "date": ...}。我想创建一个我可以应用的函数,它为每个用户累积具有最新日期的字典,然后在流结束时发出累积的可观察值(就像 max,但必须查看用户字段,并且会发出多个值)。
示例 - 输入流:
{ "user": "a", "date": "2017-02-14" }
{ "user": "b", "date": "2016-01-01" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "a", "date": "2017-01-01" }
{ "user": "b", "date": "2017-01-01" }
预期输出(顺序无关紧要)
{ "user": "a", "date": "2017-02-14" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "b", "date": "2017-01-01" }
我在 https://ninmesara.github.io/RxPY/api/operators/index.html 阅读了 "Filtering Observables"、"Transforming Observables"、"Combining Observables" 和 "Decision Tree of Observable Operators",并查看了 reduce/aggregate(仅发出单末尾的值)和 flat_map(不知道如何检测流的结尾)。 many_select 和 window(尤其是)看起来很有前途,但我很难理解它们。
我如何使用 rx 执行此操作(使用现有运算符之一,或创建自定义运算符 [我还不知道该怎么做]?)
我认为以下可能会满足您的要求。
import rx
rx.Observable.from_([
{ "user": "a", "date": "2017-02-14" },
{ "user": "b", "date": "2016-01-01" },
{ "user": "c", "date": "2015-01-01" },
{ "user": "a", "date": "2017-01-01" },
{ "user": "b", "date": "2017-01-01" }]) \
.group_by(lambda x: x['user']) \
.flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
.subscribe(print)
Hans 的答案很接近,只是需要调整一下。
我的观察员希望得到 { 'user': ..., 'date': }
字典:
import rx
def pp1(x):
print type(x), x
rx.Observable.from_([
{ "user": "a", "date": "2017-02-14" },
{ "user": "b", "date": "2016-01-01" },
{ "user": "c", "date": "2015-01-01" },
{ "user": "a", "date": "2017-01-01" },
{ "user": "b", "date": "2017-01-01" }]) \
.map(lambda x: x[0]) \
.subscribe(pp1)
产量
<type 'dict'> {'date': '2017-02-14', 'user': 'a'}
<type 'dict'> {'date': '2016-01-01', 'user': 'b'}
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}
<type 'dict'> {'date': '2017-01-01', 'user': 'a'}
<type 'dict'> {'date': '2017-01-01', 'user': 'b'}
执行 .group_by 和 .flat_map 会导致观察者最后得到包含摘要的长度为 1 的列表,而不仅仅是摘要。
import rx
def pp1(x):
print type(x), x
rx.Observable.from_([
{ "user": "a", "date": "2017-02-14" },
{ "user": "b", "date": "2016-01-01" },
{ "user": "c", "date": "2015-01-01" },
{ "user": "a", "date": "2017-01-01" },
{ "user": "b", "date": "2017-01-01" }]) \
.group_by(lambda x: x['user']) \
.flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
.subscribe(pp1)
产量
<type 'list'> [{'date': '2017-02-14', 'user': 'a'}]
<type 'list'> [{'date': '2017-01-01', 'user': 'b'}]
<type 'list'> [{'date': '2015-01-01', 'user': 'c'}]
需要添加地图:
import rx
def pp1(x):
print type(x), x
rx.Observable.from_([
{ "user": "a", "date": "2017-02-14" },
{ "user": "b", "date": "2016-01-01" },
{ "user": "c", "date": "2015-01-01" },
{ "user": "a", "date": "2017-01-01" },
{ "user": "b", "date": "2017-01-01" }]) \
.group_by(lambda x: x['user']) \
.flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
.map(lambda x: x[0]) \
.subscribe(pp1)
产生预期的
<type 'dict'> {'date': '2017-02-14', 'user': 'a'}
<type 'dict'> {'date': '2017-01-01', 'user': 'b'}
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}