运行 按组在单独的线程中处理
Run processing in separated thread by group
我正在尝试在我的 Kafka 消费者中使用 Rx。
public static event EventHandler<ConsumeResult<string, string>> GenericEvent;
那么我有下面的代码
var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
x => GenericEvent += x,
x => GenericEvent -= x)
.Select(x => x.EventArgs);
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
GenericEvent(consumeResult.Topic, consumeResult);
}
然后我会在某个地方使用它
var subscription = observable.Subscribe(message =>
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");
//_kafkaConsumer.Commit(messages);
});
是否有可能 运行 按主题名称 (consumeResult.Topic
) 分隔线程? consumer收到消息后,按topic
重定向到对应线程
试一试:
Observable
.Interval(TimeSpan.FromSeconds(0.1))
.Take(20)
.GroupBy(x => x % 3)
.SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
.Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));
这确保在 new EventLoopScheduler()
调度程序中为 GroupBy
运算符创建的每个内部可观察对象创建一个线程。 SelectMany
使组变平,但 EventLoopScheduler
与每个组保持关联。
在你的情况下你 GroupBy
consumeResult.Topic
属性.
请确保您的源可观察对象随着线程永远存在而结束,直到它们结束。在订阅上调用 Dispose()
足以结束 observable。
我正在尝试在我的 Kafka 消费者中使用 Rx。
public static event EventHandler<ConsumeResult<string, string>> GenericEvent;
那么我有下面的代码
var observable = Observable.FromEventPattern<ConsumeResult<string, string>>(
x => GenericEvent += x,
x => GenericEvent -= x)
.Select(x => x.EventArgs);
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
GenericEvent(consumeResult.Topic, consumeResult);
}
然后我会在某个地方使用它
var subscription = observable.Subscribe(message =>
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ** {message.Topic}/{message.Partition} @{message.Offset}: '{message.Value}'");
//_kafkaConsumer.Commit(messages);
});
是否有可能 运行 按主题名称 (consumeResult.Topic
) 分隔线程? consumer收到消息后,按topic
试一试:
Observable
.Interval(TimeSpan.FromSeconds(0.1))
.Take(20)
.GroupBy(x => x % 3)
.SelectMany(xs => Observable.Using(() => new EventLoopScheduler(), els => xs.ObserveOn(els)))
.Subscribe(x => Console.WriteLine($"{x} {Thread.CurrentThread.ManagedThreadId}"));
这确保在 new EventLoopScheduler()
调度程序中为 GroupBy
运算符创建的每个内部可观察对象创建一个线程。 SelectMany
使组变平,但 EventLoopScheduler
与每个组保持关联。
在你的情况下你 GroupBy
consumeResult.Topic
属性.
请确保您的源可观察对象随着线程永远存在而结束,直到它们结束。在订阅上调用 Dispose()
足以结束 observable。