NServiceBus 事件在单独的线程中发布时丢失
NServiceBus events lost when published in separate thread
我一直致力于让长 运行 消息与 Azure 传输上的 NServiceBus 一起工作。基于 this document,我认为我可以在单独的线程中触发长进程,将事件处理程序任务标记为完成,然后侦听自定义 OperationStarted 或 OperationComplete 事件。我注意到在大多数情况下我的处理程序没有收到 OperationComplete 事件。事实上,唯一一次收到它是在我发布 OperationStarted 事件后立即发布它的时候。两者之间的任何实际处理都会以某种方式阻止接收到完成事件。这是我的代码:
摘要 class 用于长 运行 消息
public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
{
protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();
public Task Handle(TMessage message, IMessageHandlerContext context)
{
var opStarted = new OperationStarted
{
OperationID = Guid.NewGuid(),
OperationType = typeof(TMessage).FullName
};
var errors = new List<string>();
// Fire off the long running task in a separate thread
Task.Run(() =>
{
try
{
_logger.Info($"Operation Started: {JsonConvert.SerializeObject(opStarted)}");
context.Publish(opStarted);
ProcessMessage(message, context);
}
catch (Exception ex)
{
errors.Add(ex.Message);
}
finally
{
var opComplete = new OperationComplete
{
OperationType = typeof(TMessage).FullName,
OperationID = opStarted.OperationID,
Errors = errors
};
context.Publish(opComplete);
_logger.Info($"Operation Complete: {JsonConvert.SerializeObject(opComplete)}");
}
});
return Task.CompletedTask;
}
protected abstract void ProcessMessage(TMessage message, IMessageHandlerContext context);
}
测试实施
public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
{
protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
{
// If I remove this, or lessen it to something like 200 milliseconds, the
// OperationComplete event gets handled
Thread.Sleep(1000);
}
}
操作事件
public sealed class OperationComplete : IEvent
{
public Guid OperationID { get; set; }
public string OperationType { get; set; }
public bool Success => !Errors?.Any() ?? true;
public List<string> Errors { get; set; } = new List<string>();
public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
}
public sealed class OperationStarted : IEvent
{
public Guid OperationID { get; set; }
public string OperationType { get; set; }
public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
}
处理程序
public class OperationHandler : IHandleMessages<OperationStarted>
, IHandleMessages<OperationComplete>
{
static ILog logger = LogManager.GetLogger<OperationHandler>();
public Task Handle(OperationStarted message, IMessageHandlerContext context)
{
return PrintJsonMessage(message);
}
public Task Handle(OperationComplete message, IMessageHandlerContext context)
{
// This is not hit if ProcessMessage takes too long
return PrintJsonMessage(message);
}
private Task PrintJsonMessage<T>(T message) where T : class
{
var msgObj = new
{
Message = typeof(T).Name,
Data = message
};
logger.Info(JsonConvert.SerializeObject(msgObj, Formatting.Indented));
return Task.CompletedTask;
}
}
我确定 context.Publish()
调用正在被触发,因为 _logger.Info()
调用正在将消息打印到我的测试控制台。我还验证了它们遇到了断点。在我的测试中,运行时间超过 500 毫秒的任何内容都会阻止处理 OperationComplete 事件。
如果有人可以就在 ProcessMessage 实现中经过任何大量时间后为什么 OperationComplete 事件没有触发处理程序提出建议,我将非常感激听到他们的意见。谢谢!
--更新--
万一其他人遇到这个并对我最终做了什么感到好奇:
在an exchange with the developers of NServiceBus, I decided on using a watchdog saga that implemented the IHandleTimeouts interface to periodically check for job completion. I was using saga data, updated when the job was finished, to determine whether to fire off the OperationComplete
event in the timeout handler. This presented an other issue: when using In-Memory Persistence, the saga data was not persisted跨线程之后,即使它被每个线程锁定。为了解决这个问题,我创建了一个专门用于 long 运行 内存数据持久化的接口。此接口作为单例注入到 saga 中,因此用于 read/write 跨线程的 saga 数据以进行长 运行 操作。
我知道不建议使用内存中持久性,但对于我的需要,配置另一种类型的持久性(如 Azure 表)太过分了;我只是希望 OperationComplete
事件在正常情况下触发。如果在 运行 作业期间发生重启,我不需要保留 saga 数据。作业无论如何都会被缩短,如果作业运行时间超过设定的最长时间,传奇超时将处理触发 OperationComplete
事件并出现错误。
这是因为如果 ProcessMessage
足够快,您可能会在它失效之前获得当前 context
,例如被处置。
通过从 Handle
成功返回,您告诉 NServiceBus:"I'm done with this message",因此它也可以对 context
做它想做的事情,例如使它无效。在后台处理器中,您需要一个端点实例,而不是消息上下文。
到新任务开始的时候运行,你并不知道Handle
是否已经返回,所以你应该认为消息已经被消费了,因此是不可恢复的。如果您的单独任务发生错误,您将无法重试。
避免无持久性的长 运行 过程。您提到的示例有一个服务器,用于存储消息中的工作项,以及一个为工作项轮询此存储的进程。也许不理想,以防你横向扩展处理器,但它不会丢失消息。
为避免持续轮询,将服务器和处理器合并,启动时无条件轮询一次,并在Handle
安排轮询任务。注意此任务仅在没有其他轮询任务 运行 时才轮询,否则它可能变得比持续轮询更糟糕。您可以使用信号量来控制它。
要横向扩展,您必须有更多的服务器。您需要测量 N 个处理器轮询的成本是否大于以 round-robin 方式发送到 N 个服务器的成本,对于某些 N,要知道哪种方法实际上执行得更好。实际上,轮询对于低 N 就足够了。
修改多个处理器的示例可能需要较少的部署和配置工作,您只需添加或使用处理器,而添加或删除服务器需要在指向它们的所有位置(例如配置文件)更改它们的点。
另一种方法是将漫长的过程分成几个步骤。 NServiceBus 有 sagas。这是一种通常为已知或有限数量的步骤实施的方法。对于未知数量的步骤,它仍然是可行的,尽管有些人可能认为这是对 sagas 看似预期目的的滥用。
我一直致力于让长 运行 消息与 Azure 传输上的 NServiceBus 一起工作。基于 this document,我认为我可以在单独的线程中触发长进程,将事件处理程序任务标记为完成,然后侦听自定义 OperationStarted 或 OperationComplete 事件。我注意到在大多数情况下我的处理程序没有收到 OperationComplete 事件。事实上,唯一一次收到它是在我发布 OperationStarted 事件后立即发布它的时候。两者之间的任何实际处理都会以某种方式阻止接收到完成事件。这是我的代码:
摘要 class 用于长 运行 消息
public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
{
protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();
public Task Handle(TMessage message, IMessageHandlerContext context)
{
var opStarted = new OperationStarted
{
OperationID = Guid.NewGuid(),
OperationType = typeof(TMessage).FullName
};
var errors = new List<string>();
// Fire off the long running task in a separate thread
Task.Run(() =>
{
try
{
_logger.Info($"Operation Started: {JsonConvert.SerializeObject(opStarted)}");
context.Publish(opStarted);
ProcessMessage(message, context);
}
catch (Exception ex)
{
errors.Add(ex.Message);
}
finally
{
var opComplete = new OperationComplete
{
OperationType = typeof(TMessage).FullName,
OperationID = opStarted.OperationID,
Errors = errors
};
context.Publish(opComplete);
_logger.Info($"Operation Complete: {JsonConvert.SerializeObject(opComplete)}");
}
});
return Task.CompletedTask;
}
protected abstract void ProcessMessage(TMessage message, IMessageHandlerContext context);
}
测试实施
public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
{
protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
{
// If I remove this, or lessen it to something like 200 milliseconds, the
// OperationComplete event gets handled
Thread.Sleep(1000);
}
}
操作事件
public sealed class OperationComplete : IEvent
{
public Guid OperationID { get; set; }
public string OperationType { get; set; }
public bool Success => !Errors?.Any() ?? true;
public List<string> Errors { get; set; } = new List<string>();
public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
}
public sealed class OperationStarted : IEvent
{
public Guid OperationID { get; set; }
public string OperationType { get; set; }
public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
}
处理程序
public class OperationHandler : IHandleMessages<OperationStarted>
, IHandleMessages<OperationComplete>
{
static ILog logger = LogManager.GetLogger<OperationHandler>();
public Task Handle(OperationStarted message, IMessageHandlerContext context)
{
return PrintJsonMessage(message);
}
public Task Handle(OperationComplete message, IMessageHandlerContext context)
{
// This is not hit if ProcessMessage takes too long
return PrintJsonMessage(message);
}
private Task PrintJsonMessage<T>(T message) where T : class
{
var msgObj = new
{
Message = typeof(T).Name,
Data = message
};
logger.Info(JsonConvert.SerializeObject(msgObj, Formatting.Indented));
return Task.CompletedTask;
}
}
我确定 context.Publish()
调用正在被触发,因为 _logger.Info()
调用正在将消息打印到我的测试控制台。我还验证了它们遇到了断点。在我的测试中,运行时间超过 500 毫秒的任何内容都会阻止处理 OperationComplete 事件。
如果有人可以就在 ProcessMessage 实现中经过任何大量时间后为什么 OperationComplete 事件没有触发处理程序提出建议,我将非常感激听到他们的意见。谢谢!
--更新-- 万一其他人遇到这个并对我最终做了什么感到好奇:
在an exchange with the developers of NServiceBus, I decided on using a watchdog saga that implemented the IHandleTimeouts interface to periodically check for job completion. I was using saga data, updated when the job was finished, to determine whether to fire off the OperationComplete
event in the timeout handler. This presented an other issue: when using In-Memory Persistence, the saga data was not persisted跨线程之后,即使它被每个线程锁定。为了解决这个问题,我创建了一个专门用于 long 运行 内存数据持久化的接口。此接口作为单例注入到 saga 中,因此用于 read/write 跨线程的 saga 数据以进行长 运行 操作。
我知道不建议使用内存中持久性,但对于我的需要,配置另一种类型的持久性(如 Azure 表)太过分了;我只是希望 OperationComplete
事件在正常情况下触发。如果在 运行 作业期间发生重启,我不需要保留 saga 数据。作业无论如何都会被缩短,如果作业运行时间超过设定的最长时间,传奇超时将处理触发 OperationComplete
事件并出现错误。
这是因为如果 ProcessMessage
足够快,您可能会在它失效之前获得当前 context
,例如被处置。
通过从 Handle
成功返回,您告诉 NServiceBus:"I'm done with this message",因此它也可以对 context
做它想做的事情,例如使它无效。在后台处理器中,您需要一个端点实例,而不是消息上下文。
到新任务开始的时候运行,你并不知道Handle
是否已经返回,所以你应该认为消息已经被消费了,因此是不可恢复的。如果您的单独任务发生错误,您将无法重试。
避免无持久性的长 运行 过程。您提到的示例有一个服务器,用于存储消息中的工作项,以及一个为工作项轮询此存储的进程。也许不理想,以防你横向扩展处理器,但它不会丢失消息。
为避免持续轮询,将服务器和处理器合并,启动时无条件轮询一次,并在Handle
安排轮询任务。注意此任务仅在没有其他轮询任务 运行 时才轮询,否则它可能变得比持续轮询更糟糕。您可以使用信号量来控制它。
要横向扩展,您必须有更多的服务器。您需要测量 N 个处理器轮询的成本是否大于以 round-robin 方式发送到 N 个服务器的成本,对于某些 N,要知道哪种方法实际上执行得更好。实际上,轮询对于低 N 就足够了。
修改多个处理器的示例可能需要较少的部署和配置工作,您只需添加或使用处理器,而添加或删除服务器需要在指向它们的所有位置(例如配置文件)更改它们的点。
另一种方法是将漫长的过程分成几个步骤。 NServiceBus 有 sagas。这是一种通常为已知或有限数量的步骤实施的方法。对于未知数量的步骤,它仍然是可行的,尽管有些人可能认为这是对 sagas 看似预期目的的滥用。