我用的是Spring Boot 2.0.2.RELEASE
使用spring for ApacheKafka(有效pom) 2.1.6.RELEASE
版本为 Spring Kafka)。
我已经不再用普通的了 ByteArrayDeserializer
使用confluent的反序列化程序
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
因此,我不必获取字节,然后将它们反序列化到有效负载中,等等。但这样做的副作用是一些旧消息-我不能再读取了,因为它们的模式在合流注册表中略有不同。
所以当我启动应用程序的时候,我总是收到这样的信息
2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
所以我决定我必须从主题的结尾开始听,我已经检查了文档https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek 建议实现ConsumerSekAware及其子接口ConsumerSekAware.ConsumerSekCallback
我已经更改了包含@kafkalistener方法的@service类,以实现文档中提到的接口
@Service
public class MyAvroListener implements
ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware {
它有@kafkalistener注解的方法,在这里我试着寻找分区的结尾
@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
this.seekCallBack.get().seekToEnd(topic,0);
try {
for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {
我也尝试过寻找特定的偏移量(因为我一直在36833偏移量信息卡住)
@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
this.seekCallBack.get().seek(topic,0,36900);
try {
for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {
我已经实现了上述接口的方法
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
this.seekCallBack.set(consumerSeekCallback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}
@Override
public void seek(String s, int i, long l) {
}
@Override
public void seekToBeginning(String s, int i) {
}
@Override
public void seekToEnd(String topic, int partition) {
System.out.println("seekToEnd is hit for topic s = " + topic + " and partition i=" + partition);
}
当应用程序启动时,registerseekcallback方法确实被命中,但是seektoend或seek方法没有被命中。
因此我一直收到这个信息
2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我在这里使用SpringKafka模板实现示例中的片段来查找偏移量、确认
正如这里提到的,什么决定了kafka consumer offset?我不能使用auto.offset.reset属性从主题末尾开始消费(除非我使用不同的consumergroupid-在我的情况下这是不可能的)。我想知道我是否可以利用现有的消费者群体来解决这个问题。
3条答案
按热度按时间dly7yett1#
根据@garyrussell的反馈,我对代码做了以下修改以使其正常工作。谢谢@gary和@manjeet/@cricket\u 007
所以我做了下面的事情,它成功了
基本上对注解为@kafkalistener的方法没有任何更改,但是包含它的类必须实现这些接口
然后在那个类中我实现了这些接口的方法。。。
1tu0hz3e2#
当您已经用不同的模式发送消息时,就有了一个主题。
这个问题可以用多种方法解决。
上面的api主要用于开发环境中,在完成模式之前通常要经过迭代。虽然不建议在生产环境中使用,但在生产环境中使用这些api的情况很少,但要非常小心。
要注册的新架构与现有架构版本之一存在兼容性问题
需要为同一主题重新注册旧版本的架构
该模式仅用于实时流媒体系统,不再需要旧版本
一个主题需要循环利用
请注意,在使用“删除主题”或删除唯一可用的架构版本时,主题的任何已注册兼容性设置也将被删除,这一点也很重要。
第二种方法是开始向新主题发送消息。按照这些步骤做,你会没事的。
更新producer将数据发送到新主题,新主题将在schema registry中注册更新的schema
验证此主题的所有使用者的滞后为零
更新使用者以使用新主题中的数据
从Kafka中删除旧主题
a64a0gku3#
您执行seek太晚了-在
poll()
获取记录;您需要在中执行查找通过呼叫
consumerSeekCallback.seekToEnd(...)
在那里。搜索将在poll()
获取记录。您也可以使用
kafka-consumer-groups
命令行工具,用于设置组/主题/分区的任意偏移量。目前的引导版本是2.0.4,Kafka2.1.8。
另外,不应该实现传递给您的回调。
文件看起来很清楚。。。
在使用组管理时,当分配发生更改时调用第二种方法。您可以使用此方法,例如,通过调用回调来设置分区的初始偏移量;必须使用回调参数,而不是传递到registerseekcallback的参数。
…如果不是,我们应该改变什么?