如果 Origin 不 'Produce Events',如何将 StreamSets 管道转换为完成状态?
How to transition a StreamSets pipeline to Finished state if the Origin does not 'Produce Events'?
我创建了一个 StreamSets 管道,其中起点是 'Kafka Consumer',终点是 'JDBC Producer'。为了 运行 这个管道,我创建了一个 StreamSets 作业。
在我点击 'Start Job' 到 运行 管道后,作业状态变为 'Active' 并无限期地保持在 'Active' 状态,即使从origin Kafka Topic 通过插入到目标数据库中进行消费和处理。
我正在尝试让 StreamSets 作业在处理完 Kafka 主题中的所有数据后进入 'InActive' 状态。
The StreamSets pipeline
对于我的其他管道(当没有更多数据时可以选择 'Produce Events'),我使用了 'Pipeline Finisher Executor'.
为了将此管道转换为完成状态,我已经一一尝试了以下选项但没有成功:
1. 将 'Kafka Consumer' 中的 'Batch Wait Time' 设置为较低的值。
2. 在管道的常规选项卡中将 'Runner Idle Time (sec)' 的值设置为 -1。
3. 在 StreamSets 作业的 'Job Status' 选项卡中设置 'Pipeline Force Stop' 超时值。
请告诉我如何使管道进入完成状态而不是连续流式传输。
这是一种处理方法。在你的管道中添加Jython(或Groovy或JavaScript)Evaluator并使用state
对象跟踪并检查它是第一批还是后续批次,以及是否有任何记录在批次中。如果不是第一个批次并且该批次中没有任何记录,则生成一个自定义事件并将其发送到 Pipeline Finisher。
注意:您将需要使用批量等待时间(毫秒)和最大批量大小(记录) 取决于您的用例和生成消息的速度,但只要您确定了这一点,这就会起作用。
初始化脚本:state['first_batch'] = "true"
脚本:
if (state['first_batch'] == "false" and len(records) == 0):
sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
sdc.toEvent(sdc.createEvent("no-more-messages", 0))
for record in sdc.records:
try:
sdc.output.write(record)
except Exception as e:
# Send record to error
sdc.error.write(record, str(e))
if (state['first_batch'] == "true" and len(records) > 0):
state['first_batch'] = "false"
干杯,
破折号
我创建了一个 StreamSets 管道,其中起点是 'Kafka Consumer',终点是 'JDBC Producer'。为了 运行 这个管道,我创建了一个 StreamSets 作业。
在我点击 'Start Job' 到 运行 管道后,作业状态变为 'Active' 并无限期地保持在 'Active' 状态,即使从origin Kafka Topic 通过插入到目标数据库中进行消费和处理。
我正在尝试让 StreamSets 作业在处理完 Kafka 主题中的所有数据后进入 'InActive' 状态。
The StreamSets pipeline
对于我的其他管道(当没有更多数据时可以选择 'Produce Events'),我使用了 'Pipeline Finisher Executor'.
为了将此管道转换为完成状态,我已经一一尝试了以下选项但没有成功: 1. 将 'Kafka Consumer' 中的 'Batch Wait Time' 设置为较低的值。 2. 在管道的常规选项卡中将 'Runner Idle Time (sec)' 的值设置为 -1。 3. 在 StreamSets 作业的 'Job Status' 选项卡中设置 'Pipeline Force Stop' 超时值。
请告诉我如何使管道进入完成状态而不是连续流式传输。
这是一种处理方法。在你的管道中添加Jython(或Groovy或JavaScript)Evaluator并使用state
对象跟踪并检查它是第一批还是后续批次,以及是否有任何记录在批次中。如果不是第一个批次并且该批次中没有任何记录,则生成一个自定义事件并将其发送到 Pipeline Finisher。
注意:您将需要使用批量等待时间(毫秒)和最大批量大小(记录) 取决于您的用例和生成消息的速度,但只要您确定了这一点,这就会起作用。
初始化脚本:state['first_batch'] = "true"
脚本:
if (state['first_batch'] == "false" and len(records) == 0):
sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
sdc.toEvent(sdc.createEvent("no-more-messages", 0))
for record in sdc.records:
try:
sdc.output.write(record)
except Exception as e:
# Send record to error
sdc.error.write(record, str(e))
if (state['first_batch'] == "true" and len(records) > 0):
state['first_batch'] = "false"
干杯, 破折号