Using Kafka Streams to segregate messages

I have a setup where every Kafka message contains a "sender" field, and all of these messages are sent to a single topic.

Is there a way to segregate these messages on the consumer side? I'd like a sender-specific consumer to read, on its own, all the messages that belong to that sender.

Should I use Kafka Streams for this? I'm new to Kafka Streams, so any advice or guidance would be helpful.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.json.JSONException;
import org.json.JSONObject;

public class KafkaStreams3 {

    public static void main(String[] args) throws JSONException {

        // Streams configuration
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkastreams1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final Serde<String> stringSerde = Serdes.String();

        // Plain producer used to write the re-keyed records back out
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("bootstrap.servers", "localhost:9092");

        final KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProperties);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream(stringSerde, stringSerde, "topic1");

        // Re-key every record by its "sender" field
        KStream<String, String> s1 = source.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String dummy, String record) {
                try {
                    JSONObject jsonObject = new JSONObject(record);
                    return new KeyValue<>(jsonObject.get("sender").toString(), record);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return new KeyValue<>(record, record);
                }
            }
        });

        s1.print();

        // Forward each record to a topic named after its key (the sender)
        s1.foreach(new ForeachAction<String, String>() {
            @Override
            public void apply(String key, String value) {
                producer.send(new ProducerRecord<>(key, key, value));
            }
        });

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streams.close();
                producer.close();
            }
        }));
    }
}

I believe the simplest way to achieve this is to use your "sender" field as the key and partition the single topic by "sender". That gives you locality and ordering per "sender", so you get a stronger ordering guarantee for each sender, and you can connect clients that consume from a specific partition.
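
For illustration, here is a minimal sketch of that approach. The topic name "messages", the sender value, and the partition number are placeholders; in practice you would compute or look up which partition a given sender's key hashes to:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class SenderPartitioning {

    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Keying records by "sender": the default partitioner hashes the key,
        // so all of one sender's messages land, in order, on one partition.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("messages", "sender-42",
                    "{\"sender\":\"sender-42\",\"payload\":\"...\"}"));
        }

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "sender-42-reader");
        consumerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // assign() (rather than subscribe()) pins this consumer to exactly
        // one partition, e.g. the one that "sender-42" hashes to.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.assign(Collections.singletonList(new TopicPartition("messages", 0)));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
        }
    }
}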

Another possibility is to stream the messages from the initial topic out to other topics, keyed and aggregated per sender, so that you end up with one topic per "sender"; see the sketch below.
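
On a newer Kafka Streams release (2.0 or later) this fan-out can be expressed directly in the DSL with a TopicNameExtractor, instead of calling a producer inside foreach() as in the question's code. This is only a sketch: the topic names and the extractSender helper are assumptions, and the per-sender destination topics must already exist, since Streams does not create dynamic sink topics:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.json.JSONObject;

public class PerSenderTopics {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "per-sender-router");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source =
                builder.stream("topic1", Consumed.with(Serdes.String(), Serdes.String()));

        // Re-key each record by its "sender" field, then let Streams route it
        // to a topic derived from that key.
        source.selectKey((ignoredKey, json) -> extractSender(json))
              .to((key, value, recordContext) -> "sender-" + key,
                  Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }

    // Hypothetical helper mirroring the question's JSONObject parsing.
    private static String extractSender(String json) {
        return new JSONObject(json).optString("sender", "unknown");
    }
}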

Here are code snippets for the producer, and then for the streaming part, using a JSON serializer and deserializer.

Producer:

private Properties kafkaClientProperties() {
    Properties properties = new Properties();

    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();

    properties.put("bootstrap.servers", config.getHost());
    properties.put("client.id", clientId);
    properties.put("key.serializer", StringSerializer.class);
    properties.put("value.serializer", jsonSerializer.getClass());

    return properties;
}

public Future<RecordMetadata> send(String topic, String key, Object instance) {
    // Convert the payload to a JsonNode so the JsonSerializer can handle it
    ObjectMapper objectMapper = new ObjectMapper();
    JsonNode jsonNode = objectMapper.convertValue(instance, JsonNode.class);
    return kafkaProducer.send(new ProducerRecord<>(topic, key, jsonNode));
}
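
A hypothetical call site for the send helper above; the topic name, the getSymbol() accessor, and the wrapper method are made up for illustration:

public void sendQuote(StockQuote stockQuote) throws Exception {
    // Hypothetical usage of send(); topic name and key are placeholders.
    Future<RecordMetadata> ack = send("stockquote-topic", stockQuote.getSymbol(), stockQuote);
    RecordMetadata metadata = ack.get(); // optional: block until the broker acknowledges
    log.info("written to partition {} at offset {}", metadata.partition(), metadata.offset());
}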

Streams:

log.info("loading kafka stream configuration");
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStreamBuilder kStreamBuilder = new KStreamBuilder();
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, config.getStreamEnrichProduce().getId());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);

// stream from the raw stock-quote topic
KStream<String, JsonNode> stockQuoteRawStream =
        kStreamBuilder.stream(Serdes.String(), jsonSerde, config.getStockQuote().getTopic());

Map<String, Map> exchanges = stockExchangeMaps.getExchanges();
ObjectMapper objectMapper = new ObjectMapper();
kafkaProducer.configure(config.getStreamEnrichProduce().getTopic());

// enrich each stock quote with its stock details before producing to the new topic
stockQuoteRawStream.foreach((key, jsonNode) -> {
    StockQuote stockQuote;
    try {
        stockQuote = objectMapper.treeToValue(jsonNode, StockQuote.class);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
        return; // skip records that cannot be deserialized
    }
    JsonNode exchangeNode = jsonNode.get("exchange");
    // look up the stock detail that matches the quote being processed
    Map<String, StockDetail> stockDetailMap = exchanges.get(exchangeNode.toString().replace("\"", ""));
    StockDetail stockDetail = stockDetailMap.get(key);
    stockQuote.setStockDetail(stockDetail);
    kafkaProducer.send(config.getStreamEnrichProduce().getTopic(), null, stockQuote);
});

return new KafkaStreams(kStreamBuilder, props);
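
The method above returns the KafkaStreams instance without starting it. A caller would start it and close it on shutdown, along these lines (the buildStreams name is assumed for the method shown above):

// Sketch: start the topology built above and close it cleanly on shutdown.
KafkaStreams streams = buildStreams(); // the method shown above; name assumed
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));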