Kafka Streams table 次转换

Kafka Streams table transformations

我在SQL服务器中有一个table我想流到Kafka主题,结构如下:

(UserID, ReportID)

此 table 将不断更改(添加、插入、无更新的记录)

我想把这个改成这样的结构放到Elasticsearch中:

{
  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]
}

到目前为止,我看到的示例是日志或点击流,它们在我的情况下不起作用。

这种用例是否可能?我总是可以只查看 UserID 更改和查询数据库,但这似乎很幼稚,不是最好的方法。

更新

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Properties;

public class MyDemo {
  public static void main(String... args) {
    System.out.println("Hello KTable!");

    final Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<Long, Long> reportPermission = builder.stream(TOPIC);

    KTable<Long, ArrayList<Long>> result = reportPermission
        .groupByKey()
        .aggregate(
            new Initializer<ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply() {
                return null;
              }
            },
            new Aggregator<Long, Long, ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
                aggregate.add(value);
                return aggregate;
              }
            },
            new Serde<ArrayList<Long>>() {
              @Override
              public void configure(Map<String, ?> configs, boolean isKey) {}

              @Override
              public void close() {}

              @Override
              public Serializer<ArrayList<Long>> serializer() {
                return null;
              }

              @Override
              public Deserializer<ArrayList<Long>> deserializer() {
                return null;
              }
            });

    result.to("report-aggregated-topic");

    KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  private static final String TOPIC = "report-permission";

  private static final Properties createStreamProperties() {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

    return props;
  }
}

我实际上陷入了聚合阶段,因为我无法为 ArrayList<Long> 编写适当的 SerDe(还没有足够的技能),lambda 似乎不适用于聚合器 - 它不知道是什么类型共 agg

KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
    .groupByKey()
    .aggregate(
        () -> new ArrayList<Long>(),
        (key, val, agg) -> agg.add(val),
        longSerde
    );

直接在SQL和Kafka Streams中不允许这种方法,但是用例是可能的并且可以按如下方式实现:

1) 使用 SOLRJ API 在 SQL 服务器上编写自定义应用程序,每当在 SQL 中执行 DML(插入、更新、删除等)操作时,该应用程序将命中 Solr 实例。 https://wiki.apache.org/solr/Solrj

2) 使用 Solr SQL Data Import Handler 通过使用它 SQL 服务器将在 SQL 中发生 DML(插入、更新、删除等)操作时自动通知 solr . https://wiki.apache.org/solr/DataImportHandler

您可以使用 Kafka 的连接 API 从 SQL 服务器获取数据到 Kafka。我不知道 SQL 服务器有任何特定的连接器,但您可以使用任何基于 JDBC 的通用连接器:https://www.confluent.io/product/connectors/

要处理数据,您可以使用 Kafka 的 Streams API。您可以简单地 aggregate() 每个用户的所有报告。像这样:

KTable<UserId, List<Reports>> result =
    builder.stream("topic-name")
           .groupByKey()
           // init a new empty list and
           // `add` the items to the list in the actual aggregation
           .aggregate(...);

result.to("result-topic");

查看文档以了解有关流的更多详细信息API:https://docs.confluent.io/current/streams/index.html

Note, that you need to make sure that the list of reports does not grow unbounded. Kafka has some (configurable) maximum message size and the whole list will be contained in a single message. Thus, you should estimate the maximum message size and apply the corresponding configuration (-> max.message.bytes) before going into production. Check out configs at the webpage: http://kafka.apache.org/documentation/#brokerconfigs

最后,您使用 Connect API 将数据推送到 Elastic Search。有多种不同的连接器可用(我当然会推荐 Confluent 连接器)。有关 Connect API 的更多详细信息:https://docs.confluent.io/current/connect/userguide.html