正确处理 RxJava 中的空 Observable
Properly handling empty Observable in RxJava
我有一种情况,我正在创建一个包含来自数据库的结果的 Observable。然后我对它们应用了一系列过滤器。然后我有一个订阅者正在记录结果。可能没有元素通过过滤器。我的业务逻辑表明这不是错误。但是,当发生这种情况时,我的 onError 被调用并包含以下异常:java.util.NoSuchElementException: Sequence contains no elements
仅检测该类型的异常并忽略它是公认的做法吗?或者有更好的方法来处理这个问题吗?
版本为1.0.0.
这是一个简单的测试用例,展示了我所看到的内容。它似乎与在到达 map 和 reduce 之前过滤所有事件有关。
@Test
public void test()
{
Integer values[] = new Integer[]{1, 2, 3, 4, 5};
Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).reduce(new Func2<String, String, String>()
{
@Override
public String call(String s, String s2)
{
return s + "," + s2;
}
})
.subscribe(new Action1<String>()
{
@Override
public void call(String s)
{
System.out.println(s);
}
});
}
因为我使用的是安全订阅者,它最初会抛出一个 OnErrorNotImplementedException,它包含以下异常:
java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
根据下面@davem 的回答,我创建了一个新的测试用例:
@Test
public void testFromBlockingAndSingle()
{
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).reduce(new Func2<String, String, String>()
{
@Override
public String call(String s, String s2)
{
return s + "," + s2;
}
}).toList().toBlocking().single();
System.out.println("Test: " + results + " Size: " + results.size());
}
此测试导致以下行为:
当输入为:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
那么结果(如预期的那样)是:
Test: [-2,-1] Size: 1
当输入为:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
然后结果是以下堆栈跟踪:
java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at EmptyTest2.test(EmptyTest2.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access[=16=]0(ParentRunner.java:53)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
看来问题肯定出在reduce函数的使用上。请参阅以下处理这两种情况的测试用例:
@Test
public void testNoReduce()
{
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).toList().toBlocking().first();
Iterator<String> itr = results.iterator();
StringBuilder b = new StringBuilder();
while (itr.hasNext())
{
b.append(itr.next());
if (itr.hasNext())
b.append(",");
}
System.out.println("Test NoReduce: " + b);
}
使用以下输入:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
我得到了以下预期结果:
Test NoReduce: -2,-1
并使用以下输入:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
我得到以下预期输出:
Test NoReduce:
因此,除非我完全误解了某些东西,否则真正处理过滤器产生的零元素 Observable 的唯一方法是在 Observable 链之外实现 reduce 逻辑。大家同意这个说法吗?
最终解决方案
这是我在实施 Tomáš Dvořák 和 David Motten 建议后的最终解决方案。我觉得这个方案是合理的。
@Test
public void testWithToList()
{
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).toList().map(new Func1<List<Integer>, String>()
{
@Override
public String call(List<Integer> integers)
{
Iterator<Integer> intItr = integers.iterator();
StringBuilder b = new StringBuilder();
while (intItr.hasNext())
{
b.append(intItr.next());
if (intItr.hasNext())
{
b.append(",");
}
}
return b.toString();
}
}).subscribe(new Action1<String>()
{
@Override
public void call(String s)
{
System.out.println("With a toList: " + s);
}
});
}
以下是此测试在给定以下输入时的表现。
当给定一个流,其中一些值将通过过滤器时:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
结果是:
With a toList: -2,-1
当给定一个没有任何值的流通过过滤器时:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
结果是:
With a toList: <empty string>
发生这种情况的原因是您在空流上使用 toBlocking().single()
。如果您希望流中有 0 或 1 个值,您可以 toList().toBlocking().single()
然后检查列表中的值(可以为空但不会引发您得到的异常)。
更新后错误就很明显了。如果 RxJava 中的 Reduce
正在减少的 observable 为空,则将失败并返回 IllegalArgumentException
,完全按照规范 (http://reactivex.io/documentation/operators/reduce.html).
在函数式编程中,通常有两个泛型运算符将一个集合聚合成一个值,fold
和reduce
。在公认的术语中,fold
采用初始累加器值,以及采用 运行 累加器和集合中的值并生成另一个累加器值的函数。伪代码示例:
[1, 2, 3, 4].fold(0, (accumulator, value) => accumulator + value)
将从 0 开始,最终将 1、2、3、4 添加到 运行 累加器,最后产生 10,即值的总和。
Reduce 非常相似,只是它没有明确地取初始累加器值,它使用第一个值作为初始累加器,然后累加所有剩余值。如果你是,这是有道理的。寻找最小值或最大值。
[1, 2, 3, 4].reduce((accumulator, value) => min(accumulator, value))
看看 fold 和 reduce 不同的方式,你可能会使用 fold
只要聚合值即使在空集合上也有意义(比如,在 sum
中,0 是有意义的),并且 reduce
否则(minimum
对空集合没有意义,并且 reduce
将无法对此类集合进行操作,在您的情况下会抛出异常)。
您正在进行类似的聚合,用逗号散布一组字符串以生成单个字符串。那是有点困难的情况。它可能对一个空集合有意义(你可能期望一个空字符串),但另一方面,如果你从一个空累加器开始,你将在结果中比你预期的多一个逗号。正确的解决方案是首先检查集合是否为空,然后 return 空集合的后备字符串,或者对非空集合执行 reduce
。您可能会观察到,通常您实际上并不需要在空集合情况下使用空字符串,但 "collection is empty" 之类的内容可能更合适,从而进一步向您保证此解决方案是干净的解决方案。
顺便说一句,我在这里随意使用 collection 这个词,而不是 observable,只是出于教育目的。此外,在 RxJava 中,fold
和 reduce
的调用相同,reduce
,只是该方法有两个版本,一个只接受一个参数,其他两个参数。
至于你的最后一个问题:你不必离开 Observable 链。正如 David Motten 建议的那样,只需使用 toList()。
.filter(...)
.toList()
.map(listOfValues => listOfValues.intersperse(", "))
其中 intersperse
可以根据 reduce
来实现,如果还不是库函数的话(这很常见)。
collection.intersperse(separator) =
if (collection.isEmpty())
""
else
collection.reduce(accumulator, element => accumulator + separator + element)
您可以使用具有初始值版本的 reduce .reduce(0, (x,y) -> x+y )
它应该像 Tomáš Dvořák 解释的折叠运算符一样工作
我有一种情况,我正在创建一个包含来自数据库的结果的 Observable。然后我对它们应用了一系列过滤器。然后我有一个订阅者正在记录结果。可能没有元素通过过滤器。我的业务逻辑表明这不是错误。但是,当发生这种情况时,我的 onError 被调用并包含以下异常:java.util.NoSuchElementException: Sequence contains no elements
仅检测该类型的异常并忽略它是公认的做法吗?或者有更好的方法来处理这个问题吗?
版本为1.0.0.
这是一个简单的测试用例,展示了我所看到的内容。它似乎与在到达 map 和 reduce 之前过滤所有事件有关。
@Test
public void test()
{
Integer values[] = new Integer[]{1, 2, 3, 4, 5};
Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).reduce(new Func2<String, String, String>()
{
@Override
public String call(String s, String s2)
{
return s + "," + s2;
}
})
.subscribe(new Action1<String>()
{
@Override
public void call(String s)
{
System.out.println(s);
}
});
}
因为我使用的是安全订阅者,它最初会抛出一个 OnErrorNotImplementedException,它包含以下异常:
java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
根据下面@davem 的回答,我创建了一个新的测试用例:
@Test
public void testFromBlockingAndSingle()
{
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).reduce(new Func2<String, String, String>()
{
@Override
public String call(String s, String s2)
{
return s + "," + s2;
}
}).toList().toBlocking().single();
System.out.println("Test: " + results + " Size: " + results.size());
}
此测试导致以下行为:
当输入为:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
那么结果(如预期的那样)是:
Test: [-2,-1] Size: 1
当输入为:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
然后结果是以下堆栈跟踪:
java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.call(Observable.java:144)
at rx.Observable.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at EmptyTest2.test(EmptyTest2.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access[=16=]0(ParentRunner.java:53)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
看来问题肯定出在reduce函数的使用上。请参阅以下处理这两种情况的测试用例:
@Test
public void testNoReduce()
{
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).toList().toBlocking().first();
Iterator<String> itr = results.iterator();
StringBuilder b = new StringBuilder();
while (itr.hasNext())
{
b.append(itr.next());
if (itr.hasNext())
b.append(",");
}
System.out.println("Test NoReduce: " + b);
}
使用以下输入:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
我得到了以下预期结果:
Test NoReduce: -2,-1
并使用以下输入:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
我得到以下预期输出:
Test NoReduce:
因此,除非我完全误解了某些东西,否则真正处理过滤器产生的零元素 Observable 的唯一方法是在 Observable 链之外实现 reduce 逻辑。大家同意这个说法吗?
最终解决方案
这是我在实施 Tomáš Dvořák 和 David Motten 建议后的最终解决方案。我觉得这个方案是合理的。
@Test
public void testWithToList()
{
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).toList().map(new Func1<List<Integer>, String>()
{
@Override
public String call(List<Integer> integers)
{
Iterator<Integer> intItr = integers.iterator();
StringBuilder b = new StringBuilder();
while (intItr.hasNext())
{
b.append(intItr.next());
if (intItr.hasNext())
{
b.append(",");
}
}
return b.toString();
}
}).subscribe(new Action1<String>()
{
@Override
public void call(String s)
{
System.out.println("With a toList: " + s);
}
});
}
以下是此测试在给定以下输入时的表现。
当给定一个流,其中一些值将通过过滤器时:
Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
结果是:
With a toList: -2,-1
当给定一个没有任何值的流通过过滤器时:
Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};
结果是:
With a toList: <empty string>
发生这种情况的原因是您在空流上使用 toBlocking().single()
。如果您希望流中有 0 或 1 个值,您可以 toList().toBlocking().single()
然后检查列表中的值(可以为空但不会引发您得到的异常)。
更新后错误就很明显了。如果 RxJava 中的 Reduce
正在减少的 observable 为空,则将失败并返回 IllegalArgumentException
,完全按照规范 (http://reactivex.io/documentation/operators/reduce.html).
在函数式编程中,通常有两个泛型运算符将一个集合聚合成一个值,fold
和reduce
。在公认的术语中,fold
采用初始累加器值,以及采用 运行 累加器和集合中的值并生成另一个累加器值的函数。伪代码示例:
[1, 2, 3, 4].fold(0, (accumulator, value) => accumulator + value)
将从 0 开始,最终将 1、2、3、4 添加到 运行 累加器,最后产生 10,即值的总和。
Reduce 非常相似,只是它没有明确地取初始累加器值,它使用第一个值作为初始累加器,然后累加所有剩余值。如果你是,这是有道理的。寻找最小值或最大值。
[1, 2, 3, 4].reduce((accumulator, value) => min(accumulator, value))
看看 fold 和 reduce 不同的方式,你可能会使用 fold
只要聚合值即使在空集合上也有意义(比如,在 sum
中,0 是有意义的),并且 reduce
否则(minimum
对空集合没有意义,并且 reduce
将无法对此类集合进行操作,在您的情况下会抛出异常)。
您正在进行类似的聚合,用逗号散布一组字符串以生成单个字符串。那是有点困难的情况。它可能对一个空集合有意义(你可能期望一个空字符串),但另一方面,如果你从一个空累加器开始,你将在结果中比你预期的多一个逗号。正确的解决方案是首先检查集合是否为空,然后 return 空集合的后备字符串,或者对非空集合执行 reduce
。您可能会观察到,通常您实际上并不需要在空集合情况下使用空字符串,但 "collection is empty" 之类的内容可能更合适,从而进一步向您保证此解决方案是干净的解决方案。
顺便说一句,我在这里随意使用 collection 这个词,而不是 observable,只是出于教育目的。此外,在 RxJava 中,fold
和 reduce
的调用相同,reduce
,只是该方法有两个版本,一个只接受一个参数,其他两个参数。
至于你的最后一个问题:你不必离开 Observable 链。正如 David Motten 建议的那样,只需使用 toList()。
.filter(...)
.toList()
.map(listOfValues => listOfValues.intersperse(", "))
其中 intersperse
可以根据 reduce
来实现,如果还不是库函数的话(这很常见)。
collection.intersperse(separator) =
if (collection.isEmpty())
""
else
collection.reduce(accumulator, element => accumulator + separator + element)
您可以使用具有初始值版本的 reduce .reduce(0, (x,y) -> x+y )
它应该像 Tomáš Dvořák 解释的折叠运算符一样工作