我正在尝试逐步通过apachecamel的源代码来确定bug的来源。尽管配置了 StringDeserializer
对于消费者:
org.apache.kafka.common.errors.SerializationException: Can't convert key of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in key.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
当我尝试逐步遍历camel以找出反序列化字符串如何仍以字节数组结束时,camel会不断关闭自己,因为协调器认为使用者已死亡:
20:45:04.171 [kafka-coordinator-heartbeat-thread | rtp-creditor-receive-payment] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=rtp-creditor-receive-payment] Marking the coordinator rtp-demo-cluster-kafka-0.rtp-demo-cluster-kafka-brokers.rtp-reference.svc.cluster.local:9092 (id: 2147483647 rack: null) dead
如何完全禁用所有超时,以便在不担心消费者被标记为死亡的情况下逐步完成源代码?
1条答案
按热度按时间iq3niunx1#
虽然不能禁用kafka群集与其使用者之间的所有超时,但可以将一些属性修改为非常长:
group.max.session.timeout.ms
-这是任何使用者的最大会话超时。默认值为5分钟。将其设置为最大整数,例如2100000000
,在通常命名为server.properties
.max.poll.interval.ms
-这类似于会话超时,如果在此间隔内没有轮询,则会将使用者标记为已死亡。将此值设置为小于request.timeout.ms
以及,例如1900000000
.在apache camel中,您需要设置以下属性:
consumerRequestTimeoutMs
-这是等待客户端响应的最长时间。将此设置为2000000000
.sessionTimeoutMs
-这很可能是将您的消费者标记为已死亡的会话超时。该值应设置为小于request.timeout.ms
. 所以,就像1900000000
.可能还有一些,在这里找到的,任何一个
timeout
或者ms
是个不错的选择。