I have a Kafka Streams application (version 0.11) that fetches data from several topics and merges the data into another topic.
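For context, a merge of several input topics into one output topic looks roughly like the sketch below, using the 0.11-era KStreamBuilder DSL. All topic names, the application id, and the bootstrap address are assumptions for illustration:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class MergeTopics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "merge-app");       // assumed app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // assumed address

        // In 0.11, KStreamBuilder#stream accepts several source topics and
        // returns a single KStream that interleaves their records.
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("topic-a", "topic-b", "topic-c") // assumed input topics
               .to("merged-topic");                     // assumed output topic

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}
```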
Kafka configuration:
5 Kafka brokers - version 0.11
Kafka topics - 15 partitions, replication factor 3.
A few million records are consumed/produced every hour. Whenever I bring down a Kafka broker, the following exception is thrown:
org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:62)
at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:1325)
at org.apache.kafka.streams.processor.internals.StreamThread.access$2400(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:313)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:1366)
at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:185)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
I have read a few JIRA issues suggesting that cleaning up the streams instance might help resolve this problem. But is cleaning up the streams instance on every start of the Kafka Streams application the right solution, or just a patch? Also, the stream cleanup would delay the application startup, right?
Note: do I need to call streams.cleanUp() before calling streams.start() every time the Kafka Streams application starts?
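For concreteness, the startup order being asked about looks like the sketch below. This is a hedged illustration of the pattern, not a recommendation; builder and props are assumed to be set up as in the sketch above:

```java
// Hypothetical startup sequence: cleanUp() must be called while the
// instance is NOT running, i.e. before start() (or after close()).
KafkaStreams streams = new KafkaStreams(builder, props);

// cleanUp() deletes the local state directory for this application.id.
// All state stores are then restored from their changelog topics on the
// next start, which is where the startup delay the question mentions
// comes from.
streams.cleanUp();
streams.start();
```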
1 Answer
Seeing a
org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10
is actually expected and should resolve itself. The thread will back off in order to wait until another thread releases the lock, and retry later. Thus, you might even see this WARN message logged multiple times, in case the retry happens before the second thread has released the lock. Eventually, however, the lock should be released by the second thread, and the first thread will be able to acquire it. Afterwards, Streams should just move forward. Note that this is a WARN message, not an error.