Apache Storm:通过唯一 ID 跟踪从 Source Spout 到 Final Bolt 的元组
Apache Storm: Track tuples by unique ID from Source Spout to Final Bolt
我想要一种在整个 Storm 拓扑中唯一标识元组的方法,以便可以从 Spout 到最终的 Bolt 跟踪每个元组。
我的理解是在传递一个带有喷口发射的唯一消息 ID 时,例如:
String msgID = UUID.randomUUID();
// emits a line from user tasks with msg id
outputCollector.emit(new Values(task), msgID);
当确认到 Spout 时,这个 ID 会以某种方式返回(这可以更早地模拟以在任何时候取回传递的 ID 吗?)。但是在元组上使用 get message id 例如:
inputTuple.getMessageId()
这 returns 一条新消息 ID 不是元组生成的在 Spout 中传入的消息。参考https://groups.google.com/forum/#!topic/storm-user/xBEqMDa-RZs
问题
1) 有没有办法在收集器发出元组时获取 tuple.getMessageId() 。
2) 或者,可以从拓扑中任何 spout 或 bolt 的元组中以某种方式获取 spout 处传入的 messageId 吗?
结束解决方案
我希望能够在发出元组时为其设置一个 ID,然后能够在 Storm 拓扑中的任何点再次识别该元组。
或者我的系统将跟踪的唯一 messageId 必须作为 field/value 在每个 spout 和 bolt 的每个输出上传递。
谢谢
无法在生产者处访问系统生成的 ID(只能通过 tuple.getMessageId()
在消费者处访问。为了按照您的意愿跟踪元组,您需要(按照您自己的想法) 将 ID 作为常规字段值添加到元组中,并在每个螺栓中将其复制到相应的输出元组中。
这个答案的几个部分。首先,正如您正确指出的那样,您需要在喷口中为您发出的每个元组提供一个唯一的 ID。其次,如果您想在拓扑中的任何位置访问该 ID,则将该 ID 添加到 Spout 发出的复合元组中。第三(只是为了完整起见),如果在您发出的元组中有任何您在处理 Spout 中的 ack 或失败时需要知道的信息,那么将该信息添加为构成您的消息 ID 的复合值的一部分。
举个例子,当从喷口发出元组时,我通常也使用元组本身作为消息 ID:
outputCollector.emit(myTuple, myTuple);
这可能有点矫枉过正,但至少我可以随时随地访问元组中的所有信息。
我想要一种在整个 Storm 拓扑中唯一标识元组的方法,以便可以从 Spout 到最终的 Bolt 跟踪每个元组。
我的理解是在传递一个带有喷口发射的唯一消息 ID 时,例如:
String msgID = UUID.randomUUID();
// emits a line from user tasks with msg id
outputCollector.emit(new Values(task), msgID);
当确认到 Spout 时,这个 ID 会以某种方式返回(这可以更早地模拟以在任何时候取回传递的 ID 吗?)。但是在元组上使用 get message id 例如:
inputTuple.getMessageId()
这 returns 一条新消息 ID 不是元组生成的在 Spout 中传入的消息。参考https://groups.google.com/forum/#!topic/storm-user/xBEqMDa-RZs
问题
1) 有没有办法在收集器发出元组时获取 tuple.getMessageId() 。
2) 或者,可以从拓扑中任何 spout 或 bolt 的元组中以某种方式获取 spout 处传入的 messageId 吗?
结束解决方案 我希望能够在发出元组时为其设置一个 ID,然后能够在 Storm 拓扑中的任何点再次识别该元组。
或者我的系统将跟踪的唯一 messageId 必须作为 field/value 在每个 spout 和 bolt 的每个输出上传递。
谢谢
无法在生产者处访问系统生成的 ID(只能通过 tuple.getMessageId()
在消费者处访问。为了按照您的意愿跟踪元组,您需要(按照您自己的想法) 将 ID 作为常规字段值添加到元组中,并在每个螺栓中将其复制到相应的输出元组中。
这个答案的几个部分。首先,正如您正确指出的那样,您需要在喷口中为您发出的每个元组提供一个唯一的 ID。其次,如果您想在拓扑中的任何位置访问该 ID,则将该 ID 添加到 Spout 发出的复合元组中。第三(只是为了完整起见),如果在您发出的元组中有任何您在处理 Spout 中的 ack 或失败时需要知道的信息,那么将该信息添加为构成您的消息 ID 的复合值的一部分。
举个例子,当从喷口发出元组时,我通常也使用元组本身作为消息 ID:
outputCollector.emit(myTuple, myTuple);
这可能有点矫枉过正,但至少我可以随时随地访问元组中的所有信息。