KSQL - 使用 GEO_DISTANCE 计算与 2 条消息的距离
KSQL - calculate distance from 2 messages using GEO_DISTANCE
我有一个 kafka 主题,主题中的每条消息都有 lat/lon 和事件时间戳。创建了一个引用主题的流,并想使用 geo_distance 计算 2 点之间的距离。
示例
GpsDateTime lat lon
2016-11-30 22:38:36, 32.685757, -96.735942
2016-11-30 22:39:07, 32.687347, -96.732841
2016-11-30 22:39:37, 32.68805, -96.729726
我想在上面的流上创建一个新的流,并用距离丰富它。
GpsDateTime lat lon Distance
2016-11-30 22:38:36, 32.685757, -96.735942 0
2016-11-30 22:39:07, 32.687347, -96.732841 0.340
2016-11-30 22:39:37, 32.68805, -96.729726 0.302
是否可以使用 KSQL 达到预期的结果?或者如何在处理新消息时参考之前的消息?
首先,这些读数是否来自某种设备?如果是这样,您是否有他们的唯一 ID (UUID)?我会把它放到你的信息流中,所以它喜欢 UUID, GpsDateTime, lat, lon
。
您将需要创建一个相当基本的 Kafka Streams 应用程序。在此应用程序中,您会将流中的最新阅读存储到 StoreBuilder 中。然后,当从 Kafka 收到一条新消息时,您将检索这个最新值,进行计算,然后将新的经纬度值存储到 StoreBuilder 中。
当然,我不清楚您是否只想 ever 有一个经纬度值,并且所有后续值都是根据第一个读数计算得出的。或者,如果您想进行滚动计算,您总是比较上次读数和当前读数之间的距离。
无论如何,您可以在实践中看到这段代码:https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java
此示例是字数统计示例,但可以针对您的用例快速转换。
静态最终 class WordCountTransformerSupplier(第 78 行)将成为您的 LatLongDistanceComputation。
您将创建具有适当类型的 StoreBuilder(第 154 行)(无论您将 lat/lon 存储为什么)。
第 165 行是实际从流入的值流中读取项目的地方。
当然,您还需要编辑 inputTopic 和 outputTopic(第 66-67 行)以及其他一些内容。
我有一个 kafka 主题,主题中的每条消息都有 lat/lon 和事件时间戳。创建了一个引用主题的流,并想使用 geo_distance 计算 2 点之间的距离。 示例
GpsDateTime lat lon
2016-11-30 22:38:36, 32.685757, -96.735942
2016-11-30 22:39:07, 32.687347, -96.732841
2016-11-30 22:39:37, 32.68805, -96.729726
我想在上面的流上创建一个新的流,并用距离丰富它。
GpsDateTime lat lon Distance
2016-11-30 22:38:36, 32.685757, -96.735942 0
2016-11-30 22:39:07, 32.687347, -96.732841 0.340
2016-11-30 22:39:37, 32.68805, -96.729726 0.302
是否可以使用 KSQL 达到预期的结果?或者如何在处理新消息时参考之前的消息?
首先,这些读数是否来自某种设备?如果是这样,您是否有他们的唯一 ID (UUID)?我会把它放到你的信息流中,所以它喜欢 UUID, GpsDateTime, lat, lon
。
您将需要创建一个相当基本的 Kafka Streams 应用程序。在此应用程序中,您会将流中的最新阅读存储到 StoreBuilder 中。然后,当从 Kafka 收到一条新消息时,您将检索这个最新值,进行计算,然后将新的经纬度值存储到 StoreBuilder 中。
当然,我不清楚您是否只想 ever 有一个经纬度值,并且所有后续值都是根据第一个读数计算得出的。或者,如果您想进行滚动计算,您总是比较上次读数和当前读数之间的距离。
无论如何,您可以在实践中看到这段代码:https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java
此示例是字数统计示例,但可以针对您的用例快速转换。
静态最终 class WordCountTransformerSupplier(第 78 行)将成为您的 LatLongDistanceComputation。
您将创建具有适当类型的 StoreBuilder(第 154 行)(无论您将 lat/lon 存储为什么)。
第 165 行是实际从流入的值流中读取项目的地方。
当然,您还需要编辑 inputTopic 和 outputTopic(第 66-67 行)以及其他一些内容。