如何在 Kafka Streams 中将相同键的所有值合并为一个列表(输入流的键和值均为 String)?

How can we group all the values for the same key into a list in Kafka Streams, when the input stream's keys and values are Strings?

我在一个 Kafka 主题上有形如 (key: id, {id: 1, body: ...}) 的数据,即消息的键与 id 相同。但可能存在多条 ID 相同、正文不同的消息。因此我得到的是一个 KStream<String, String>。

现在我想获取所有具有相同 ID(键)的消息,将它们的值合并为一个列表,并返回为:

KStream<String, List<String>>

有什么建议吗?

    // Build the topology: register a persistent key-value store that maps each String key
    // to a List<String> of values, then attach the processing logic to the source stream.
    final StreamsBuilder streamsBuilder = new StreamsBuilder();

    // Keys use the String serde; List<String> values are encoded with the custom ListSerde.
    final StoreBuilder<KeyValueStore<String, List<String>>> traceStoreBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(LOG_TRACE_STATE_STORE),
                    Serdes.String(),
                    new ListSerde<String>(Serdes.String()));
    // Registering the store lets processors look it up by LOG_TRACE_STATE_STORE;
    // presumably splitProcessor connects a processor to it (not visible here).
    streamsBuilder.addStateStore(traceStoreBuilder);

    final KStream<String, String> sourceStream = streamsBuilder.stream(TOPIC);
    splitProcessor(sourceStream);
    logger.info("creating stream for topic {} ..", TOPIC);

    // Arguments evaluate left to right: the topology is built before the config is created.
    return new KafkaStreams(streamsBuilder.build(), streamConfiguration(bootstrapServers));
    

    // Stream List Serde

    /**
     * Kafka {@link Serde} for {@code List<T>} values.
     *
     * <p>Pairs a {@code ListSerializer} and a {@code ListDeserializer}, each wrapping the matching
     * half of one element serde. Configuration and closing are passed on to both halves.
     */
    public class ListSerde<T> implements Serde<List<T>> {

        private final Serializer<List<T>> listSerializer;
        private final Deserializer<List<T>> listDeserializer;

        /**
         * @param elementSerde serde for the individual list elements; its serializer is taken
         *     first, then its deserializer
         */
        public ListSerde(final Serde<T> elementSerde) {
            this.listSerializer = new ListSerializer<>(elementSerde.serializer());
            this.listDeserializer = new ListDeserializer<>(elementSerde.deserializer());
        }

        @Override
        public Serializer<List<T>> serializer() {
            return listSerializer;
        }

        @Override
        public Deserializer<List<T>> deserializer() {
            return listDeserializer;
        }

        /** Configures the serializer, then the deserializer, with the same settings. */
        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {
            listSerializer.configure(configs, isKey);
            listDeserializer.configure(configs, isKey);
        }

        /** Closes the serializer, then the deserializer. */
        @Override
        public void close() {
            listSerializer.close();
            listDeserializer.close();
        }
    }
    
    // Serializer & deserializers

    /**
     * Kafka {@link Serializer} for {@code List<T>}; each element is encoded by a wrapped serializer.
     *
     * <p>Wire format, written with {@link DataOutputStream} (big-endian ints):
     * <ul>
     *   <li>the element count as a 4-byte int;</li>
     *   <li>for each element, its byte length as a 4-byte int followed by its serialized bytes.</li>
     * </ul>
     *
     * <p>A {@code null} list serializes to {@code null}, matching Kafka's convention for null values.
     */
    public class ListSerializer<T> implements Serializer<List<T>> {

        private final Serializer<T> valueSerializer;

        /**
         * @param valueSerializer serializer for each list element; must not be null
         * @throws NullPointerException if {@code valueSerializer} is null
         */
        public ListSerializer(final Serializer<T> valueSerializer) {
            this.valueSerializer = java.util.Objects.requireNonNull(valueSerializer, "valueSerializer");
        }

        /**
         * Forwards the configuration to the element serializer. Without this, a wrapped serializer
         * that needs settings (for example a schema-registry URL) would never receive them.
         */
        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {
            valueSerializer.configure(configs, isKey);
        }

        /**
         * Serializes {@code list} in the length-prefixed format described on the class.
         *
         * @param topic topic name, passed through to the element serializer
         * @param list the list to encode; may be null
         * @return the encoded bytes, or {@code null} if {@code list} is null
         * @throws IllegalArgumentException if an element serializes to {@code null}, which this
         *     format cannot represent
         */
        @Override
        public byte[] serialize(final String topic, final List<T> list) {
            if (list == null) {
                return null;
            }
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(baos)) {
                out.writeInt(list.size());
                for (final T element : list) {
                    final byte[] bytes = valueSerializer.serialize(topic, element);
                    if (bytes == null) {
                        // A length prefix cannot express "null", so fail clearly instead of with an NPE.
                        throw new IllegalArgumentException(
                                "List element serialized to null and cannot be encoded (topic " + topic + ")");
                    }
                    out.writeInt(bytes.length);
                    out.write(bytes);
                }
            } catch (final IOException e) {
                throw new RuntimeException("unable to serialize List", e);
            }
            // The stream is closed (and thus flushed) by try-with-resources before this read.
            return baos.toByteArray();
        }

        /** Closes the wrapped element serializer. */
        @Override
        public void close() {
            valueSerializer.close();
        }
    }
    
    //------------
    /**
     * Kafka {@link Deserializer} for {@code List<T>}, reading the length-prefixed format written by
     * {@code ListSerializer}.
     *
     * <p>The format is a 4-byte element count, then for each element a 4-byte length and that many
     * bytes, which are decoded by the wrapped element deserializer.
     *
     * <p>{@code null} or empty input yields {@code null}.
     */
    public class ListDeserializer<T> implements Deserializer<List<T>> {

        private final Deserializer<T> valueDeserializer;

        /**
         * @param valueDeserializer deserializer for each list element; must not be null
         * @throws NullPointerException if {@code valueDeserializer} is null
         */
        public ListDeserializer(final Deserializer<T> valueDeserializer) {
            this.valueDeserializer = java.util.Objects.requireNonNull(valueDeserializer, "valueDeserializer");
        }

        /** Forwards the configuration to the element deserializer so it is set up like a top-level one. */
        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {
            valueDeserializer.configure(configs, isKey);
        }

        /**
         * Decodes a list previously written by {@code ListSerializer}.
         *
         * @param s topic name, passed through to the element deserializer
         * @param bytes the encoded list; may be null or empty
         * @return the decoded list (mutable {@link ArrayList}), or {@code null} for null/empty input
         * @throws RuntimeException if the payload is truncated or contains an invalid length
         */
        @Override
        public List<T> deserialize(final String s, final byte[] bytes) {
            if (bytes == null || bytes.length == 0) {
                return null;
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                final int records = in.readInt();
                if (records < 0) {
                    throw new RuntimeException("Unable to deserialize List: negative element count " + records);
                }
                final List<T> list = new ArrayList<>();
                for (int i = 0; i < records; i++) {
                    final int length = in.readInt();
                    // available() is exact for a ByteArrayInputStream; rejecting oversize lengths
                    // avoids allocating a huge array from corrupt input.
                    if (length < 0 || length > in.available()) {
                        throw new RuntimeException(
                                "Unable to deserialize List: invalid element length " + length + " at index " + i);
                    }
                    final byte[] valueBytes = new byte[length];
                    // readFully throws EOFException on short input instead of leaving zero-filled bytes.
                    in.readFully(valueBytes);
                    list.add(valueDeserializer.deserialize(s, valueBytes));
                }
                return list;
            } catch (final IOException e) {
                throw new RuntimeException("Unable to deserialize List", e);
            }
        }

        /** Closes the wrapped element deserializer. */
        @Override
        public void close() {
            valueDeserializer.close();
        }
    }
    // Now create the stream processors

    /**
     * Stream processor that passes every record value to the monitoring store.
     *
     * <p>The store instance comes from {@code MonitoringStateStoreFactory}. During {@link #init}
     * the processor context is handed to it via {@code initLogTraceStoreProcess}. Record keys are
     * not used; each value is forwarded unchanged to {@code storeLogTrace}.
     */
    public class LogTraceStreamStateProcessor implements Processor<String, String>{

        private static final Logger logger = Logger.getLogger(LogTraceStreamStateProcessor.class);
        IStore stateStore;

        /**
         * Looks up the monitoring store and gives it the processor context.
         *
         * @param processorContext context supplied by Kafka Streams for this processor
         */
        @Override
        public void init(ProcessorContext processorContext) {
            logger.info("initializing processor and looking for monitoring store");
            final IStore store = MonitoringStateStoreFactory.getInstance().getStore();
            this.stateStore = store;
            logger.debug("found the monitoring store - {} ", store);
            store.initLogTraceStoreProcess(processorContext);
            logger.debug("initalizing monitoring store.");
        }

        /**
         * Stores one record value in the monitoring store.
         *
         * @param recordKey the record key (ignored)
         * @param recordValue the raw value to store
         */
        @Override
        public void process(String recordKey, String recordValue) {
            logger.debug("Storing the value for logtrace storage - {} ", recordValue);
            this.stateStore.storeLogTrace(recordValue);
            logger.debug("finished Storing the value for logtrace storage - {} ", recordValue);
        }

        /** Intentionally a no-op: this processor acquires nothing it must release itself. */
        @Override
        public void close() {
        }

    }
    
    // Access the key-value store (String key -> List<String> of collected values) through the processor context.
    // The cast is unchecked: generic type arguments are erased, so a mismatch only surfaces on later reads.
    // NOTE(review): this looks up EXEID_REQ_REL_STORE, but the store registered with the StreamsBuilder in
    // this snippet is LOG_TRACE_STATE_STORE — confirm both names refer to the same store (or that
    // EXEID_REQ_REL_STORE is registered and connected elsewhere); otherwise the lookup will likely fail.
    // NOTE(review): traceStreamContext must already be initialized (e.g. in Processor#init) when this runs — confirm.
    KeyValueStore<String, List<String>> stateStore =  (KeyValueStore<String, List<String>>) traceStreamContext.getStateStore(EXEID_REQ_REL_STORE);
    
    /**
     * Appends one raw trace message to the list stored under its execution id.
     *
     * <p>The message is parsed into a {@code TraceEvent} only to read its execution id, which
     * becomes the store key. The raw {@code traceData} string itself is appended, so every message
     * with the same execution id is collected into one {@code List<String>}. A new list is started
     * when the key has no entry or an empty one.
     *
     * <p>Messages that cannot be parsed, or that have no execution id, are logged and skipped
     * rather than rethrown. A commit is requested after every call, whether or not storing succeeded.
     *
     * @param traceData the raw trace message; presumably JSON, since it is parsed with {@code mapper}
     */
    public void storeTraceData(String traceData) {
        try {
            logger.debug("Received the Trace value - {}", traceData);
            TraceEvent tracer = mapper.readValue(traceData, TraceEvent.class);
            logger.debug("trace unmarshelling has been completed successfully !!!");

            // A null parse result or missing id would otherwise fail with an NPE on store access.
            String key = tracer == null ? null : tracer.getExecutionId();
            if (key == null) {
                logger.warn("trace event has no execution id, skipping - {}", traceData);
                return;
            }

            List<String> existing = stateStore.get(key);
            List<String> events;
            if (existing != null && !existing.isEmpty()) {
                logger.debug("event is already in store so storing in the list for execution id - {}", key);
                // Copy so we never mutate a list the store/deserializer might have handed out as immutable.
                events = new ArrayList<>(existing);
            } else {
                logger.debug(
                        "event is not present in the store so creating a new list and adding into store for execution id - {}",
                        key);
                events = new ArrayList<>();
            }
            // Collect the message itself so all values for this key end up together.
            events.add(traceData);
            stateStore.put(key, events);
        } catch (Exception e) {
            logger.error("exception while processing the trace event .. ", e);
        } finally {
            try {
                // Requests (does not force) a commit; Kafka Streams performs it at its next opportunity.
                traceStreamContext.commit();
            } catch (Exception e2) {
                logger.error("failed to request a commit after processing the trace event .. ", e2);
            }
        }
    }
    /**
     * Returns a read-only, interactively queryable view of the store that maps each key to its list
     * of collected values.
     *
     * <p>Delegates to {@code waitUntilStoreIsQueryable}; presumably that method waits until the
     * store can be queried (its body is not visible here).
     * NOTE(review): KEY_NAME is presumably the registered state-store name — confirm it matches.
     *
     * @return the queryable store view
     */
    public ReadOnlyKeyValueStore<String, List<String>> tracerStore() {
        final ReadOnlyKeyValueStore<String, List<String>> queryableStore = waitUntilStoreIsQueryable(KEY_NAME);
        return queryableStore;
    }