我们在使用springcloud和kafka时遇到了一些问题,有时我们的微服务会抛出一个错误 UnkownProducerIdException
,这是由于参数 transactional.id.expiration.ms
在代理端过期。
我的问题是,是否可以捕获该异常并重试失败的消息?如果是的话,处理它的最佳选择是什么?
我看了一下:
- https://cwiki.apache.org/confluence/pages/viewpage.action?pageid=89068820
-Kafka未知\u生产者\u id异常
我们用的是Spring CloudHoxton.RELEASE
Kafka版本和 Spring 版2.2.4.RELEASE
我们使用的是aws-kafka解决方案,因此不能对我前面提到的属性设置新值。
以下是一些例外情况:
2020-04-07 20:54:00.563 ERROR 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition test.produce.another-2 with producerId 35000, epoch 0, and sequence number 8
2020-04-07 20:54:00.563 INFO 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] ProducerId set to -1 with epoch -1
2020-04-07 20:54:00.565 ERROR 5188 --- [ad | producer-2] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{...}' to topic <some-topic>:
要重现此异常:
-我使用了汇合docker图像并设置了环境变量 KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS
到10秒,这样我就不会等待太多的时间来抛出这个异常。
-在另一个进程中,以10秒的间隔逐个发送java将要侦听的主题中的1条消息。
下面是一个代码示例:
文件绑定.java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface Bindings {
@Input("test-input")
SubscribableChannel testListener();
@Output("test-output")
MessageChannel testProducer();
}
文件application.yml(不要忘记设置环境变量 KAFKA_HOST
):
spring:
cloud:
stream:
kafka:
binder:
auto-create-topics: true
brokers: ${KAFKA_HOST}
transaction:
producer:
error-channel-enabled: true
producer-properties:
acks: all
retry.backoff.ms: 200
linger.ms: 100
max.in.flight.requests.per.connection: 1
enable.idempotence: true
retries: 3
compression.type: snappy
request.timeout.ms: 5000
key.serializer: org.apache.kafka.common.serialization.StringSerializer
consumer-properties:
session.timeout.ms: 20000
max.poll.interval.ms: 350000
enable.auto.commit: true
allow.auto.create.topics: true
auto.commit.interval.ms: 12000
max.poll.records: 5
isolation.level: read_committed
configuration:
auto.offset.reset: latest
bindings:
test-input:
# contentType: text/plain
destination: test.produce
group: group-input
consumer:
maxAttempts: 3
startOffset: latest
autoCommitOnError: true
queueBufferingMaxMessages: 100000
autoCommitOffset: true
test-output:
# contentType: text/plain
destination: test.produce.another
group: group-output
producer:
acks: all
debug: true
侦听器处理程序:
@SpringBootApplication
@EnableBinding(Bindings.class)
public class PocApplication {
private static final Logger log = LoggerFactory.getLogger(PocApplication.class);
public static void main(String[] args) {
SpringApplication.run(PocApplication.class, args);
}
@Autowired
private BinderAwareChannelResolver binderAwareChannelResolver;
@StreamListener(Topics.TESTLISTENINPUT)
public void listen(Message<?> in, String headerKey) {
final MessageBuilder builder;
MessageChannel messageChannel;
messageChannel = this.binderAwareChannelResolver.resolveDestination("test-output");
Object payload = in.getPayload();
builder = MessageBuilder.withPayload(payload);
try {
log.info("Event received: {}", in);
if (!messageChannel.send(builder.build())) {
log.error("Something happend trying send the message! {}", in.getPayload());
}
log.info("Commit success");
} catch (UnknownProducerIdException e) {
log.error("UnkownProducerIdException catched ", e);
} catch (KafkaException e) {
log.error("KafkaException catched ", e);
}catch (Exception e) {
System.out.println("Commit failed " + e.getMessage());
}
}
}
当做
1条答案
按热度按时间enyaitl31#
要捕获异常,需要设置
sync
Kafka生产者财产(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.3.release/reference/html/spring-cloud-stream-binder-kafka.html#kafka-生产商财产)。否则,错误将异步返回你不应该“吃”那里的例外;它必须被抛出回容器,以便容器回滚事务。
也,
提交是在流侦听器返回到容器之后由容器执行的,因此在这里您永远不会看到提交错误;同样,必须让异常传播回容器。
容器将根据使用者绑定的重试配置重试传递。