在 Storm 中按顺序处理记录
Processing records in order in Storm
我是 Storm 的新手,我在弄清楚如何按顺序处理记录时遇到问题。
我有一个数据集,其中包含具有以下字段的记录:
user_id、location_id、time_of_checking
现在,我想识别已完成我指定路径的用户(例如,从位置 A 到位置 B 再到位置 C 的用户)。
我正在使用 Kafka 生产者并从文件中读取这些记录来模拟实时数据。数据按日期排序。
所以,为了检查我的模式是否满足,我需要按顺序处理记录。问题是,由于并行化(螺栓复制),我没有按顺序获得用户签入。因为那个模式不起作用。
如何克服这个问题?如何按顺序处理记录?
Storm 中没有对有序处理的通用系统支持。您要么使用支持有序流处理的不同系统,例如 Apache Flink(免责声明,我是 Flink 的提交者),要么您需要自己在 bolt 代码中处理它。
Storm 提供的唯一支持是使用 Trident。您可以将特定时间段(例如一分钟)的元组放入一个批次中。因此,您可以在一分钟内一次处理所有元组。但是,这仅在您的用例允许时才有效,因为您无法将不同批次的元组相互关联。在您的情况下,只有当您知道存在所有用户都已到达目的地(并且没有其他用户开始新的交互)的时间点时才会出现这种情况;即,您需要任何两个用户都没有重叠的时间点。 (在我看来,您的用例无法满足此要求)。
对于非系统,即基于自定义用户代码的解决方案,有两种方法:
例如,您可以缓冲元组并在处理前按螺栓内的时间戳排序。为了使其正常工作,您需要注入 punctuations/watermarks 以确保没有比标点符号更大的时间戳的元组出现在标点符号之后。如果您从每个并行输入子流中收到标点符号,您可以安全地触发排序和处理。
另一种方法是在区域缓冲区中缓冲每个传入子流的元组(保留子流顺序)并按顺序合并缓冲区中的元组。这具有避免排序的优点。但是,您需要确保每个运算符发出有序的元组。此外,为了避免阻塞(即,如果没有可用于子流的输入),可能还需要标点符号。 (我实现了这种方法。随意使用代码或根据您的需要调整它:https://github.com/mjsax/aeolus/blob/master/queries/utils/src/main/java/de/hub/cs/dbis/aeolus/utils/TimestampMerger.java)
Storm 支持此用例。为此,您只需确保在所有相关组件的整个流程中保持秩序。因此,作为第一步,在 Kafka 生产者中,特定用户 ID 的所有消息都应该发送到 Kafka 中的同一个分区。为此,您可以在 KafkaProducer 中实现自定义分区程序。详情请参考link here。
由于 Kafka 中的一个分区只能被 Storm 中的一个 kafkaSpout 实例读取,因此该分区中的消息按顺序进入 spout 实例。从而保证同一个用户id的所有消息到达同一个spout。
现在是棘手的部分——为了维护 bolt 中的顺序,您要确保基于从 Kafka spout 发出的“user_id”字段在 bolt 上使用字段分组。提供的 kafkaSpout 不会破坏消息以发出字段,您必须覆盖 kafkaSpout 才能读取消息并从 spout 发出“user_id”字段。这样做的一种方法是使用一个中间螺栓,它从 Kafkaspout 读取消息并发出带有“user_id”字段的流。
当你最终指定一个带有“user_id”字段分组的 bolt 时,所有具有特定 user_id 值的消息都将发送到 bolt 的同一个实例,无论并行度如何螺栓。
适用于您的情况的示例拓扑如下 -
builder.setSpout("KafkaSpout", Kafkaspout);
builder.setBolt("FieldsEmitterBolt", FieldsEmitterBolt).shuffleGrouping("KafkaSpout");
builder.setBolt("CalculatorBolt", CalculatorBolt).fieldsGrouping("FieldsEmitterBolt", new Fields("user_id")); //user_id 由 Bolt2
发出的场
--请注意,如果您的 user_id 数量有限,则可能会出现所有 user_id 值都进入同一个 CalculatorBolt 实例的情况。这反过来会降低有效 'parallelism'!
我是 Storm 的新手,我在弄清楚如何按顺序处理记录时遇到问题。
我有一个数据集,其中包含具有以下字段的记录:
user_id、location_id、time_of_checking
现在,我想识别已完成我指定路径的用户(例如,从位置 A 到位置 B 再到位置 C 的用户)。
我正在使用 Kafka 生产者并从文件中读取这些记录来模拟实时数据。数据按日期排序。
所以,为了检查我的模式是否满足,我需要按顺序处理记录。问题是,由于并行化(螺栓复制),我没有按顺序获得用户签入。因为那个模式不起作用。
如何克服这个问题?如何按顺序处理记录?
Storm 中没有对有序处理的通用系统支持。您要么使用支持有序流处理的不同系统,例如 Apache Flink(免责声明,我是 Flink 的提交者),要么您需要自己在 bolt 代码中处理它。
Storm 提供的唯一支持是使用 Trident。您可以将特定时间段(例如一分钟)的元组放入一个批次中。因此,您可以在一分钟内一次处理所有元组。但是,这仅在您的用例允许时才有效,因为您无法将不同批次的元组相互关联。在您的情况下,只有当您知道存在所有用户都已到达目的地(并且没有其他用户开始新的交互)的时间点时才会出现这种情况;即,您需要任何两个用户都没有重叠的时间点。 (在我看来,您的用例无法满足此要求)。
对于非系统,即基于自定义用户代码的解决方案,有两种方法:
例如,您可以缓冲元组并在处理前按螺栓内的时间戳排序。为了使其正常工作,您需要注入 punctuations/watermarks 以确保没有比标点符号更大的时间戳的元组出现在标点符号之后。如果您从每个并行输入子流中收到标点符号,您可以安全地触发排序和处理。
另一种方法是在区域缓冲区中缓冲每个传入子流的元组(保留子流顺序)并按顺序合并缓冲区中的元组。这具有避免排序的优点。但是,您需要确保每个运算符发出有序的元组。此外,为了避免阻塞(即,如果没有可用于子流的输入),可能还需要标点符号。 (我实现了这种方法。随意使用代码或根据您的需要调整它:https://github.com/mjsax/aeolus/blob/master/queries/utils/src/main/java/de/hub/cs/dbis/aeolus/utils/TimestampMerger.java)
Storm 支持此用例。为此,您只需确保在所有相关组件的整个流程中保持秩序。因此,作为第一步,在 Kafka 生产者中,特定用户 ID 的所有消息都应该发送到 Kafka 中的同一个分区。为此,您可以在 KafkaProducer 中实现自定义分区程序。详情请参考link here。
由于 Kafka 中的一个分区只能被 Storm 中的一个 kafkaSpout 实例读取,因此该分区中的消息按顺序进入 spout 实例。从而保证同一个用户id的所有消息到达同一个spout。
现在是棘手的部分——为了维护 bolt 中的顺序,您要确保基于从 Kafka spout 发出的“user_id”字段在 bolt 上使用字段分组。提供的 kafkaSpout 不会破坏消息以发出字段,您必须覆盖 kafkaSpout 才能读取消息并从 spout 发出“user_id”字段。这样做的一种方法是使用一个中间螺栓,它从 Kafkaspout 读取消息并发出带有“user_id”字段的流。
当你最终指定一个带有“user_id”字段分组的 bolt 时,所有具有特定 user_id 值的消息都将发送到 bolt 的同一个实例,无论并行度如何螺栓。
适用于您的情况的示例拓扑如下 -
builder.setSpout("KafkaSpout", Kafkaspout);
builder.setBolt("FieldsEmitterBolt", FieldsEmitterBolt).shuffleGrouping("KafkaSpout");
builder.setBolt("CalculatorBolt", CalculatorBolt).fieldsGrouping("FieldsEmitterBolt", new Fields("user_id")); //user_id 由 Bolt2
发出的场--请注意,如果您的 user_id 数量有限,则可能会出现所有 user_id 值都进入同一个 CalculatorBolt 实例的情况。这反过来会降低有效 'parallelism'!