Synchronous SubmissionPublisher
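Is it possible to make the subscriber run on the same thread as the publisher (synchronously)?
I can use CompletableFuture, but it delivers only a single result. What if I need to deliver many results to the subscriber? The small test below illustrates the question.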
@Test
public void testCompletableFutureThreads() throws InterruptedException {
    CompletableFuture<String> f = new CompletableFuture<String>();
    f.thenAccept(new Consumer<String>() {
        @Override
        public void accept(String s) {
            System.out.println("accept " + s + " in " + Thread.currentThread().getName());
        }
    });
    Thread.sleep(200);
    System.out.println("send complete from " + Thread.currentThread().getName());
    f.complete("test");
    Thread.sleep(1000);
}
@Test
public void testSubmissionPublisherThreads() throws InterruptedException {
    SubmissionPublisher<String> publisher = new SubmissionPublisher<String>();
    publisher.subscribe(new Flow.Subscriber<String>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }
        @Override
        public void onNext(String item) {
            System.out.println("onNext in " + Thread.currentThread().getName() + " received " + item);
        }
        @Override
        public void onError(Throwable throwable) {
            System.err.println("onError in " + Thread.currentThread().getName());
            throwable.printStackTrace(System.err);
        }
        @Override
        public void onComplete() {
            System.err.println("onComplete in " + Thread.currentThread().getName());
        }
    });
    int i = 10;
    while (i-- > 0) {
        Thread.sleep(100);
        System.out.println("publisher from " + Thread.currentThread().getName());
        publisher.submit("" + System.currentTimeMillis());
    }
}
You can use the SubmissionPublisher(Executor, int) constructor and supply an Executor that runs everything on the current thread:
final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(new Executor() {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}, Flow.defaultBufferSize());
Or, more compactly, with a method reference (equivalent to the lambda command -> command.run()):
final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize());
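To check that this really delivers on the caller's thread, here is a minimal, self-contained sketch. The class name SameThreadPublisherDemo and the helper deliverOnCallerThread are made up for illustration; only the SubmissionPublisher(Executor, int) constructor and Runnable::run come from the answer above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class SameThreadPublisherDemo {

    // Hypothetical helper: submits each item and records which thread ran onNext.
    static List<String> deliverOnCallerThread(List<String> items) {
        // A plain ArrayList is enough here because, with this executor,
        // every callback runs on the submitting thread.
        List<String> deliveries = new ArrayList<>();

        // Runnable::run acts as an Executor that runs each task in the calling thread.
        try (SubmissionPublisher<String> publisher =
                 new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
                }
                @Override
                public void onNext(String item) {
                    deliveries.add(Thread.currentThread().getName() + " received " + item);
                }
                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace(System.err);
                }
                @Override
                public void onComplete() {
                    // nothing to do
                }
            });
            for (String item : items) {
                publisher.submit(item);
            }
        } // close() signals onComplete to the subscriber
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println("publishing from " + Thread.currentThread().getName());
        for (String line : deliverOnCallerThread(List.of("a", "b", "c"))) {
            System.out.println(line);
        }
    }
}
```

Each recorded line should start with the name of the thread that called submit. Keep in mind that with this executor a slow onNext blocks the publisher for as long as it runs.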
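The default constructor uses ForkJoinPool.commonPool() to deliver signals, which is why the subscriber in the question's test runs on a different thread from the publisher.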
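For comparison, a small sketch of the default behaviour: it publishes one item with the no-argument constructor and reports which thread ran onNext. The class DefaultPublisherThreadDemo, the helper deliveryThreadName and the 5-second timeout are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class DefaultPublisherThreadDemo {

    // Hypothetical helper: returns the name of the thread that ran onNext.
    static String deliveryThreadName() {
        AtomicReference<String> threadName = new AtomicReference<>();
        CountDownLatch delivered = new CountDownLatch(1);

        // No-argument constructor: signals are delivered asynchronously.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }
                @Override
                public void onNext(String item) {
                    threadName.set(Thread.currentThread().getName());
                    delivered.countDown();
                }
                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace(System.err);
                }
                @Override
                public void onComplete() {
                    // nothing to do
                }
            });
            publisher.submit("test");
            // Delivery happens on another thread, so wait for it before returning.
            if (!delivered.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("item was not delivered in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delivery", e);
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println("published from " + Thread.currentThread().getName());
        System.out.println("delivered on   " + deliveryThreadName());
    }
}
```

The delivery thread is typically a common-pool worker. Either way it is not the thread that called submit, which is the behaviour the question was trying to avoid.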