如果无法访问接收器节点/主题,kafka流将关闭?

utugiqy6  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(367)

我想测试一个场景,当kafka流使用处理器api从一个源读取并写入主题列表时,一个或两个主题是不可访问的(失败测试:尝试通过添加集群中不存在的1/2主题来模拟它)。

topology.addSource("mysource","source_topic");
     topology.addProcessor("STREAM_PROCESSOR",()->new SourceProcessor(),"mysource");
     topology.addSink("SINK_1","TOPIC_1","STREAM_PROCESSOR");
     topology.addSink("SINK_2","TOPIC_2","STREAM_PROCESSOR");
     topology.addSink("SINK_3","TOPIC_3","STREAM_PROCESSOR"); // This topic is not present in cluster

      sourceContext.forward(eventName,eventMessage,To.child("sink1 or sink2 or sink3"));

我的理解是kafkastreams应该对不存在的主题给出错误,并继续将记录转发到存在的主题1和主题2。
但我看到的行为是,它会产生以下错误:

Exception in thread "StreamProcessor-56da56e4-4ab3-4ca3-bf48-b059558b689f-StreamThread-1" 
     org.apache.kafka.streams.errors.StreamsException: 
     task [0_0] Abort sending since an error caught with a previous record (timestamp 1592940025090) to topic "TOPIC_X" due to 
     org.apache.kafka.common.errors.TimeoutException: Topic "TOPIC_X" not present in metadata after 60000 ms.
     Timeout exception caught when sending record to topic "TOPIC_X". This might happen if the producer cannot send data to the
     Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the
     broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.

这是模拟无法访问的主题或主题不存在的问题的正确方法吗?还有,为什么kafka流在处理流和拓扑异常时也会因上述错误而关闭。如果某个接收器主题因某种原因不可用或不可访问,则kafka流不应关闭,对吗。好心的建议
对于上面的错误,我想在捕获streamseception时将错误转发到错误主题,但是kafkastreams会提前停止。

catch(StreamsException e)
{
    context.forward("","",Error_Topic)
}

这是预期的行为吗?
参考:https://docs.confluent.io/current/streams/developer-guide/manage-topics.html#user-topics这是否意味着kafkastreams拓扑中不允许不存在的主题作为接收节点。请确认。

wbgh16ku

wbgh16ku1#

Kafka的设计是,如果它不能写入一个接收器主题,它就会关闭流。原因是,默认情况下,kafka streams保证至少有一次处理语义,如果它不能将数据写入一个sink主题,但会继续,那么至少有一次处理会被违反,因为sink主题中会有数据丢失。
有一个 production.exception.handler 可能有用的配置。它允许您在将数据写入输出主题时接受某些异常。但是,请注意,这意味着相应主题的数据丢失。

相关问题