Kafka streams shuts down if sink node/topic not reachable?

I want to test a scenario where a Kafka Streams application using the Processor API reads from a source topic and writes to a list of sink topics, and one or two of those topics are not reachable (failure test: I tried to simulate this by adding one or two topics to the topology that do not exist in the cluster).

    topology.addSource("mysource", "source_topic");
    topology.addProcessor("STREAM_PROCESSOR", () -> new SourceProcessor(), "mysource");
    topology.addSink("SINK_1", "TOPIC_1", "STREAM_PROCESSOR");
    topology.addSink("SINK_2", "TOPIC_2", "STREAM_PROCESSOR");
    topology.addSink("SINK_3", "TOPIC_3", "STREAM_PROCESSOR"); // this topic is not present in the cluster

    sourceContext.forward(eventName, eventMessage, To.child("SINK_1")); // or "SINK_2" / "SINK_3" -- the child name must match a sink node name
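
For context, here is a minimal sketch of what SourceProcessor.process() might look like with this routing. The String key/value types and the routing condition are assumptions; the original code only shows the forward() call:

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.To;

    // Hypothetical shape of SourceProcessor: each record is forwarded to exactly one
    // of the sink nodes declared in the topology above.
    public class SourceProcessor extends AbstractProcessor<String, String> {

        @Override
        public void process(final String eventName, final String eventMessage) {
            if (eventMessage.contains("type1")) {
                context().forward(eventName, eventMessage, To.child("SINK_1"));
            } else if (eventMessage.contains("type2")) {
                context().forward(eventName, eventMessage, To.child("SINK_2"));
            } else {
                context().forward(eventName, eventMessage, To.child("SINK_3")); // sink whose topic is missing
            }
        }
    }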

My understanding was that Kafka Streams should report an error for the non-existent topic and continue forwarding records to TOPIC_1 and TOPIC_2, which do exist.

Instead, the behavior I see is the following error:

     Exception in thread "StreamProcessor-56da56e4-4ab3-4ca3-bf48-b059558b689f-StreamThread-1" 
     org.apache.kafka.streams.errors.StreamsException: 
     task [0_0] Abort sending since an error caught with a previous record (timestamp 1592940025090) to topic "TOPIC_X" due to 
     org.apache.kafka.common.errors.TimeoutException: Topic "TOPIC_X" not present in metadata after 60000 ms.
     Timeout exception caught when sending record to topic "TOPIC_X". This might happen if the producer cannot send data to the
     Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the
     broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.
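
As the message suggests, if the topic is merely slow to appear (rather than missing entirely), the producer timeout can be raised through the Streams config. A small sketch, where the 120000 ms value is an arbitrary example:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    // translates to "producer.max.block.ms"; gives the producer longer to fetch topic metadata
    props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);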

Is this the right way to simulate an unreachable or non-existent sink topic? Also, why does Kafka Streams shut down with the above error even though we handle the streams/topology exception? Kafka Streams should not shut down just because one of the sink topics is unavailable or unreachable for some reason, right? Please advise.

When the above error occurs, I want to catch the StreamsException and forward the record to an error topic instead, but Kafka Streams stops prematurely:

    catch (StreamsException e) {
        context.forward("", "", To.child("Error_Topic")); // "Error_Topic" must name a sink node in the topology
    }
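
For that forward() to go anywhere, the error topic itself would have to be registered as an additional sink node. A sketch of that wiring, where the node name "Error_Topic" and the topic name "error_topic" are made-up examples:

    // Register an extra sink so the processor can route rejected records to a dedicated error topic.
    topology.addSink("Error_Topic", "error_topic", "STREAM_PROCESSOR");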

Is this the expected behavior?

Reference: https://docs.confluent.io/current/streams/developer-guide/manage-topics.html#user-topics Does this mean that a topic which does not exist in the cluster is not allowed as a sink node in a Kafka Streams topology? Please confirm.
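
The linked page says user topics should be created and managed ahead of time, which for the failing sink would mean creating the topic before starting the application, e.g. with the admin client. A sketch with placeholder partition and replication values (error handling omitted):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    final Properties adminProps = new Properties();
    adminProps.put("bootstrap.servers", "localhost:9092"); // adjust to your cluster
    try (AdminClient admin = AdminClient.create(adminProps)) {
        // blocks until the topic is created; throws if creation fails
        admin.createTopics(Collections.singleton(new NewTopic("TOPIC_3", 1, (short) 1))).all().get();
    }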

Kafka Streams shuts down if it cannot write to a sink topic. The reason is that, by default, Kafka Streams guarantees at-least-once processing semantics: if it could not write data to one sink topic but kept running anyway, that guarantee would be violated, because data would be missing from that sink topic.
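
For reference, the guarantee is controlled by the `processing.guarantee` config, which already defaults to at-least-once; it is shown explicitly here only to make the point concrete:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    // "at_least_once" is the default; "exactly_once" is the stricter alternative
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);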

There is a production exception handler config (`default.production.exception.handler`) that might help. It allows you to swallow certain exceptions when writing data to an output topic so that the application keeps running. Note, however, that this means you lose the corresponding data on that topic.
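
A minimal sketch of such a handler, assuming you want to skip only records whose send failed with the TimeoutException shown above (the class name is made up, and whether this particular exception reaches the handler can depend on the Kafka Streams version):

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    // Drops records whose send failed with a TimeoutException and keeps the application running;
    // any other production error still stops the Streams thread.
    public class SkipOnTimeoutProductionHandler implements ProductionExceptionHandler {

        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            if (exception instanceof TimeoutException) {
                // the record is lost -- at-least-once no longer holds for this sink topic
                return ProductionExceptionHandlerResponse.CONTINUE;
            }
            return ProductionExceptionHandlerResponse.FAIL;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // no configuration needed
        }
    }

    // Register the handler in the Streams configuration:
    final Properties props = new Properties();
    props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
              SkipOnTimeoutProductionHandler.class);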