如何连接两个返回可观察量的函数?
How to connect two functions returning observables?
我写了一个函数,return 一对 QDateTime
是可观察的,比如这个:
rxcpp::observable<std::tuple<QDateTime, QDateTime>> experimentOne(const QDateTimeAxis * const axis
{
return rxcpp::observable<>::create<std::tuple<QDateTime, QDateTime>>(
[axis](rxcpp::subscriber<std::tuple<QDateTime, QDateTime>> s) {
auto rangeCallback = [s](QDateTime minv, QDateTime maxv) {
if (s.is_subscribed()) {
// send to the subscriber
s.on_next(std::make_tuple<QDateTime, QDateTime>(std::move(minv), std::move(maxv)));
}
};
QObject::connect(axis, &QDateTimeAxis::rangeChanged, rangeCallback);
});
}
因此,通过此功能,我可以订阅 QChart
轴上日期范围的变化。
我还写了另一个函数,给定两个日期,returns 是一个可观察值,其值来自 sqlite 数据库,如下所示
rxcpp::observable<std::tuple<double, double>> Database::getValueRange(const std::string& table, unsigned long start, unsigned long end)
{
return rxcpp::observable<>::create<std::tuple<double, double>>(
[this, table, start, end](rxcpp::subscriber<std::tuple<double, double>> s) {
// get the prepared statement for the query 1, i.e. ohlcv values
// within a date range
sqlite3_stmt *stmt = this->m_query3_stms[table].get();
// bind first parameter, the start timestamp
int rc = sqlite3_bind_int64(stmt, 1, start);
checkSqliteCode(rc, m_db.get());
// bind the second parameter, the end timestamp
rc = sqlite3_bind_int64(stmt, 2, end);
checkSqliteCode(rc, m_db.get());
// step through the query results
while ( sqlite3_step(stmt)==SQLITE_ROW && s.is_subscribed() ) {
// extract name values from the current result row
float minv = sqlite3_column_double(stmt, 0);
float maxv = sqlite3_column_double(stmt, 1);
// send to the subscriber
s.on_next(std::make_tuple<double, double>(minv, maxv));
}
// reset the statement for reuse
sqlite3_reset(stmt);
// send complete to the subscriber
s.on_completed();
});
}
如何在 RxCpp 中以惯用形式将第一个函数(两个日期)的值作为输入传递给第二个函数?在管道末尾,我可以根据输入日期订阅来自数据库的值?
为每对新日期值创建新值范围的规范方法是使用 map 后跟 flattening 运算符
auto valueRanges = experimentOne(/*params*/).
map(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2){
return getValueRange(/*params*/).
map(rxcpp::util::apply_to([=](double db1, double db2){
return std::make_tuple(d1, d2, db1, db2);
}));
})).
/* switch_on_next() or merge() or concat() */
/* this will produce std::tuple< QDateTime, QDateTime, double, double>
switch_on_next
将取消先前的值范围并开始新的值范围。
merge
将尽快生成所有值域。
concat
将按顺序一次生成一个值范围。
如果值范围在不同线程上为 运行,则必须将线程安全协调传递给 merge
,以便值范围安全交错。
要 select 超出特定范围,请使用 filter()
。如果您希望能够将范围拆分为单独的表达式,请先使用 publish()
共享范围。
auto hotValueRanges = valueRanges.
publish().ref_count();
auto aDateRange = hotValueRanges.
filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
return isADesiredDate(d1, d2);
})).
subscribe(/*use the range*/);
auto anotherDateRange = hotValueRanges.
filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
return isAnotherDesiredDate(d1, d2);
})).
subscribe(/*use the range*/);
我写了一个函数,return 一对 QDateTime
是可观察的,比如这个:
rxcpp::observable<std::tuple<QDateTime, QDateTime>> experimentOne(const QDateTimeAxis * const axis
{
return rxcpp::observable<>::create<std::tuple<QDateTime, QDateTime>>(
[axis](rxcpp::subscriber<std::tuple<QDateTime, QDateTime>> s) {
auto rangeCallback = [s](QDateTime minv, QDateTime maxv) {
if (s.is_subscribed()) {
// send to the subscriber
s.on_next(std::make_tuple<QDateTime, QDateTime>(std::move(minv), std::move(maxv)));
}
};
QObject::connect(axis, &QDateTimeAxis::rangeChanged, rangeCallback);
});
}
因此,通过此功能,我可以订阅 QChart
轴上日期范围的变化。
我还写了另一个函数,给定两个日期,returns 是一个可观察值,其值来自 sqlite 数据库,如下所示
rxcpp::observable<std::tuple<double, double>> Database::getValueRange(const std::string& table, unsigned long start, unsigned long end)
{
return rxcpp::observable<>::create<std::tuple<double, double>>(
[this, table, start, end](rxcpp::subscriber<std::tuple<double, double>> s) {
// get the prepared statement for the query 1, i.e. ohlcv values
// within a date range
sqlite3_stmt *stmt = this->m_query3_stms[table].get();
// bind first parameter, the start timestamp
int rc = sqlite3_bind_int64(stmt, 1, start);
checkSqliteCode(rc, m_db.get());
// bind the second parameter, the end timestamp
rc = sqlite3_bind_int64(stmt, 2, end);
checkSqliteCode(rc, m_db.get());
// step through the query results
while ( sqlite3_step(stmt)==SQLITE_ROW && s.is_subscribed() ) {
// extract name values from the current result row
float minv = sqlite3_column_double(stmt, 0);
float maxv = sqlite3_column_double(stmt, 1);
// send to the subscriber
s.on_next(std::make_tuple<double, double>(minv, maxv));
}
// reset the statement for reuse
sqlite3_reset(stmt);
// send complete to the subscriber
s.on_completed();
});
}
如何在 RxCpp 中以惯用形式将第一个函数(两个日期)的值作为输入传递给第二个函数?在管道末尾,我可以根据输入日期订阅来自数据库的值?
为每对新日期值创建新值范围的规范方法是使用 map 后跟 flattening 运算符
auto valueRanges = experimentOne(/*params*/).
map(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2){
return getValueRange(/*params*/).
map(rxcpp::util::apply_to([=](double db1, double db2){
return std::make_tuple(d1, d2, db1, db2);
}));
})).
/* switch_on_next() or merge() or concat() */
/* this will produce std::tuple< QDateTime, QDateTime, double, double>
switch_on_next
将取消先前的值范围并开始新的值范围。merge
将尽快生成所有值域。concat
将按顺序一次生成一个值范围。
如果值范围在不同线程上为 运行,则必须将线程安全协调传递给 merge
,以便值范围安全交错。
要 select 超出特定范围,请使用 filter()
。如果您希望能够将范围拆分为单独的表达式,请先使用 publish()
共享范围。
auto hotValueRanges = valueRanges.
publish().ref_count();
auto aDateRange = hotValueRanges.
filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
return isADesiredDate(d1, d2);
})).
subscribe(/*use the range*/);
auto anotherDateRange = hotValueRanges.
filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
return isAnotherDesiredDate(d1, d2);
})).
subscribe(/*use the range*/);