运行 按组在单独的线程中处理

Run processing in separated thread by group

我正在尝试在我的 Kafka 消费者中使用 Rx。

public static event EventHandler<ConsumeResult<string, string>> GenericEvent;

那么我有下面的代码

var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
     x => GenericEvent += x,
     x => GenericEvent -= x)
  .Select(x => x.EventArgs);

while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
    GenericEvent(consumeResult.Topic, consumeResult);
}

然后我会在某个地方使用它

var subscription = observable.Subscribe(message =>
{
    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");

    //_kafkaConsumer.Commit(messages);
});

是否有可能 运行 按主题名称 (consumeResult.Topic) 分隔线程? consumer收到消息后,按topic

重定向到对应线程

试一试:

Observable
    .Interval(TimeSpan.FromSeconds(0.1))
    .Take(20)
    .GroupBy(x => x % 3)
    .SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
    .Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));

这确保在 new EventLoopScheduler() 调度程序中为 GroupBy 运算符创建的每个内部可观察对象创建一个线程。 SelectMany 使组变平,但 EventLoopScheduler 与每个组保持关联。

在你的情况下你 GroupBy consumeResult.Topic 属性.

请确保您的源可观察对象随着线程永远存在而结束,直到它们结束。在订阅上调用 Dispose() 足以结束 observable。