公共交通时间表石英问题
Masstransit schedule quartz issue
我关注 ChrisPatterson tutorial,我尝试按照他在视频中所说的去做,但是当我为 Expiring the allocation
安排消息时它不起作用,当我仔细看时我才注意到RabbitMq
中的调度器队列没有任何消费者,我尝试解决但没有找到问题,这是我的状态机:
public class AllocationStateMachine : MassTransitStateMachine<AllocationState>
{
public AllocationStateMachine(ILogger<AllocationStateMachine> logger)
{
this.InstanceState(x => x.CurrentState);
this.ConfigureCorrelationIds();
Schedule(() => HoldExpiration, (allocationState) => allocationState.HoldDurationToken, (config) =>
{
//Set a deafult and fixed message delay
config.Delay = TimeSpan.FromHours(1);
config.Received = x => x.CorrelateById(m => m.Message.AllocationId);
});
Initially(When(AllocationCreated)
.Then(context => logger.LogInformation($"AllocationCreated for AllocationId: {context.Message.AllocationId}"))
.Schedule(HoldExpiration,
context =>
{
var msg = context.Init<AllocationHoldDurationExpired>(new { context.Message.AllocationId });
return msg;
},
context => context.Message.HoldDuration)
.TransitionTo(Allocated));
During(Allocated,
When(HoldExpiration.Received)
.Then(context =>
{
logger.LogInformation("Allocation expired {AllocationId}", context.Saga.CorrelationId);
})
.Finalize()
);
// Sets the state machine instance to Completed when in the final state. The saga
// repository removes completed state machine instances.
//SetCompletedWhenFinalized();
}
private void ConfigureCorrelationIds()
{
Event(() => AllocationCreated, x => x.CorrelateById(m => m.Message.AllocationId));
}
public Schedule<AllocationState, AllocationHoldDurationExpired> HoldExpiration { get; set; }
public State Allocated { get; set; }
public State Released { get; set; }
public Event<IAllocationCreated> AllocationCreated { get; set; }
}
和 Program.cs
:
builder.Services.AddMassTransit(x =>
{
x.AddConsumersFromNamespaceContaining<AllocateInventoryConsumer>();
x.AddSagaStateMachine<AllocationStateMachine, AllocationState>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion
r.AddDbContext<DbContext, AllocationStateDbContext>((provider, dbContextOptionBuilder) =>
{
dbContextOptionBuilder.UseSqlServer(builder.Configuration.GetConnectionString("WarehouseConn"), m =>
{
m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
m.MigrationsHistoryTable($"__{nameof(AllocationStateDbContext)}");
});
});
});
Uri schedulerEndpoint = new Uri("queue:scheduler");
x.AddMessageScheduler(schedulerEndpoint);
x.UsingRabbitMq((context, cfg) =>
{
//for configuring Quartz endpoint
cfg.UseMessageScheduler(schedulerEndpoint);
//cfg.ReceiveEndpoint("scheduler", endpoint =>
//{
// cfg.UseMessageScheduler(schedulerEndpoint);
//});
cfg.ConfigureEndpoints(context);
cfg.Host(builder.Configuration["RabbitMQ:Host"], "/",
h =>
{
h.Username(BusConstants.Username);
h.Password(BusConstants.Password);
});
});
});
还有我的包裹:
<ItemGroup>
<PackageReference Include="MassTransit" Version="8.0.1" />
<PackageReference Include="MassTransit.Analyzers" Version="8.0.1" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.3.1" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.0.1" />
<PackageReference Include="MassTransit.EntityFrameworkCore" Version="8.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="6.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="6.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.14.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" />
</ItemGroup>
正如我所说,scheduler
已创建,但它没有任何消费者:
有什么问题?
您在任何端点上都不是 运行 Quartz。您需要配置实际的 Quartz 消费者以及 Quartz 才能工作。有一个 sample available that shows how to configure it. Or you can configure it using the latest syntax 仅在此时的单元测试中显示。
我关注 ChrisPatterson tutorial,我尝试按照他在视频中所说的去做,但是当我为 Expiring the allocation
安排消息时它不起作用,当我仔细看时我才注意到RabbitMq
中的调度器队列没有任何消费者,我尝试解决但没有找到问题,这是我的状态机:
public class AllocationStateMachine : MassTransitStateMachine<AllocationState>
{
public AllocationStateMachine(ILogger<AllocationStateMachine> logger)
{
this.InstanceState(x => x.CurrentState);
this.ConfigureCorrelationIds();
Schedule(() => HoldExpiration, (allocationState) => allocationState.HoldDurationToken, (config) =>
{
//Set a deafult and fixed message delay
config.Delay = TimeSpan.FromHours(1);
config.Received = x => x.CorrelateById(m => m.Message.AllocationId);
});
Initially(When(AllocationCreated)
.Then(context => logger.LogInformation($"AllocationCreated for AllocationId: {context.Message.AllocationId}"))
.Schedule(HoldExpiration,
context =>
{
var msg = context.Init<AllocationHoldDurationExpired>(new { context.Message.AllocationId });
return msg;
},
context => context.Message.HoldDuration)
.TransitionTo(Allocated));
During(Allocated,
When(HoldExpiration.Received)
.Then(context =>
{
logger.LogInformation("Allocation expired {AllocationId}", context.Saga.CorrelationId);
})
.Finalize()
);
// Sets the state machine instance to Completed when in the final state. The saga
// repository removes completed state machine instances.
//SetCompletedWhenFinalized();
}
private void ConfigureCorrelationIds()
{
Event(() => AllocationCreated, x => x.CorrelateById(m => m.Message.AllocationId));
}
public Schedule<AllocationState, AllocationHoldDurationExpired> HoldExpiration { get; set; }
public State Allocated { get; set; }
public State Released { get; set; }
public Event<IAllocationCreated> AllocationCreated { get; set; }
}
和 Program.cs
:
builder.Services.AddMassTransit(x =>
{
x.AddConsumersFromNamespaceContaining<AllocateInventoryConsumer>();
x.AddSagaStateMachine<AllocationStateMachine, AllocationState>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion
r.AddDbContext<DbContext, AllocationStateDbContext>((provider, dbContextOptionBuilder) =>
{
dbContextOptionBuilder.UseSqlServer(builder.Configuration.GetConnectionString("WarehouseConn"), m =>
{
m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
m.MigrationsHistoryTable($"__{nameof(AllocationStateDbContext)}");
});
});
});
Uri schedulerEndpoint = new Uri("queue:scheduler");
x.AddMessageScheduler(schedulerEndpoint);
x.UsingRabbitMq((context, cfg) =>
{
//for configuring Quartz endpoint
cfg.UseMessageScheduler(schedulerEndpoint);
//cfg.ReceiveEndpoint("scheduler", endpoint =>
//{
// cfg.UseMessageScheduler(schedulerEndpoint);
//});
cfg.ConfigureEndpoints(context);
cfg.Host(builder.Configuration["RabbitMQ:Host"], "/",
h =>
{
h.Username(BusConstants.Username);
h.Password(BusConstants.Password);
});
});
});
还有我的包裹:
<ItemGroup>
<PackageReference Include="MassTransit" Version="8.0.1" />
<PackageReference Include="MassTransit.Analyzers" Version="8.0.1" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.3.1" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.0.1" />
<PackageReference Include="MassTransit.EntityFrameworkCore" Version="8.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="6.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="6.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.14.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" />
</ItemGroup>
正如我所说,scheduler
已创建,但它没有任何消费者:
有什么问题?
您在任何端点上都不是 运行 Quartz。您需要配置实际的 Quartz 消费者以及 Quartz 才能工作。有一个 sample available that shows how to configure it. Or you can configure it using the latest syntax 仅在此时的单元测试中显示。