Logging Hangfire jobs to Application Insights and correlating activity to an Operation Id

I feel like this should be a lot simpler than it is turning out to be, or I am overthinking it.

I have a .NET Core 3.1 Web API application that uses HangFire to process some jobs in the background. I have also configured Application Insights to log telemetry from the .NET Core API.

I can see logging events and dependency telemetry recorded in Application Insights. However, each event/log/dependency is recorded against its own unique OperationId and Parent Id.

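I am trying to determine how to ensure that any activity logged, or any dependency used, in the context of a background job is recorded against the OperationId and/or Parent Id of the original request that queued that background job.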

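When I queue a job, I can get the current OperationId of the incoming HTTP request, and I push that into the HangFire queue with the job. When the job is then performed, I can get that OperationId back. What I then need to do is make that OperationId available for the whole context/lifetime of the job execution, so that it is attached to any telemetry sent to Application Insights.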

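I thought I could create an IJobContext interface, which could be injected into the class that performs the job. Within that context, I could push the OperationId. I could then create an ITelemetryInitializer that also takes the IJobContext as a dependency. In the ITelemetryInitializer, I could then set the OperationId and ParentId of the telemetry being sent to Application Insights. Here is some simple code: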

public class HangFirePanelMessageQueue : IMessageQueue
{
    private readonly MessageProcessor _messageProcessor;
    // Used by ProcessQueuedMessage below to deserialize the queued message.
    private readonly IIoTMessageSerializer _iotHubMessageSerialiser;
    private readonly IHangFireJobContext _jobContext;
    private readonly TelemetryClient _telemetryClient;

    public HangFirePanelMessageQueue(MessageProcessor panelMessageProcessor,
        IIoTMessageSerializer iotHubMessageSerialiser,
        IHangFireJobContext jobContext, TelemetryClient telemetryClient)
    {
        _messageProcessor = panelMessageProcessor;
        _iotHubMessageSerialiser = iotHubMessageSerialiser;
        _jobContext = jobContext;
        _telemetryClient = telemetryClient;
    }

    public async Task ProcessQueuedMessage(string message, string operationId)
    {
        var iotMessage = _iotHubMessageSerialiser.GetMessage(message);

        _jobContext?.Set(iotMessage.CorrelationID, iotMessage.MessageID);

        await _messageProcessor.ProcessMessage(iotMessage);
    }

    public Task QueueMessageForProcessing(string message)
    {
        var dummyTrace = new TraceTelemetry("Queuing message for processing", SeverityLevel.Information);
        _telemetryClient.TrackTrace(dummyTrace);
        string opId = dummyTrace.Context.Operation.Id;

        BackgroundJob.Enqueue(() =>
        ProcessQueuedMessage(message, opId));

        return Task.CompletedTask;
    }
}

The IJobContext looks something like this:

public interface IHangFireJobContext
{
    bool Initialised { get; }

    string OperationId { get; }

    string JobId { get; }

    void Set(string operationId, string jobId);
}

I would then have an ITelemetryInitializer that enriches any ITelemetry:

public class EnrichBackgroundJobTelemetry : ITelemetryInitializer
{
    private readonly IHangFireJobContext jobContext;

    public EnrichBackgroundJobTelemetry(IHangFireJobContext jobContext)
    {
        this.jobContext = jobContext;
    }

    public void Initialize(ITelemetry telemetry)
    {
        if (!jobContext.Initialised)
        {
            return;
        }

        telemetry.Context.Operation.Id = jobContext.OperationId;
    }
}

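However, the problem I have is that the ITelemetryInitializer is a singleton, so it would be instantiated once with an IHangFireJobContext that would then never be updated for any subsequent HangFire job.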
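One possible way around the singleton lifetime, sketched here under assumptions, is to back the job context with AsyncLocal<T>, the same mechanism the default IHttpContextAccessor implementation relies on. The AsyncLocalJobContext class below is hypothetical (it is not part of HangFire or Application Insights). Registered as a singleton, it could be injected into both the job class and the ITelemetryInitializer, and each job's async flow would see only the values it set itself. The example uses only the base class library so that it can run on its own; RunJob merely stands in for a HangFire job method:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Same shape as the interface in the question.
public interface IHangFireJobContext
{
    bool Initialised { get; }
    string OperationId { get; }
    string JobId { get; }
    void Set(string operationId, string jobId);
}

// Hypothetical implementation: the values live in AsyncLocal storage, so one
// shared instance returns whatever was set in the *current* async flow.
public sealed class AsyncLocalJobContext : IHangFireJobContext
{
    private static readonly AsyncLocal<(string OperationId, string JobId)?> Current =
        new AsyncLocal<(string OperationId, string JobId)?>();

    public bool Initialised => Current.Value.HasValue;
    public string OperationId => Current.Value?.OperationId;
    public string JobId => Current.Value?.JobId;

    public void Set(string operationId, string jobId) =>
        Current.Value = (operationId, jobId);
}

public static class Program
{
    // Stand-in for a job method: sets the context first, then does async work.
    public static async Task<string> RunJob(
        IHangFireJobContext context, string operationId, string jobId)
    {
        context.Set(operationId, jobId);
        await Task.Delay(10); // the value set above survives the await
        return context.OperationId;
    }

    public static async Task Main()
    {
        // One instance shared by both jobs, as a singleton registration would be.
        var shared = new AsyncLocalJobContext();

        var results = await Task.WhenAll(
            RunJob(shared, "op-A", "job-1"),
            RunJob(shared, "op-B", "job-2"));

        Console.WriteLine(string.Join(", ", results)); // prints "op-A, op-B"
    }
}
```

With this approach the job method has to call Set before doing any work. Telemetry emitted before that call, or from code that does not run inside the job's async flow, would not see the values.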

I did find the https://github.com/skwasjer/Hangfire.Correlate project, which extends https://github.com/skwasjer/Correlate. Correlate creates a correlation context that can be accessed via an ICorrelationContextAccessor, similar to IHttpContextAccessor.

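However, a footnote in Correlate states: "Please consider that .NET Core 3 now has built-in support for W3C TraceContext (blog) and that there are other distributed tracing libraries with more functionality than Correlate." It lists Application Insights as one of the alternatives for more advanced distributed tracing.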

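So can anyone help me understand how to enrich any telemetry sent to Application Insights when it is created within the context of a HangFire job? I feel the correct answer is to use an ITelemetryInitializer and populate the OperationId on the ITelemetry item, but I am not sure what dependency to inject into the ITelemetryInitializer in order to access the HangFire job context.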

When I queue a job, I can get the current OperationId of the incoming HTTP request, and I push that into the HangFire queue with the job.

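So, am I correct in saying that you have a controller action that pushes work to Hangfire? If so, you can get the operation id inside the controller method and pass it to the job. In the job, use that operation id to start a new operation. That operation, together with all the telemetry generated during it, will be linked to the original request.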

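I don't have a Hangfire integration, but the code below shows the general idea: some work is queued to be done in the background, and its telemetry should be linked to the request: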

        [HttpGet("/api/demo5")]
        public ActionResult TrackWorker()
        {
            var requestTelemetry = HttpContext.Features.Get<RequestTelemetry>();

            _taskQueue.QueueBackgroundWorkItem(async ct =>
            {
                using(var op = _telemetryClient.StartOperation<DependencyTelemetry>("QueuedWork", requestTelemetry.Context.Operation.Id))
                {
                    _ = await new HttpClient().GetStringAsync("http://blank.org");

                    await Task.Delay(250);
                    op.Telemetry.ResultCode = "200";
                    op.Telemetry.Success = true;
                }
            });

            return Accepted();
        }

The full example can be found here

Going by Peter Bons' example, this is how I did it:

The code originally triggered by the controller action:

    // Get the current ApplicationInsights Id. Could use .RootId if 
    // you only want the OperationId, but I want the ParentId too
    var activityId = System.Diagnostics.Activity.Current?.Id;

    _backgroundJobClient.Enqueue<JobDefinition>(x => 
        x.MyMethod(queueName, otherMethodParams, activityId));

In my JobDefinition class:

    // I use different queues, but you don't need to. 
    // otherMethodParams is just an example. Have as many as you need, like normal.
    [AutomaticRetry(OnAttemptsExceeded = AttemptsExceededAction.Delete, Attempts = 10)]
    [QueueNameFromFirstParameter]
    public async Task MyMethod(string queueName, string otherMethodParams,
                                 string activityId)
    {

        var (operationId, parentId) = SplitCorrelationIdIntoOperationIdAndParentId(
                                         activityId);

        // Starting this new operation will initialise 
        // System.Diagnostics.Activity.Current.
        using (var operation = _telemetryClient.StartOperation<DependencyTelemetry>(
                                   "JobDefinition.MyMethod", operationId, parentId))
        {
            try
            {
                operation.Telemetry.Data = $"something useful here";

                // If you have other state you'd like in App Insights logs, 
                // call AddBaggage and they show up as a customDimension, 
                // e.g. in any trace logs. 
                System.Diagnostics.Activity.Current.AddBaggage("QueueName", queueName);
                
                // ... do the real background work here...

                operation.Telemetry.Success = true;
            }
            catch (Exception)
            {
                operation.Telemetry.Success = false;

                throw;
            }
        }
    }

    // Splits full value from System.Diagnostics.Activity.Current.Id
    // like "00-12994526f1cb134bbddd0f256e8bc3f0-872b3bd78c345a46-00"
    // into values ( "12994526f1cb134bbddd0f256e8bc3f0", "872b3bd78c345a46" ) 
    private static (string, string) SplitCorrelationIdIntoOperationIdAndParentId(string activityId)
    {
        if (string.IsNullOrEmpty(activityId))
            return (null, null);

        var splits = activityId.Split('-');

        // This is what should happen
        if (splits.Length >= 3)
            return (splits[1], splits[2]);

        // Must be in a weird format. Try to return something useful. 
        if (splits.Length == 2)
            return (splits[0], splits[1]);

        return (activityId, null);
    }

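I'm not sure whether using the OperationId and ParentId this way is correct. It does tie the background job to the original request's OperationId, but if the original request itself has a ParentId, then this background job should really have its ParentId set to the request, not to the request's ParentId. Does anyone know?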
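On the ParentId question, a small check suggests the split above may already do what is wanted, assuming the W3C id format is in use (the format shown in the comment on SplitCorrelationIdIntoOperationIdAndParentId). In that format, the third segment of Activity.Current.Id is the current activity's own span id, not its parent's, so passing it on as parentId makes the request itself the parent of the background job. The sketch below uses only System.Diagnostics. The activity names and the Split helper are made up for illustration, and Activity.SetParentId is used to mimic the job side rather than the Application Insights StartOperation call:

```csharp
using System;
using System.Diagnostics;

public static class Program
{
    // Returns (trace id, span id) taken from a W3C-formatted Activity.Id,
    // e.g. "00-<32 hex trace id>-<16 hex span id>-<flags>".
    public static (string TraceId, string SpanId) Split(string activityId)
    {
        var parts = activityId.Split('-');
        return (parts[1], parts[2]);
    }

    public static void Main()
    {
        // .NET Core 3.1 defaults to the hierarchical id format, so force W3C
        // here to match the ids shown in the answer above.
        Activity.DefaultIdFormat = ActivityIdFormat.W3C;
        Activity.ForceDefaultIdFormat = true;

        // "request" becomes a child of "upstream" because upstream is
        // Activity.Current when request is started.
        var upstream = new Activity("UpstreamCaller").Start();
        var request = new Activity("IncomingRequest").Start();

        string capturedId = request.Id; // what the controller would enqueue
        var (traceId, spanId) = Split(capturedId);

        Console.WriteLine(traceId == request.TraceId.ToHexString());     // True
        Console.WriteLine(spanId == request.SpanId.ToHexString());       // True: the request's own id
        Console.WriteLine(spanId == request.ParentSpanId.ToHexString()); // False: not the request's parent

        request.Stop();
        upstream.Stop();

        // Job side: parenting a new activity on the captured id keeps the
        // trace id and records the request as the parent.
        var job = new Activity("BackgroundJob").SetParentId(capturedId).Start();
        Console.WriteLine(job.TraceId.ToHexString() == traceId);         // True
        Console.WriteLine(job.ParentSpanId.ToHexString() == spanId);     // True
        job.Stop();
    }
}
```

If the application uses the older hierarchical id format instead, the ids are not split on '-' in this way and the check does not apply.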