how to send json data stream to multiple topics in kafka based on input fields

I have to consume JSON data coming in on a Kafka stream and send it to different topics (different combinations of appId and entity) for further consumption.
Topic names:

    app1.entity1
    app1.entity2
    app2.entity1
    app2.entity2

JSON data

    [
        {
            "appId": "app1",
            "entity": "entity1",
            "extractType": "txn",
            "status": "success",
            "fileId": "21151235"
        },
        {
            "appId": "app1",
            "entity": "entity2",
            "extractType": "txn",
            "status": "fail",
            "fileId": "2134234123"
        },
        {
            "appId": "app2",
            "entity": "entity3",
            "extractType": "payment",
            "status": "success",
            "fileId": "2312de23e"
        },
        {
            "appId": "app2",
            "entity": "entity3",
            "extractType": "txn",
            "status": "fail",
            "fileId": "asxs3434"
        }
    ]

TestInput.java

    public class TestInput {
        private String appId;
        private String entity;
        private String extractType;
        private String status;
        private String fileId;

        // getters and setters omitted
    }

SpringBootConfig.java

      @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
      public KafkaStreamsConfiguration kStreamsConfigs(KafkaProperties kafkaProperties) {
          Map<String, Object> config = new HashMap<>();
          config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
          config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
          config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
          config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JsonSerde<>(TestInput.class).getClass());
          config.put(JsonDeserializer.DEFAULT_KEY_TYPE, String.class);
          config.put(JsonDeserializer.DEFAULT_VALUE_TYPE, TestInput.class);
          return new KafkaStreamsConfiguration(config);
      }

      @Bean
      public KStream<String, TestInput> kStream(StreamsBuilder kStreamBuilder) {
          KStream<String, TestInput> stream = kStreamBuilder.stream(inputTopic);
                 // how to form key , group records and send to different topics
          return stream;
      }

I have searched a lot but couldn't find anything on publishing data to topics dynamically. Could someone please help?

Use stream.branch()

https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/

Next, let’s modify the requirement. Instead of processing all events in the stream, each microservice should take action only on a subset of relevant events. One way to handle this requirement is to have a microservice that subscribes to the original stream with all the events, examines each record and then takes action only on the events it cares about while discarding the rest. However, depending on the application, this may be undesirable or resource intensive.

A cleaner way is to provide the service with a separate stream that contains only the relevant subset of events that the microservice cares about. To achieve this, a streaming application can branch the original event stream into different substreams using the method KStream#branch(). This results in new Kafka topics, so then the microservice can subscribe to one of the branched streams directly.
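
Applied to the question's model, a minimal sketch of this approach (assuming TestInput has the usual getters getAppId(), getEntity() and getFileId(); re-keying by fileId is only one illustration of "how to form key"):

    @Bean
    public KStream<String, TestInput> kStream(StreamsBuilder kStreamBuilder) {
        KStream<String, TestInput> stream = kStreamBuilder.stream(inputTopic);

        // Optionally re-key each record, e.g. by fileId
        KStream<String, TestInput> keyed =
                stream.selectKey((key, value) -> value.getFileId());

        // One predicate per appId/entity combination; a record goes to the
        // first branch whose predicate matches, unmatched records are dropped
        KStream<String, TestInput>[] branches = keyed.branch(
                (key, value) -> "app1".equals(value.getAppId()) && "entity1".equals(value.getEntity()),
                (key, value) -> "app1".equals(value.getAppId()) && "entity2".equals(value.getEntity()),
                (key, value) -> "app2".equals(value.getAppId()) && "entity1".equals(value.getEntity()),
                (key, value) -> "app2".equals(value.getAppId()) && "entity2".equals(value.getEntity()));

        branches[0].to("app1.entity1");
        branches[1].to("app1.entity2");
        branches[2].to("app2.entity1");
        branches[3].to("app2.entity2");

        return stream;
    }

Note that Kafka Streams does not create sink topics for you, so the four output topics must exist before the application starts.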

...
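
If new appId/entity combinations can appear at runtime, enumerating branches does not scale. The linked post also covers fully dynamic routing, where KStream#to takes a TopicNameExtractor that computes the destination topic from each record; a sketch under the same assumptions:

    // Route each record to the topic "<appId>.<entity>" derived from its fields
    stream.to((key, value, recordContext) ->
            value.getAppId() + "." + value.getEntity());

Here, too, every topic the extractor can return must already exist.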