Hangfire ContinueWith 多个来源
Hangfire ContinueWith with multiple sources
我正在研究使用 ContinueWith 来完成以下任务的方法:
- 原始应用程序提交(非并发)作业
- 作业启动,进行一些处理,在别处调用 Web 服务
- 作业"completes"(进入静止状态)
- Web 服务在完成内部处理后提交 "continuation" 作业(可能是 30 分钟,也可能是 2 天)
- A Jobs.ContinueWith 以某种方式被调用?
- 作业终于完成,原来的作业被标记为完成
我 运行 喜欢的是太多的活动部件。原始应用程序是 C#/MVC 应用程序。用户做他的事,最后提交一个长 运行 的作业来执行。作业处理器(C# 库)会做一些工作,然后调用 JAVA SOAP 端点来传递初始处理的结果。 JAVA SOAP 端点调用 COTS 应用程序进行处理,然后使用 "I'm done".
回调作业
如您所见,我没有明确的方法来执行以下操作:
var parentId = _jobs.Enqueue<MyJob>(x => x.StartExecution(job.Id));
_jobs.ContinueWith<JAVA_ENDPOINT>(parentId, x => x.JAVA_EXECUTION(job.Id)); // this part is not in my control!
_jobs.ContinueWith<MyJob>(parentId, x => x.ContinueExecution(job.Id));
我确实有一个 REST 服务 (POST),我正在使用它作为开始工作的唯一途径。基本上,传递一个格式良好的有效载荷(JSON),控制器从 IoC 容器中选择作业对象,决定它是哪种类型的作业(临时、重复、连续等),然后执行正确的 Hangfire 调用( s) 将其入队。 JAVA 端点也可以轻松调用此 REST 服务。
[HttpPost]
public string Post()
{
// safety checks removed for brevity...
var command = new MinimumCommandModel(Request.Content.ReadAsStringAsync().Result);
return GetPostPipeline().Handle(command).Id;
}
private static IRequestHandler<MinimumCommandModel, MinimumResultModel> GetPostPipeline()
{
return new MediatorPipeline<MinimumCommandModel, MinimumResultModel>
(new QueuePostMediator()
, new IPreRequestHandler<MinimumCommandModel>[]
{
new PreJobLogger(),
new PreJobExistsValidator(),
new PreJobPropertiesValidator()
}
, new IPostRequestHandler<MinimumCommandModel, MinimumResultModel>[]
{
new PostJobLogger()
}
);
}
QueuePostMediator 处理作业类型的细节(AdHoc 等)。我现在正在尝试编写延续处理程序,但对如何进行此操作感到有点受阻。我当然不想在 Hangfire 之外进行任何类型的阻止操作。我不确定如何 "start" 另一个工作作为原始工作的延续,当它们最初没有与原始工作的 parentId 连接时。
基本上,如果我可以从工作内部暂停工作,直到外部刺激告诉 hangfire 继续工作,我将是黄金。我还没有破解如何完成这个。
想法?想法?
好的。我已经想出一个 hack 来让它工作。
我正在使用一种策略模式 运行 每项工作的不同部分。我有一个名为 Handoff 的 JobStatus,现在执行此操作:
public class Processing : BaseJobExecutor<PayloadModel>, IJobExecutor<PayloadModel>
{
public Processing(JobPingPong job) : base(job, JobStatus.Processing) {}
public void Handle()
{
JobInfo.JobStatus = JobStatus.ExtProcessing;
JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
Payload.PostToQueueText(@"http://localhost:8080/api/clone");
// Pause the current job (this is the parent job) so the outside web service has a chance to complete...
var enqueuedIn = new TimeSpan(0, 6, 0, 0); // 6 hours out...
JobPutOnHold(JobInfo.HangfireJobId, enqueuedIn);
// The next status to be executed upon hydration...
JobInfo.JobStatus = JobStatus.Complete;
Job.CachePut();
// Signal the job executor that this job is "done" due to an outside process needing to run...
JobInfo.JobStatus = JobStatus.Handoff;
}
}
public void JobPutOnHold(string jobId, TimeSpan enqueuedIn)
{
var jobClient = new BackgroundJobClient();
jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
}
现在,在策略执行器中我可以这样做:
public string Execute(IServerFilter jobContext, IJobCancellationToken cancellationToken)
{
while (Payload.JobInfo.JobStatus != JobStatus.Done)
{
cancellationToken?.ThrowIfCancellationRequested();
var jobStrategy = new JobExecutorStrategy<TPayload>(Executors);
Payload = jobStrategy.Execute(Payload);
if (Payload.JobInfo.JobStatus == JobStatus.Handoff)
break;
}
return PayloadAsString;
}
作业的第二部分与第一部分相同,但来自外部服务,状态为 ExtComplete,这允许作业根据来自的结果执行 post 处理外面的世界(存储在数据库中)。像这样:
public class ExtComplete : BaseJobExecutor<PayloadModel>, IJobExecutor<PayloadModel>
{
public ExtComplete(JobPingPong job) : base(job, JobStatus.ExtComplete) { }
public void Handle()
{
// do post processing here...
Payload.Tokens = null;
JobInfo.JobStatus = JobStatus.Complete;
if (JobInfo.HangfireJobId != JobContext.JobId || JobInfo.HangfireParentJobId == JobInfo.HangfireJobId)
{
JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
JobInfo.HangfireJobId = JobContext.JobId;
}
// Enqueue the previous (parent) job so it can complete...
JobExecuteNow(JobInfo.HangfireParentJobId);
}
}
public void JobExecuteNow(string jobId)
{
var enqueuedIn = new TimeSpan(0, 0, 0, 15);
var jobClient = new BackgroundJobClient();
jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
}
最终,时间将由配置驱动,但现在我将其设置为让第一个作业在 15 秒内开始执行。
我面对这种方法的唯一挑战是进来的作业有效载荷是在任何处理发生之前的原始有效载荷。这就是为什么您在上方看到 "caching" 的原因。当作业重新启动时,我检查是否存在该 Hangfire JobId 的缓存,如果存在,则从缓存中加载最后一个已知的有效负载,然后让执行程序继续其快乐的方式。
到目前为止效果很好。
注意:我仍在尝试学习如何 alter/inject Hangfire 中的命令链和状态对象,以使其更适合 hangfire。我们有一份工作可以拨打十几个或更多的外线电话。目前,运行.
大约需要 12 个小时
我正在研究使用 ContinueWith 来完成以下任务的方法:
- 原始应用程序提交(非并发)作业
- 作业启动,进行一些处理,在别处调用 Web 服务
- 作业"completes"(进入静止状态)
- Web 服务在完成内部处理后提交 "continuation" 作业(可能是 30 分钟,也可能是 2 天)
- A Jobs.ContinueWith 以某种方式被调用?
- 作业终于完成,原来的作业被标记为完成
我 运行 喜欢的是太多的活动部件。原始应用程序是 C#/MVC 应用程序。用户做他的事,最后提交一个长 运行 的作业来执行。作业处理器(C# 库)会做一些工作,然后调用 JAVA SOAP 端点来传递初始处理的结果。 JAVA SOAP 端点调用 COTS 应用程序进行处理,然后使用 "I'm done".
回调作业如您所见,我没有明确的方法来执行以下操作:
var parentId = _jobs.Enqueue<MyJob>(x => x.StartExecution(job.Id));
_jobs.ContinueWith<JAVA_ENDPOINT>(parentId, x => x.JAVA_EXECUTION(job.Id)); // this part is not in my control!
_jobs.ContinueWith<MyJob>(parentId, x => x.ContinueExecution(job.Id));
我确实有一个 REST 服务 (POST),我正在使用它作为开始工作的唯一途径。基本上,传递一个格式良好的有效载荷(JSON),控制器从 IoC 容器中选择作业对象,决定它是哪种类型的作业(临时、重复、连续等),然后执行正确的 Hangfire 调用( s) 将其入队。 JAVA 端点也可以轻松调用此 REST 服务。
[HttpPost]
public string Post()
{
// safety checks removed for brevity...
var command = new MinimumCommandModel(Request.Content.ReadAsStringAsync().Result);
return GetPostPipeline().Handle(command).Id;
}
private static IRequestHandler<MinimumCommandModel, MinimumResultModel> GetPostPipeline()
{
return new MediatorPipeline<MinimumCommandModel, MinimumResultModel>
(new QueuePostMediator()
, new IPreRequestHandler<MinimumCommandModel>[]
{
new PreJobLogger(),
new PreJobExistsValidator(),
new PreJobPropertiesValidator()
}
, new IPostRequestHandler<MinimumCommandModel, MinimumResultModel>[]
{
new PostJobLogger()
}
);
}
QueuePostMediator 处理作业类型的细节(AdHoc 等)。我现在正在尝试编写延续处理程序,但对如何进行此操作感到有点受阻。我当然不想在 Hangfire 之外进行任何类型的阻止操作。我不确定如何 "start" 另一个工作作为原始工作的延续,当它们最初没有与原始工作的 parentId 连接时。
基本上,如果我可以从工作内部暂停工作,直到外部刺激告诉 hangfire 继续工作,我将是黄金。我还没有破解如何完成这个。
想法?想法?
好的。我已经想出一个 hack 来让它工作。
我正在使用一种策略模式 运行 每项工作的不同部分。我有一个名为 Handoff 的 JobStatus,现在执行此操作:
public class Processing : BaseJobExecutor<PayloadModel>, IJobExecutor<PayloadModel>
{
public Processing(JobPingPong job) : base(job, JobStatus.Processing) {}
public void Handle()
{
JobInfo.JobStatus = JobStatus.ExtProcessing;
JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
Payload.PostToQueueText(@"http://localhost:8080/api/clone");
// Pause the current job (this is the parent job) so the outside web service has a chance to complete...
var enqueuedIn = new TimeSpan(0, 6, 0, 0); // 6 hours out...
JobPutOnHold(JobInfo.HangfireJobId, enqueuedIn);
// The next status to be executed upon hydration...
JobInfo.JobStatus = JobStatus.Complete;
Job.CachePut();
// Signal the job executor that this job is "done" due to an outside process needing to run...
JobInfo.JobStatus = JobStatus.Handoff;
}
}
public void JobPutOnHold(string jobId, TimeSpan enqueuedIn)
{
var jobClient = new BackgroundJobClient();
jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
}
现在,在策略执行器中我可以这样做:
public string Execute(IServerFilter jobContext, IJobCancellationToken cancellationToken)
{
while (Payload.JobInfo.JobStatus != JobStatus.Done)
{
cancellationToken?.ThrowIfCancellationRequested();
var jobStrategy = new JobExecutorStrategy<TPayload>(Executors);
Payload = jobStrategy.Execute(Payload);
if (Payload.JobInfo.JobStatus == JobStatus.Handoff)
break;
}
return PayloadAsString;
}
作业的第二部分与第一部分相同,但来自外部服务,状态为 ExtComplete,这允许作业根据来自的结果执行 post 处理外面的世界(存储在数据库中)。像这样:
public class ExtComplete : BaseJobExecutor<PayloadModel>, IJobExecutor<PayloadModel>
{
public ExtComplete(JobPingPong job) : base(job, JobStatus.ExtComplete) { }
public void Handle()
{
// do post processing here...
Payload.Tokens = null;
JobInfo.JobStatus = JobStatus.Complete;
if (JobInfo.HangfireJobId != JobContext.JobId || JobInfo.HangfireParentJobId == JobInfo.HangfireJobId)
{
JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
JobInfo.HangfireJobId = JobContext.JobId;
}
// Enqueue the previous (parent) job so it can complete...
JobExecuteNow(JobInfo.HangfireParentJobId);
}
}
public void JobExecuteNow(string jobId)
{
var enqueuedIn = new TimeSpan(0, 0, 0, 15);
var jobClient = new BackgroundJobClient();
jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
}
最终,时间将由配置驱动,但现在我将其设置为让第一个作业在 15 秒内开始执行。
我面对这种方法的唯一挑战是进来的作业有效载荷是在任何处理发生之前的原始有效载荷。这就是为什么您在上方看到 "caching" 的原因。当作业重新启动时,我检查是否存在该 Hangfire JobId 的缓存,如果存在,则从缓存中加载最后一个已知的有效负载,然后让执行程序继续其快乐的方式。
到目前为止效果很好。
注意:我仍在尝试学习如何 alter/inject Hangfire 中的命令链和状态对象,以使其更适合 hangfire。我们有一份工作可以拨打十几个或更多的外线电话。目前,运行.
大约需要 12 个小时