反应堆选择 sink/processor

Reactor choosing a sink/processor

我有以下用例:

在我使用 X 线程使用一些消息的应用程序中,我有一个这样定义的 Consumer 实现:

public interface Consumer {
     onMessage(Object message);
}

问题是 Consumer 不是每个线程的不同实例,而是单个实例,因为它是一个 Spring bean,我们还希望它在每次调用 onMessage 时不会产生副作用。

但是,我想构建的是一个重复消息检测机制,它看起来像这样:

public static <T> Flux<OcurrenceCache<T>> getExceedingRates(Flux<T> values, int maxHits, int bufferSize, Duration bufferTimeout) {
        return values.bufferTimeout(bufferSize, bufferTimeout)
            .map(vals -> {
                OcurrenceCache<T> occurrenceCache = new OcurrenceCache<>(maxHits);
                for (T value : vals) {
                    occurrenceCache.incrementNrOccurrences(value);
                }
                return occurrenceCache;
            });
}

我基本上从值的 Flux 返回一个事件缓存,其中遇到的元素超过所需的最大命中数。

天真地,我可以实现这样的事情:

public class MyConsumer implements Consumer {
    private final EmitterProcessor<Object> emitterProcessor;

    public MyConsumer(Integer maxHits, Integer bufferSize, Long timeoutMillis){
       this.emitterProcessor = EmitterProcessor.create();
       this.emitterProcessor
                .bufferTimeout(bufferSize, Duration.ofMillis(timeoutMillis))
                .subscribe(integers -> {
                    getExceedingRates(Flux.fromIterable(integers), maxHits, bufferSize, Duration.ofMillis(timeoutMillis))
                            .subscribe(integerOcurrenceCache -> {
                                System.out.println(integerOcurrenceCache.getExceedingValues());
                        });
                });
    }

    @Override
    public void onMessage(Object message){
        emitterProcessor.onNext(message); 
    }
}

然而,这远非最佳,因为我知道我来自特定线程的消息将 永远 包含来自另一个线程的任何消息(它们被预先分组为我们使用 jms 分组和 kinesis 分片)。所以,在某种程度上,我想使用这样的处理器:

您可以使用线程本地处理器:

private final ThreadLocal<EmitterProcessor<Object>> emitterProcessorHolder = ThreadLocal.withInitial(() -> {
  EmitterProcessor<Object> processor = ...
  return processor;
});
...
@Override
public void onMessage(Object message){
  emitterProcessorHolder.get().onNext(message); 
}