卡夫卡流中的延迟功能

delay function in kafka streams

正在尝试使用 kafka 流代码进行一些尝试,并希望在拆分数据后添加延迟或 threads.sleep() 之类的东西 1 毫秒......我很困惑该怎么做......有人可以帮助我这样做吗?

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("INTOPIC");
KStream<String, String> mstream = textlines
    .mapValues(value -> value.replace("[",""));
    .mapValues(value -> value.replace("]",""));
    .mapValues(value -> value.replaceAll("\},\{" ,"\}\},\{\{"))
    .flatMapValues(value -> Arrays.asList(value.split("\},\{")));
mstream.to("OUTTOPIC");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

所以在 .flatmapvalues 语句之后我需要为 1ms 添加一个 thread.sleep() 那么我的语句在那里..?

不确定您要实现什么,但您似乎想减慢处理速度?比起你可以在你的使用代码中睡一觉。为此,您的 lambda 表达式必须在 returns 实际结果之前调用 "sleep"。作为替代方案,您还可以添加一个额外的 .foreach()peek() 呼叫并在那里睡觉。