反应堆选择 sink/processor
Reactor choosing a sink/processor
我有以下用例:
在我使用 X 线程使用一些消息的应用程序中,我有一个这样定义的 Consumer 实现:
public interface Consumer {
onMessage(Object message);
}
问题是 Consumer 不是每个线程的不同实例,而是单个实例,因为它是一个 Spring bean,我们还希望它在每次调用 onMessage 时不会产生副作用。
但是,我想构建的是一个重复消息检测机制,它看起来像这样:
public static <T> Flux<OcurrenceCache<T>> getExceedingRates(Flux<T> values, int maxHits, int bufferSize, Duration bufferTimeout) {
return values.bufferTimeout(bufferSize, bufferTimeout)
.map(vals -> {
OcurrenceCache<T> occurrenceCache = new OcurrenceCache<>(maxHits);
for (T value : vals) {
occurrenceCache.incrementNrOccurrences(value);
}
return occurrenceCache;
});
}
我基本上从值的 Flux 返回一个事件缓存,其中遇到的元素超过所需的最大命中数。
天真地,我可以实现这样的事情:
public class MyConsumer implements Consumer {
private final EmitterProcessor<Object> emitterProcessor;
public MyConsumer(Integer maxHits, Integer bufferSize, Long timeoutMillis){
this.emitterProcessor = EmitterProcessor.create();
this.emitterProcessor
.bufferTimeout(bufferSize, Duration.ofMillis(timeoutMillis))
.subscribe(integers -> {
getExceedingRates(Flux.fromIterable(integers), maxHits, bufferSize, Duration.ofMillis(timeoutMillis))
.subscribe(integerOcurrenceCache -> {
System.out.println(integerOcurrenceCache.getExceedingValues());
});
});
}
@Override
public void onMessage(Object message){
emitterProcessor.onNext(message);
}
}
然而,这远非最佳,因为我知道我来自特定线程的消息将 永远 包含来自另一个线程的任何消息(它们被预先分组为我们使用 jms 分组和 kinesis 分片)。所以,在某种程度上,我想使用这样的处理器:
- 使用调用 onMessage 的同一个线程来隔离通量,这样隔离通量的值就不会与另一个线程中的变量混淆。
您可以使用线程本地处理器:
private final ThreadLocal<EmitterProcessor<Object>> emitterProcessorHolder = ThreadLocal.withInitial(() -> {
EmitterProcessor<Object> processor = ...
return processor;
});
...
@Override
public void onMessage(Object message){
emitterProcessorHolder.get().onNext(message);
}
我有以下用例:
在我使用 X 线程使用一些消息的应用程序中,我有一个这样定义的 Consumer 实现:
public interface Consumer {
onMessage(Object message);
}
问题是 Consumer 不是每个线程的不同实例,而是单个实例,因为它是一个 Spring bean,我们还希望它在每次调用 onMessage 时不会产生副作用。
但是,我想构建的是一个重复消息检测机制,它看起来像这样:
public static <T> Flux<OcurrenceCache<T>> getExceedingRates(Flux<T> values, int maxHits, int bufferSize, Duration bufferTimeout) {
return values.bufferTimeout(bufferSize, bufferTimeout)
.map(vals -> {
OcurrenceCache<T> occurrenceCache = new OcurrenceCache<>(maxHits);
for (T value : vals) {
occurrenceCache.incrementNrOccurrences(value);
}
return occurrenceCache;
});
}
我基本上从值的 Flux 返回一个事件缓存,其中遇到的元素超过所需的最大命中数。
天真地,我可以实现这样的事情:
public class MyConsumer implements Consumer {
private final EmitterProcessor<Object> emitterProcessor;
public MyConsumer(Integer maxHits, Integer bufferSize, Long timeoutMillis){
this.emitterProcessor = EmitterProcessor.create();
this.emitterProcessor
.bufferTimeout(bufferSize, Duration.ofMillis(timeoutMillis))
.subscribe(integers -> {
getExceedingRates(Flux.fromIterable(integers), maxHits, bufferSize, Duration.ofMillis(timeoutMillis))
.subscribe(integerOcurrenceCache -> {
System.out.println(integerOcurrenceCache.getExceedingValues());
});
});
}
@Override
public void onMessage(Object message){
emitterProcessor.onNext(message);
}
}
然而,这远非最佳,因为我知道我来自特定线程的消息将 永远 包含来自另一个线程的任何消息(它们被预先分组为我们使用 jms 分组和 kinesis 分片)。所以,在某种程度上,我想使用这样的处理器:
- 使用调用 onMessage 的同一个线程来隔离通量,这样隔离通量的值就不会与另一个线程中的变量混淆。
您可以使用线程本地处理器:
private final ThreadLocal<EmitterProcessor<Object>> emitterProcessorHolder = ThreadLocal.withInitial(() -> {
EmitterProcessor<Object> processor = ...
return processor;
});
...
@Override
public void onMessage(Object message){
emitterProcessorHolder.get().onNext(message);
}