卡夫卡流中的延迟功能
delay function in kafka streams
正在尝试使用 kafka 流代码进行一些尝试,并希望在拆分数据后添加延迟或 threads.sleep() 之类的东西 1 毫秒......我很困惑该怎么做......有人可以帮助我这样做吗?
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("INTOPIC");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[",""));
.mapValues(value -> value.replace("]",""));
.mapValues(value -> value.replaceAll("\},\{" ,"\}\},\{\{"))
.flatMapValues(value -> Arrays.asList(value.split("\},\{")));
mstream.to("OUTTOPIC");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
所以在 .flatmapvalues 语句之后我需要为 1ms 添加一个 thread.sleep() 那么我的语句在那里..?
不确定您要实现什么,但您似乎想减慢处理速度?比起你可以在你的使用代码中睡一觉。为此,您的 lambda 表达式必须在 returns 实际结果之前调用 "sleep"。作为替代方案,您还可以添加一个额外的 .foreach()
或 peek()
呼叫并在那里睡觉。
正在尝试使用 kafka 流代码进行一些尝试,并希望在拆分数据后添加延迟或 threads.sleep() 之类的东西 1 毫秒......我很困惑该怎么做......有人可以帮助我这样做吗?
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("INTOPIC");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[",""));
.mapValues(value -> value.replace("]",""));
.mapValues(value -> value.replaceAll("\},\{" ,"\}\},\{\{"))
.flatMapValues(value -> Arrays.asList(value.split("\},\{")));
mstream.to("OUTTOPIC");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
所以在 .flatmapvalues 语句之后我需要为 1ms 添加一个 thread.sleep() 那么我的语句在那里..?
不确定您要实现什么,但您似乎想减慢处理速度?比起你可以在你的使用代码中睡一觉。为此,您的 lambda 表达式必须在 returns 实际结果之前调用 "sleep"。作为替代方案,您还可以添加一个额外的 .foreach()
或 peek()
呼叫并在那里睡觉。