如果 Origin 不 'Produce Events',如何将 StreamSets 管道转换为完成状态?

How to transition a StreamSets pipeline to Finished state if the Origin does not 'Produce Events'?

我创建了一个 StreamSets 管道,其中起点是 'Kafka Consumer',终点是 'JDBC Producer'。为了 运行 这个管道,我创建了一个 StreamSets 作业。

在我点击 'Start Job' 到 运行 管道后,作业状态变为 'Active' 并无限期地保持在 'Active' 状态,即使从origin Kafka Topic 通过插入到目标数据库中进行消费和处理。

我正在尝试让 StreamSets 作业在处理完 Kafka 主题中的所有数据后进入 'InActive' 状态。

The StreamSets pipeline

对于我的其他管道(当没有更多数据时可以选择 'Produce Events'),我使用了 'Pipeline Finisher Executor'.

为了将此管道转换为完成状态,我已经一一尝试了以下选项但没有成功: 1. 将 'Kafka Consumer' 中的 'Batch Wait Time' 设置为较低的值。 2. 在管道的常规选项卡中将 'Runner Idle Time (sec)' 的值设置为 -1。 3. 在 StreamSets 作业的 'Job Status' 选项卡中设置 'Pipeline Force Stop' 超时值。

请告诉我如何使管道进入完成状态而不是连续流式传输。

这是一种处理方法。在你的管道中添加Jython(或Groovy或JavaScript)Evaluator并使用state对象跟踪并检查它是第一批还是后续批次,以及是否有任何记录在批次中。如果不是第一个批次并且该批次中没有任何记录,则生成一个自定义事件并将其发送到 Pipeline Finisher。

注意:您将需要使用批量等待时间(毫秒)最大批量大小(记录) 取决于您的用例和生成消息的速度,但只要您确定了这一点,这就会起作用。

初始化脚本state['first_batch'] = "true"

脚本:

if (state['first_batch'] == "false" and len(records) == 0):
  sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
  sdc.toEvent(sdc.createEvent("no-more-messages", 0))

for record in sdc.records:
  try:
    sdc.output.write(record)
  except Exception as e:
    # Send record to error
    sdc.error.write(record, str(e))

if (state['first_batch'] == "true" and len(records) > 0):
  state['first_batch'] = "false"

干杯, 破折号