将 Flowable 拆分为 2,处理 2 个流,但一个依赖于另一个?
Split a Flowable in 2, process 2 streams, but one depends on the other?
我遇到以下情况:我需要处理作为 Flowable 接收的流。流中的每个项目都有一段数据,只有流中的第一个元素包含元数据。可以处理数据流的函数需要元数据中的信息才能这样做。
类似于:
// Stream items look like this
class StreamItem{
Metadata meta;
Data data;
}
// Processor looks like this
Single<Result> processStream(Meta meta, Flowable<Data> data);
我收到 Flowable<StreamItem>
。我试过做类似的事情:
Flowable<StreamItem> input = ...
ConnectableFlowable<StreamItem> multi = input.publish;
Single<Meta> streamMeta = multi.firstOrError().map(StreamItem::getMeta);
Flowable<Data> streamData = multi.map(StreamItem::getData);
multi.connect();
Single<Result> result = streamMeta.flatMap(meta -> processStream(meta,streamData));
在那之后我只是 return result.ignoreResult()
(因为我们需要过程的副作用而不是真正的对象),并且从客户端(这是入口点)我们只是映射Completable
变成调用的标准响应。不确定最后一部分是否相关。
我也试过:
Flowable<Result> res = input.publish(
flow -> {
Single<Meta> meta = flow.firstOrError().map(StreamItem::getMeta);
Flowable<Data> data = flow.map(StreamITem::getData);
return meta.flatMap(met -> processStream(met,data)).toFlowable();
});
然后 return res.ignoreElements()
用于上述相同的 Completable
过程。
我已经能够处理 Meta,或者存根 Meta 并处理数据流,但是一旦我像上面描述的那样连接两者,似乎没有完成任何处理。我想可能是我在嵌套处理同一个流?无论如何,我认为我可能误解了这一切是如何工作的(我对 Rx 还很陌生),所以如果有人对如何实现这一点有更好的想法,我很乐意听听!
稍微改变一些东西,我认为你可以利用 Flowable::withLatestFrom( Flowable, BiFunction )
。
// Stream items look like this
class StreamItem
{
String meta;
Integer data;
public String getMeta()
{
return meta;
}
public Integer getData()
{
return data;
}
}
// Processor looks like this
interface Processor
{
String processStream( String meta, Integer data );
}
@Test
public void testFlowable()
{
// Set up mock input:
AtomicBoolean first = new AtomicBoolean( true );
Flowable<StreamItem> input = Flowable.generate( emitter -> {
StreamItem item = new StreamItem();
item.data = (int)( Math.random() * 100 );
if ( first.getAndSet( false )) {
item.meta = UUID.randomUUID().toString();
}
emitter.onNext( item );
} );
// Mock processor:
Processor processor = ( meta, data ) -> meta + " : " + data;
// Set up rx pipeline:
Flowable<StreamItem> multi = input.share();
Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );
Flowable<String> result = multi.map( StreamItem::getData )
.withLatestFrom( streamMeta.toFlowable(),
( data, meta ) -> processor.processStream( meta, data ));
// Subscribe:
result.take( 5 ).blockingSubscribe( System.out::println );
}
输出:
3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 72
3fba00bd-027b-4802-8b7d-674497d72052 : 47
3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 93
根据反馈更新:
如果您确实需要数据 Flowable
以及具体的元数据对象,这似乎可以解决问题:
// Stream items look like this
class StreamItem
{
String meta;
Integer data;
public String getMeta()
{
return meta;
}
public Integer getData()
{
return data;
}
}
// Processor looks like this
interface Processor
{
String processStream( String meta, Flowable<Integer> data );
}
@Test
public void testFlowable()
{
// Set up mock input:
AtomicBoolean first = new AtomicBoolean( true );
Flowable<StreamItem> input = Flowable.generate( emitter -> {
StreamItem item = new StreamItem();
item.data = (int)( Math.random() * 100 );
if ( first.getAndSet( false )) {
item.meta = UUID.randomUUID().toString();
}
emitter.onNext( item );
} );
// Mock processor:
Processor processor = ( meta, data ) -> {
System.out.println( meta );
data.subscribe( System.out::println );
return meta;
};
// Set up rx pipeline:
Flowable<StreamItem> multi = input.take( 5 ).share();
Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );
streamMeta.map( meta ->
processor.processStream( meta, multi.map( StreamItem::getData )))
.subscribe();
}
输出:
3421c5f6-8554-43ce-aa69-e6cef9c1ed89
47
46
74
59
57
我遇到以下情况:我需要处理作为 Flowable 接收的流。流中的每个项目都有一段数据,只有流中的第一个元素包含元数据。可以处理数据流的函数需要元数据中的信息才能这样做。
类似于:
// Stream items look like this
class StreamItem{
Metadata meta;
Data data;
}
// Processor looks like this
Single<Result> processStream(Meta meta, Flowable<Data> data);
我收到 Flowable<StreamItem>
。我试过做类似的事情:
Flowable<StreamItem> input = ...
ConnectableFlowable<StreamItem> multi = input.publish;
Single<Meta> streamMeta = multi.firstOrError().map(StreamItem::getMeta);
Flowable<Data> streamData = multi.map(StreamItem::getData);
multi.connect();
Single<Result> result = streamMeta.flatMap(meta -> processStream(meta,streamData));
在那之后我只是 return result.ignoreResult()
(因为我们需要过程的副作用而不是真正的对象),并且从客户端(这是入口点)我们只是映射Completable
变成调用的标准响应。不确定最后一部分是否相关。
我也试过:
Flowable<Result> res = input.publish(
flow -> {
Single<Meta> meta = flow.firstOrError().map(StreamItem::getMeta);
Flowable<Data> data = flow.map(StreamITem::getData);
return meta.flatMap(met -> processStream(met,data)).toFlowable();
});
然后 return res.ignoreElements()
用于上述相同的 Completable
过程。
我已经能够处理 Meta,或者存根 Meta 并处理数据流,但是一旦我像上面描述的那样连接两者,似乎没有完成任何处理。我想可能是我在嵌套处理同一个流?无论如何,我认为我可能误解了这一切是如何工作的(我对 Rx 还很陌生),所以如果有人对如何实现这一点有更好的想法,我很乐意听听!
稍微改变一些东西,我认为你可以利用 Flowable::withLatestFrom( Flowable, BiFunction )
。
// Stream items look like this
class StreamItem
{
String meta;
Integer data;
public String getMeta()
{
return meta;
}
public Integer getData()
{
return data;
}
}
// Processor looks like this
interface Processor
{
String processStream( String meta, Integer data );
}
@Test
public void testFlowable()
{
// Set up mock input:
AtomicBoolean first = new AtomicBoolean( true );
Flowable<StreamItem> input = Flowable.generate( emitter -> {
StreamItem item = new StreamItem();
item.data = (int)( Math.random() * 100 );
if ( first.getAndSet( false )) {
item.meta = UUID.randomUUID().toString();
}
emitter.onNext( item );
} );
// Mock processor:
Processor processor = ( meta, data ) -> meta + " : " + data;
// Set up rx pipeline:
Flowable<StreamItem> multi = input.share();
Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );
Flowable<String> result = multi.map( StreamItem::getData )
.withLatestFrom( streamMeta.toFlowable(),
( data, meta ) -> processor.processStream( meta, data ));
// Subscribe:
result.take( 5 ).blockingSubscribe( System.out::println );
}
输出:
3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 72
3fba00bd-027b-4802-8b7d-674497d72052 : 47
3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 93
根据反馈更新:
如果您确实需要数据 Flowable
以及具体的元数据对象,这似乎可以解决问题:
// Stream items look like this
class StreamItem
{
String meta;
Integer data;
public String getMeta()
{
return meta;
}
public Integer getData()
{
return data;
}
}
// Processor looks like this
interface Processor
{
String processStream( String meta, Flowable<Integer> data );
}
@Test
public void testFlowable()
{
// Set up mock input:
AtomicBoolean first = new AtomicBoolean( true );
Flowable<StreamItem> input = Flowable.generate( emitter -> {
StreamItem item = new StreamItem();
item.data = (int)( Math.random() * 100 );
if ( first.getAndSet( false )) {
item.meta = UUID.randomUUID().toString();
}
emitter.onNext( item );
} );
// Mock processor:
Processor processor = ( meta, data ) -> {
System.out.println( meta );
data.subscribe( System.out::println );
return meta;
};
// Set up rx pipeline:
Flowable<StreamItem> multi = input.take( 5 ).share();
Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );
streamMeta.map( meta ->
processor.processStream( meta, multi.map( StreamItem::getData )))
.subscribe();
}
输出:
3421c5f6-8554-43ce-aa69-e6cef9c1ed89
47
46
74
59
57