MassTransit Redis 集成

MassTransit Redis integration

我开始使用 Masstransit sagas 并构建了一个示例,如果我使用 InMemorySagaRepository 一切正常,但是当使用 Redis 存储库时,Increase 消息被移动到 RabbitMq 中的 Consumer_Error 队列错误消息:'Value cannot be null. Parameter name: key' 和此堆栈跟踪:

at System.Collections.Generic.Dictionary`2.FindEntry(TKey key) 
at System.Collections.Generic.Dictionary`2.TryGetValue(TKey key, TValue& value) 
at Automatonymous.AutomatonymousStateMachine`1.GetState(String name) 
at Automatonymous.Accessors.RawStateAccessor`1.Automatonymous.StateAccessor<TInstance>.Get(InstanceContext`1 context) 
at Automatonymous.Accessors.InitialIfNullStateAccessor`1.<Automatonymous-StateAccessor<TInstance>-Get>d__3.MoveNext() 
at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.<Send>d__5.MoveNext() 
at GreenPipes.Filters.ConcurrencyLimitFilter`1.<Send>d__5.MoveNext() 
at MassTransit.RedisIntegration.RedisSagaRepository`1.<SendToInstance>d__7`1.MoveNext()

这是我的代码:

public class SagaConsumer : MassTransitStateMachine<Number>
{
    public SagaConsumer()
    {
        State(() => Active);

        InstanceState(x => x.Status);

        Event(() => Created); 
        Event(() => Increased);

        Initially(
                    When(Created)
                    .Then(context =>
                    {
                        context.Instance.Name = context.Data.Name;
                        context.Instance.Value = 0;
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"New name:{context.Data.Name}"))
                    .TransitionTo(Active)
                );

        During(Active,
                    When(Increased)
                    .Then(context =>
                    {
                        context.Instance.Value = context.Data.Increase ? context.Instance.Value + 1 : context.Instance.Value - 1;
                     })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"{context.Instance.Name} increased to value {context.Instance.Value}"))
               );
    }

    public State Active
    {
        get; set;
    }

    public Event<ICreateMessage> Created { get; set; }
    public Event<IIncreaseMessage> Increased { get; set; }
}

以及配置代码:

var bus = Bus.Factory.CreateUsingRabbitMq(configurator =>
{
    var host = configurator.Host(new Uri("rabbitmq://localhost"), h =>
    {
       h.Username("guest");
       h.Password("guest");
    });

    var redisManager = new PooledRedisClientManager("localhost");
    configurator.ReceiveEndpoint("Consumer", endpoint =>
    {
        endpoint.StateMachineSaga(new SagaConsumer(), 
                                    //new InMemorySagaRepository<Number>(),
                                    new RedisSagaRepository<Number>(redisManager),
                                    c => c.UseConcurrencyLimit(10));
    });                
});

和佐贺 class:

public class Number : SagaStateMachineInstance, IHasGuidId, IVersionedSaga, CorrelatedBy<Guid>
{
    public Guid CorrelationId
    {
        get;
        set;
    }

    public string Name { get; set; }
    public State Status { get; set; }
    public int Value { get; set; }
    public int Version { get; set; }

    public Guid Id
        => CorrelationId;
}

这似乎与 Number class 的 Status 属性 有关,它在某些时候为空,但我不确定为什么或为什么只有在使用 Redis 时才会发生这种情况

因此,您应该将状态存储为 intstring - 而不是现在的 State。这将解决您遇到的问题。