如何使用 KSQLDB 做地理围栏 monitoring/analytics?

How to do Geofence monitoring/analytics using KSQLDB?

我正在尝试使用 KSQLDB 进行地理围栏 monitoring/analytics。我想在车辆 ENTERS/LEAVES 地理围栏时收到消息。从 [https://github.com/gschmutz/various-demos/tree/master/kafka-geofencing] 中汲取灵感,我创建了一个名为 GEOFENCE 的 UDF,下面是相同的代码。

下面是我对地理围栏流和实时车辆位置流执行连接的查询

CREATE stream join_live_pos_geofence_status_1 AS SELECT lp1.vehicleid, 
          lp1.lat, 
          lp1.lon, 
          s1p.geofencecoordinates, 
          Geofence(lp1.lat, lp1.lon, 'POLYGON(('+s1p.geofencecoordinates+'))') AS geofence_status 
FROM      live_position_1 LP1 
LEFT JOIN stream_1_processed S1P within 72 hours 
ON        kmdlp1.clusterid = kmds1p.clusterid emit changes;

我正在考虑过去 3 天内创建的所有地理围栏。

我创建了另一个查询来使用先前查询的地理围栏状态来计算车辆是否是 ENTERING/LEAVING 地理围栏。

CREATE stream join_geofence_monitoring_1 AS SELECT *, 
       Geofence(jlpgs1.lat, jlpgs1.lon, 'POLYGON(('+jlpgs1.geofencecoordinates+'))', jlpgs1.geofence_status) geofence_monitoring_status
FROM   join_live_pos_geofence_status_1 JLPGS1 emit changes;

以上查询分别为 geofence_status 和 geofence_monitoring_status 列提供 'INSIDE'、'INSIDE' 的输出,或者输出为 'OUTSIDE'、'OUTSIDE' 分别代表 geofence_status 和 geofence_monitoring_status 列。我知道我没有考虑时间方面,就像这两个查询永远不应该同时执行说 't0' 但我无法想到这样做的正确方法。

public class Geofence 
{
    private static final String OUTSIDE = "OUTSIDE";
    private static final String INSIDE = "INSIDE";
    private static GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
    private static WKTReader wktReader = new WKTReader(geometryFactory);

    @Udf(description = "Returns whether a coordinate lies within a polygon or not")
    public static String geofence(final double latitude, final double longitude, String geometryWKT) {
        boolean status = false;
        String result = "";
        Polygon polygon = null;
        try {
            polygon = (Polygon) wktReader.read(geometryWKT);

            // However, an important point to note is that the longitude is the X value 
            // and the latitude the Y value. So we say "lat/long", 
            // but JTS will expect it in the order "long/lat". 
            Coordinate coord = new Coordinate(longitude, latitude);
            Point point = geometryFactory.createPoint(coord);

            status = point.within(polygon);
            if(status)
            {
                result = INSIDE;
            }
            else
            {
                result = OUTSIDE;
            }
        } catch (ParseException e) {
            throw new RuntimeException(e.getMessage());
        }
        return result;
    }

    @Udf(description = "Returns whether a coordinate moved in or out of a polygon")
    public static String geofence(final double latitude, final double longitude, String geometryWKT, final String statusBefore) {

        String status = geofence(latitude, longitude, geometryWKT);
        if (statusBefore.equals("INSIDE") && status.equals("OUTSIDE")) {
            //status = "LEAVING";
            return "LEAVING";
        } else if (statusBefore.equals("OUTSIDE") && status.equals("INSIDE")) {
            //status = "ENTERING";
            return "ENTERING";
        }
        return status;
    }

}

我的问题是如何正确计算车辆是 ENTERING/LEAVING 地理围栏?甚至可以使用 KSQLDB 吗?

说 join_live_pos_geofence_status_1 流可以包含从 INSIDE -> OUTSIDE 然后从 OUTSIDE -> INSIDE 的行以获得某个键值是否正确?

而您想要做的是为这些转换输出 LEAVINGENTERING 事件?

您可以使用自定义 UDAF 做您想做的事。自定义 UDAF 通过某种中间状态获取并输入并计算输出。例如,一个 AVG udaf 将一些数字作为输入,它的中间状态将是输入的数量和输入的总和,输出将是 count/sum.

在您的情况下,输入将是当前状态,例如INSIDEOUTSIDE。 UDAF 需要将最后两个状态存储在其中间状态中,然后可以由此计算输出状态。例如

Input   Intermediate    Output
INSIDE  INSIDE          <only single in intermediate - your choice what you output>
INSIDE  INSIDE,INSIDE   no-change
OUTSIDE INSIDE,OUTSIDE  LEAVING
OUTSIDE OUTSIDE,OUTSIDE no-change
INSIDE  OUTSIDE,INSIDE  ENTERING

当中间状态只有一个条目时,即第一次看到一个键时,您需要决定输出什么。

然后您可以过滤输出以删除具有 no-change.

的任何行

您可能还需要将 cache.max.bytes.buffering 设置为零以停止合并任何结果。

更新:建议代码。

未经测试,但类似下面的代码可能会满足您的要求:

@UdafDescription(name = "my_geofence", description = "Computes the geofence status.")
public final class GoeFenceUdaf {

  private static final String STATUS_1 = "STATUS_1";
  private static final String STATUS_2 = "STATUS_2";

  @UdafFactory(description = "Computes the geofence status.",
      aggregateSchema = "STRUCT<" + STATUS_1 + " STRING, " + STATUS_2 + " STRING>")
  public static Udaf<String, Struct, String> calcGeoFenceStatus() {

    final Schema STRUCT_SCHEMA = SchemaBuilder.struct().optional()
        .field(STATUS_1, Schema.OPTIONAL_STRING_SCHEMA)
        .field(STATUS_2, Schema.OPTIONAL_STRING_SCHEMA)
        .build();

    return new Udaf<String, Struct, String>() {

      @Override
      public Struct initialize() {
        return new Struct(STRUCT_SCHEMA);
      }

      @Override
      public Struct aggregate(
          final String newValue,
          final Struct aggregate
      ) {
        if (newValue == null) {
          return aggregate;
        }

        if (aggregate.getString(STATUS_1) == null) {
          // First status for this key:
          return aggregate
              .put(STATUS_1, newValue);
        }

        final String lastStatus = aggregate.getString(STATUS_2);
        if (lastStatus == null) {
          // Second status for this key:
          return aggregate
              .put(STATUS_2, newValue);
        }

        // Third and subsequent status for this key:
        return aggregate
            .put(STATUS_1, lastStatus)
            .put(STATUS_2, newValue);
      }

      @Override
      public String map(final Struct aggregate) {
        final String previousStatus = aggregate.getString(STATUS_1);
        final String currentStatus = aggregate.getString(STATUS_2);
        if (currentStatus == null) {
          // Only have single status, i.e. first status for this key
          // What to do?  Probably want to do:
          return previousStatus.equalsIgnoreCase("OUTSIDE")
              ? "LEAVING"
              : "ENTERING";
        }

        // Two statuses ...
        if (currentStatus.equals(previousStatus)) {
          return "NO CHANGE";
        }

        return previousStatus.equalsIgnoreCase("OUTSIDE")
            ? "ENTERING"
            : "LEAVING";
      }

      @Override
      public Struct merge(final Struct agg1, final Struct agg2) {
        throw new RuntimeException("Function does not support session windows");
      }
    };
  }
}