MassTransit Redis 集成
MassTransit Redis integration
我刚开始使用 MassTransit 的 saga,并构建了一个示例。使用 InMemorySagaRepository 时一切正常;但改用 Redis 存储库后,Increase 消息会被移动到 RabbitMQ 的 Consumer_Error 队列,错误消息为:'Value cannot be null. Parameter name: key',堆栈跟踪如下:
at System.Collections.Generic.Dictionary`2.FindEntry(TKey key)
at System.Collections.Generic.Dictionary`2.TryGetValue(TKey key, TValue& value)
at Automatonymous.AutomatonymousStateMachine`1.GetState(String name)
at Automatonymous.Accessors.RawStateAccessor`1.Automatonymous.StateAccessor<TInstance>.Get(InstanceContext`1 context)
at Automatonymous.Accessors.InitialIfNullStateAccessor`1.<Automatonymous-StateAccessor<TInstance>-Get>d__3.MoveNext()
at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.<Send>d__5.MoveNext()
at GreenPipes.Filters.ConcurrencyLimitFilter`1.<Send>d__5.MoveNext()
at MassTransit.RedisIntegration.RedisSagaRepository`1.<SendToInstance>d__7`1.MoveNext()
这是我的代码:
/// <summary>
/// State machine saga over <see cref="Number"/> instances. A create message initializes the
/// instance and transitions it to <see cref="Active"/>; while active, each increase message
/// moves the instance value up or down by one.
/// </summary>
public class SagaConsumer : MassTransitStateMachine<Number>
{
    public SagaConsumer()
    {
        State(() => Active);

        // The instance's current state is read from and written to its Status property.
        InstanceState(x => x.Status);

        Event(() => Created);
        Event(() => Increased);

        Initially(
            When(Created)
                .Then(context =>
                {
                    // Start every new instance from zero under the name carried by the message.
                    context.Instance.Name = context.Data.Name;
                    context.Instance.Value = 0;
                })
                .ThenAsync(context => Console.Out.WriteLineAsync($"New name:{context.Data.Name}"))
                .TransitionTo(Active));

        During(Active,
            When(Increased)
                .Then(context =>
                {
                    // The message flag selects the direction: up by one when set, down by one otherwise.
                    if (context.Data.Increase)
                    {
                        context.Instance.Value += 1;
                    }
                    else
                    {
                        context.Instance.Value -= 1;
                    }
                })
                .ThenAsync(context => Console.Out.WriteLineAsync($"{context.Instance.Name} increased to value {context.Instance.Value}")));
    }

    /// <summary>State entered once the instance has been created.</summary>
    public State Active { get; set; }

    /// <summary>Raised by an <c>ICreateMessage</c>; handled in the initial state.</summary>
    public Event<ICreateMessage> Created { get; set; }

    /// <summary>Raised by an <c>IIncreaseMessage</c>; handled while <see cref="Active"/>.</summary>
    public Event<IIncreaseMessage> Increased { get; set; }
}
以及配置代码:
// Creates a RabbitMQ bus whose "Consumer" receive endpoint hosts the SagaConsumer state
// machine, with saga instances stored through a Redis-backed repository.
var bus = Bus.Factory.CreateUsingRabbitMq(configurator =>
{
    var rabbitHost = configurator.Host(new Uri("rabbitmq://localhost"), credentials =>
    {
        credentials.Username("guest");
        credentials.Password("guest");
    });

    var redisManager = new PooledRedisClientManager("localhost");

    configurator.ReceiveEndpoint("Consumer", endpoint =>
    {
        var stateMachine = new SagaConsumer();

        // Alternative repository kept from the original snippet:
        //   new InMemorySagaRepository<Number>()
        var repository = new RedisSagaRepository<Number>(redisManager);

        endpoint.StateMachineSaga(stateMachine, repository, c => c.UseConcurrencyLimit(10));
    });
});
以及 saga 实例类:
/// <summary>
/// Saga instance type used by the state machine: a named counter correlated by a
/// <see cref="Guid"/>, carrying a version number and its current state.
/// </summary>
public class Number : SagaStateMachineInstance, IHasGuidId, IVersionedSaga, CorrelatedBy<Guid>
{
    /// <summary>Correlation identifier of this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Name assigned when the instance is created.</summary>
    public string Name { get; set; }

    // NOTE(review): the state is held as a State object. The reported failure
    // ('Value cannot be null. Parameter name: key' inside GetState) only occurs with the
    // Redis repository; storing the state as a string or an int is the suggested fix.
    public State Status { get; set; }

    /// <summary>Current counter value.</summary>
    public int Value { get; set; }

    /// <summary>Version number of the instance (presumably required by IVersionedSaga — confirm).</summary>
    public int Version { get; set; }

    /// <summary>Read-only alias of <see cref="CorrelationId"/>.</summary>
    public Guid Id
    {
        get { return CorrelationId; }
    }
}
这似乎与 Number 类的 Status 属性有关:它在某个时刻为 null,但我不确定原因,也不明白为什么只有在使用 Redis 时才会出现这种情况。
从堆栈跟踪来看,从 Redis 读回 saga 实例后,State 的名称为 null,也就是说 State 类型没有被正确地序列化和反序列化。因此,您应该将状态存储为 int 或 string,而不是现在的 State。这将解决您遇到的问题。
我刚开始使用 MassTransit 的 saga,并构建了一个示例。使用 InMemorySagaRepository 时一切正常;但改用 Redis 存储库后,Increase 消息会被移动到 RabbitMQ 的 Consumer_Error 队列,错误消息为:'Value cannot be null. Parameter name: key',堆栈跟踪如下:
at System.Collections.Generic.Dictionary`2.FindEntry(TKey key)
at System.Collections.Generic.Dictionary`2.TryGetValue(TKey key, TValue& value)
at Automatonymous.AutomatonymousStateMachine`1.GetState(String name)
at Automatonymous.Accessors.RawStateAccessor`1.Automatonymous.StateAccessor<TInstance>.Get(InstanceContext`1 context)
at Automatonymous.Accessors.InitialIfNullStateAccessor`1.<Automatonymous-StateAccessor<TInstance>-Get>d__3.MoveNext()
at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.<Send>d__5.MoveNext()
at GreenPipes.Filters.ConcurrencyLimitFilter`1.<Send>d__5.MoveNext()
at MassTransit.RedisIntegration.RedisSagaRepository`1.<SendToInstance>d__7`1.MoveNext()
这是我的代码:
/// <summary>
/// State machine saga over <see cref="Number"/> instances. A create message initializes the
/// instance and transitions it to <see cref="Active"/>; while active, each increase message
/// moves the instance value up or down by one.
/// </summary>
public class SagaConsumer : MassTransitStateMachine<Number>
{
    public SagaConsumer()
    {
        State(() => Active);

        // The instance's current state is read from and written to its Status property.
        InstanceState(x => x.Status);

        Event(() => Created);
        Event(() => Increased);

        Initially(
            When(Created)
                .Then(context =>
                {
                    // Start every new instance from zero under the name carried by the message.
                    context.Instance.Name = context.Data.Name;
                    context.Instance.Value = 0;
                })
                .ThenAsync(context => Console.Out.WriteLineAsync($"New name:{context.Data.Name}"))
                .TransitionTo(Active));

        During(Active,
            When(Increased)
                .Then(context =>
                {
                    // The message flag selects the direction: up by one when set, down by one otherwise.
                    if (context.Data.Increase)
                    {
                        context.Instance.Value += 1;
                    }
                    else
                    {
                        context.Instance.Value -= 1;
                    }
                })
                .ThenAsync(context => Console.Out.WriteLineAsync($"{context.Instance.Name} increased to value {context.Instance.Value}")));
    }

    /// <summary>State entered once the instance has been created.</summary>
    public State Active { get; set; }

    /// <summary>Raised by an <c>ICreateMessage</c>; handled in the initial state.</summary>
    public Event<ICreateMessage> Created { get; set; }

    /// <summary>Raised by an <c>IIncreaseMessage</c>; handled while <see cref="Active"/>.</summary>
    public Event<IIncreaseMessage> Increased { get; set; }
}
以及配置代码:
// Creates a RabbitMQ bus whose "Consumer" receive endpoint hosts the SagaConsumer state
// machine, with saga instances stored through a Redis-backed repository.
var bus = Bus.Factory.CreateUsingRabbitMq(configurator =>
{
    var rabbitHost = configurator.Host(new Uri("rabbitmq://localhost"), credentials =>
    {
        credentials.Username("guest");
        credentials.Password("guest");
    });

    var redisManager = new PooledRedisClientManager("localhost");

    configurator.ReceiveEndpoint("Consumer", endpoint =>
    {
        var stateMachine = new SagaConsumer();

        // Alternative repository kept from the original snippet:
        //   new InMemorySagaRepository<Number>()
        var repository = new RedisSagaRepository<Number>(redisManager);

        endpoint.StateMachineSaga(stateMachine, repository, c => c.UseConcurrencyLimit(10));
    });
});
以及 saga 实例类:
/// <summary>
/// Saga instance type used by the state machine: a named counter correlated by a
/// <see cref="Guid"/>, carrying a version number and its current state.
/// </summary>
public class Number : SagaStateMachineInstance, IHasGuidId, IVersionedSaga, CorrelatedBy<Guid>
{
    /// <summary>Correlation identifier of this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Name assigned when the instance is created.</summary>
    public string Name { get; set; }

    // NOTE(review): the state is held as a State object. The reported failure
    // ('Value cannot be null. Parameter name: key' inside GetState) only occurs with the
    // Redis repository; storing the state as a string or an int is the suggested fix.
    public State Status { get; set; }

    /// <summary>Current counter value.</summary>
    public int Value { get; set; }

    /// <summary>Version number of the instance (presumably required by IVersionedSaga — confirm).</summary>
    public int Version { get; set; }

    /// <summary>Read-only alias of <see cref="CorrelationId"/>.</summary>
    public Guid Id
    {
        get { return CorrelationId; }
    }
}
这似乎与 Number 类的 Status 属性有关:它在某个时刻为 null,但我不确定原因,也不明白为什么只有在使用 Redis 时才会出现这种情况。
从堆栈跟踪来看,从 Redis 读回 saga 实例后,State 的名称为 null,也就是说 State 类型没有被正确地序列化和反序列化。因此,您应该将状态存储为 int 或 string,而不是现在的 State。这将解决您遇到的问题。