轨道交通。卡夫卡。如何在消息处理期间从 saga 状态机生成消息
MassTransit. Kafka. How to produce message from saga state machine, during message processing
PublishAsync 无效
示例program.cs:
namespace MassTransitKafka
{
class Program
{
private static ServiceProvider _serviceProvider;
static async Task Main(string[] args)
{
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
x.AddRider(rider =>
{
rider.AddProducer<Enter1>(nameof(Enter1));
rider.AddProducer<Enter2>(nameof(Enter2));
rider.AddProducer<Enter3>(nameof(Enter3));
rider.AddProducer<EnterEnter>(nameof(EnterEnter));
rider.AddSagaStateMachine<TestSaga1StateMachine, TestSaga1State>(typeof(TestSaga1StateDefinition))
.InMemoryRepository();
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
k.TopicEndpoint<Null, Enter1>(nameof(Enter1), nameof(TestSaga1StateMachine), c =>
{
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureSaga<TestSaga1State>(context);
});
k.TopicEndpoint<Null, Enter2>(nameof(Enter2), nameof(TestSaga1StateMachine), c =>
{
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureSaga<TestSaga1State>(context);
});
k.TopicEndpoint<Null, Enter3>(nameof(Enter3), nameof(TestSaga1StateMachine), c =>
{
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureSaga<TestSaga1State>(context);
});
k.TopicEndpoint<Null, EnterEnter>(nameof(EnterEnter), nameof(TestSaga1StateMachine), c =>
{
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureSaga<TestSaga1State>(context);
});
});
});
});
_serviceProvider = services.BuildServiceProvider();
var busControl = _serviceProvider.GetRequiredService<IBusControl>();
var observer = new ReceiveObserver();
busControl.ConnectReceiveObserver(observer);
await busControl.StartAsync();
var tokenSource = new CancellationTokenSource();
ThreadPool.QueueUserWorkItem(s =>
{
Work(busControl, tokenSource.Token).GetAwaiter().GetResult();
});
while (true)
{
var quit = Console.ReadLine();
if (quit == "quit")
{
tokenSource.Cancel();
break;
}
}
}
private static async Task Work(IPublishEndpoint publisher, CancellationToken token)
{
var correlationId = Guid.NewGuid();
var enter1Producer = _serviceProvider.GetRequiredService<ITopicProducer<Enter1>>();
await enter1Producer.Produce(new {CorrelationId = correlationId, EnteredText = "1"}, token);
while (token.IsCancellationRequested == false)
{
var cancelled = token.WaitHandle.WaitOne(5000);
if (cancelled)
break;
}
}
private static Dictionary<string, string> Configuration
{
get
{
return new Dictionary<string, string>
{
{ "bootstrap.servers", "localhost:9092" },
{ "group.id", "saga.group.id" }
};
}
}
}
}
示例TestSaga1StateMachine.cs
public class TestSaga1StateMachine : MassTransitStateMachine<TestSaga1State>
{
public TestSaga1StateMachine()
{
InstanceState(_ => _.CurrentState);
Event(() => Enter1Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => Enter2Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => Enter3Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => EnterEnterEvent, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Initially(
When(Enter1Event)
.Then(context => context.Instance.SaveEnter1(context.Data))
// Messages are not sent here
.PublishAsync(context => context.Init<Enter2>(new {EnteredText = "2"}))
.TransitionTo(Entered1)
);
During(Entered1,
When(Enter2Event)
.Then(context => context.Instance.SaveEnter2(context.Data))
// Messages are not sent here
.PublishAsync(context => context.Init<Enter3>(new {EnteredText = "3"}))
.TransitionTo(Entered2)
);
During(Entered2,
When(Enter3Event)
.Then(context => context.Instance.SaveEnter3(context.Data))
// Messages are not sent here
.PublishAsync(context => context.Init<EnterEnter>(new {EnteredText = "Enter"}))
.TransitionTo(Entered3)
);
During(Entered3,
When(EnterEnterEvent)
.Then(context => context.Instance.Print())
.TransitionTo(EnteredEnter)
.Finalize());
SetCompletedWhenFinalized();
}
public State Entered1 { get; set; }
public State Entered2 { get; set; }
public State Entered3 { get; set; }
public State EnteredEnter { get; set; }
public Event<Enter1> Enter1Event { get; set; }
public Event<Enter2> Enter2Event { get; set; }
public Event<Enter3> Enter3Event { get; set; }
public Event<EnterEnter> EnterEnterEvent { get; set; }
}
这个项目只是为了我的学习。
我不明白如何在那里产生信息
总线配置与文档中的相同。第一条Enter1消息发布成功,saga收到,但是如何从saga向kafka发送消息还不清楚
您需要创建一个自定义状态机 activity,依赖于 producer 接口(在配置 Kafka 时设置),以便生成消息到 Kafka 主题。作为第 2 季的一部分,我最近对此进行了 video。
您可以在 unit tests
中查看生产者设置示例
services.AddMassTransit(x =>
{
x.AddRider(rider =>
{
rider.AddProducer<KafkaMessage>(Topic);
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
});
});
});
然后,在您的自定义状态机 activity 中,您将在 ITopicProducer<KafkaMessage>
上添加构造函数依赖项并使用它来生成消息。它可能看起来类似于 this one:
public class ProduceEnter2Activity :
Activity<TestSaga1State>
{
readonly ITopicProducer<Enter2> _producer;
public ProduceEnter2Activity(ITopicProducer<Enter2> producer)
{
_producer = producer;
}
public void Probe(ProbeContext context)
{
context.CreateScope("notifyMember");
}
public void Accept(StateMachineVisitor visitor)
{
visitor.Visit(this);
}
public async Task Execute(BehaviorContext<TestSaga1State> context, Behavior<TestSaga1State> next)
{
await Execute(context);
await next.Execute(context);
}
public async Task Execute<T>(BehaviorContext<TestSaga1State, T> context, Behavior<TestSaga1State, T> next)
{
await Execute(context);
await next.Execute(context);
}
public Task Faulted<TException>(BehaviorExceptionContext<TestSaga1State, TException> context, Behavior<TestSaga1State> next)
where TException : Exception
{
return next.Faulted(context);
}
public Task Faulted<T, TException>(BehaviorExceptionContext<TestSaga1State, T, TException> context, Behavior<TestSaga1State, T> next)
where TException : Exception
{
return next.Faulted(context);
}
async Task Execute(BehaviorContext<TestSaga1State> context)
{
await _producer.Produce(new Enter2(...));
}
}
然后,在您的状态机中,您将使用:
.Activity(x => x.OfInstanceType<ProduceEnter2Activity>())
PublishAsync 无效
示例program.cs:
namespace MassTransitKafka
{
class Program
{
private static ServiceProvider _serviceProvider;
static async Task Main(string[] args)
{
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
x.AddRider(rider =>
{
rider.AddProducer<Enter1>(nameof(Enter1));
rider.AddProducer<Enter2>(nameof(Enter2));
rider.AddProducer<Enter3>(nameof(Enter3));
rider.AddProducer<EnterEnter>(nameof(EnterEnter));
rider.AddSagaStateMachine<TestSaga1StateMachine, TestSaga1State>(typeof(TestSaga1StateDefinition))
.InMemoryRepository();
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
k.TopicEndpoint<Null, Enter1>(nameof(Enter1), nameof(TestSaga1StateMachine), c =>
{
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureSaga<TestSaga1State>(context);
});
k.TopicEndpoint<Null, Enter2>(nameof(Enter2), nameof(TestSaga1StateMachine), c =>
{
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureSaga<TestSaga1State>(context);
});
k.TopicEndpoint<Null, Enter3>(nameof(Enter3), nameof(TestSaga1StateMachine), c =>
{
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureSaga<TestSaga1State>(context);
});
k.TopicEndpoint<Null, EnterEnter>(nameof(EnterEnter), nameof(TestSaga1StateMachine), c =>
{
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureSaga<TestSaga1State>(context);
});
});
});
});
_serviceProvider = services.BuildServiceProvider();
var busControl = _serviceProvider.GetRequiredService<IBusControl>();
var observer = new ReceiveObserver();
busControl.ConnectReceiveObserver(observer);
await busControl.StartAsync();
var tokenSource = new CancellationTokenSource();
ThreadPool.QueueUserWorkItem(s =>
{
Work(busControl, tokenSource.Token).GetAwaiter().GetResult();
});
while (true)
{
var quit = Console.ReadLine();
if (quit == "quit")
{
tokenSource.Cancel();
break;
}
}
}
private static async Task Work(IPublishEndpoint publisher, CancellationToken token)
{
var correlationId = Guid.NewGuid();
var enter1Producer = _serviceProvider.GetRequiredService<ITopicProducer<Enter1>>();
await enter1Producer.Produce(new {CorrelationId = correlationId, EnteredText = "1"}, token);
while (token.IsCancellationRequested == false)
{
var cancelled = token.WaitHandle.WaitOne(5000);
if (cancelled)
break;
}
}
private static Dictionary<string, string> Configuration
{
get
{
return new Dictionary<string, string>
{
{ "bootstrap.servers", "localhost:9092" },
{ "group.id", "saga.group.id" }
};
}
}
}
}
示例TestSaga1StateMachine.cs
public class TestSaga1StateMachine : MassTransitStateMachine<TestSaga1State>
{
public TestSaga1StateMachine()
{
InstanceState(_ => _.CurrentState);
Event(() => Enter1Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => Enter2Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => Enter3Event, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => EnterEnterEvent, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Initially(
When(Enter1Event)
.Then(context => context.Instance.SaveEnter1(context.Data))
// Messages are not sent here
.PublishAsync(context => context.Init<Enter2>(new {EnteredText = "2"}))
.TransitionTo(Entered1)
);
During(Entered1,
When(Enter2Event)
.Then(context => context.Instance.SaveEnter2(context.Data))
// Messages are not sent here
.PublishAsync(context => context.Init<Enter3>(new {EnteredText = "3"}))
.TransitionTo(Entered2)
);
During(Entered2,
When(Enter3Event)
.Then(context => context.Instance.SaveEnter3(context.Data))
// Messages are not sent here
.PublishAsync(context => context.Init<EnterEnter>(new {EnteredText = "Enter"}))
.TransitionTo(Entered3)
);
During(Entered3,
When(EnterEnterEvent)
.Then(context => context.Instance.Print())
.TransitionTo(EnteredEnter)
.Finalize());
SetCompletedWhenFinalized();
}
public State Entered1 { get; set; }
public State Entered2 { get; set; }
public State Entered3 { get; set; }
public State EnteredEnter { get; set; }
public Event<Enter1> Enter1Event { get; set; }
public Event<Enter2> Enter2Event { get; set; }
public Event<Enter3> Enter3Event { get; set; }
public Event<EnterEnter> EnterEnterEvent { get; set; }
}
这个项目只是为了我的学习。 我不明白如何在那里产生信息 总线配置与文档中的相同。第一条Enter1消息发布成功,saga收到,但是如何从saga向kafka发送消息还不清楚
您需要创建一个自定义状态机 activity,依赖于 producer 接口(在配置 Kafka 时设置),以便生成消息到 Kafka 主题。作为第 2 季的一部分,我最近对此进行了 video。
您可以在 unit tests
中查看生产者设置示例services.AddMassTransit(x =>
{
x.AddRider(rider =>
{
rider.AddProducer<KafkaMessage>(Topic);
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
});
});
});
然后,在您的自定义状态机 activity 中,您将在 ITopicProducer<KafkaMessage>
上添加构造函数依赖项并使用它来生成消息。它可能看起来类似于 this one:
public class ProduceEnter2Activity :
Activity<TestSaga1State>
{
readonly ITopicProducer<Enter2> _producer;
public ProduceEnter2Activity(ITopicProducer<Enter2> producer)
{
_producer = producer;
}
public void Probe(ProbeContext context)
{
context.CreateScope("notifyMember");
}
public void Accept(StateMachineVisitor visitor)
{
visitor.Visit(this);
}
public async Task Execute(BehaviorContext<TestSaga1State> context, Behavior<TestSaga1State> next)
{
await Execute(context);
await next.Execute(context);
}
public async Task Execute<T>(BehaviorContext<TestSaga1State, T> context, Behavior<TestSaga1State, T> next)
{
await Execute(context);
await next.Execute(context);
}
public Task Faulted<TException>(BehaviorExceptionContext<TestSaga1State, TException> context, Behavior<TestSaga1State> next)
where TException : Exception
{
return next.Faulted(context);
}
public Task Faulted<T, TException>(BehaviorExceptionContext<TestSaga1State, T, TException> context, Behavior<TestSaga1State, T> next)
where TException : Exception
{
return next.Faulted(context);
}
async Task Execute(BehaviorContext<TestSaga1State> context)
{
await _producer.Produce(new Enter2(...));
}
}
然后,在您的状态机中,您将使用:
.Activity(x => x.OfInstanceType<ProduceEnter2Activity>())