如何使用 RxCpp 创建 ReplaySubject?
How to create a ReplaySubject with RxCpp?
在我的 C++ 项目中,我需要创建具有初始值的主题,该值可能会被更新。在每个 subscription/update 上,订阅者可能会触发然后进行数据处理...在之前的 Angular (RxJS) 项目中,这种行为是使用 ReplaySubject(1) 处理的。
我无法使用 c++ rxcpp lib
重现此内容。
我查找了文档、片段、教程,但没有成功。
预期的伪代码(打字稿):
private dataSub: ReplaySubject<Data> = new ReplaySubject<Data>(1);
private init = false;
// public Observable, immediatly share last published value
get currentData$(): Observable<Data> {
if (!this.init) {
return this.initData().pipe(switchMap(
() => this.dataSub.asObservable()
));
}
return this.dataSub.asObservable();
}
我认为您正在寻找 rxcpp::subjects::replay。
请试试这个:
auto coordination = rxcpp::observe_on_new_thread();
rxcpp::subjects::replay<int, decltype(coordination)> test(1, coordination);
// to emit data
test.get_observer().on_next(1);
test.get_observer().on_next(2);
test.get_observer().on_next(3);
// to subscribe
test.get_observable().subscribe([](auto && v)
printf("%d\n", v); // this will print '3'
});
在我的 C++ 项目中,我需要创建具有初始值的主题,该值可能会被更新。在每个 subscription/update 上,订阅者可能会触发然后进行数据处理...在之前的 Angular (RxJS) 项目中,这种行为是使用 ReplaySubject(1) 处理的。
我无法使用 c++ rxcpp lib
重现此内容。
我查找了文档、片段、教程,但没有成功。
预期的伪代码(打字稿):
private dataSub: ReplaySubject<Data> = new ReplaySubject<Data>(1);
private init = false;
// public Observable, immediatly share last published value
get currentData$(): Observable<Data> {
if (!this.init) {
return this.initData().pipe(switchMap(
() => this.dataSub.asObservable()
));
}
return this.dataSub.asObservable();
}
我认为您正在寻找 rxcpp::subjects::replay。 请试试这个:
auto coordination = rxcpp::observe_on_new_thread();
rxcpp::subjects::replay<int, decltype(coordination)> test(1, coordination);
// to emit data
test.get_observer().on_next(1);
test.get_observer().on_next(2);
test.get_observer().on_next(3);
// to subscribe
test.get_observable().subscribe([](auto && v)
printf("%d\n", v); // this will print '3'
});