我正在使用camel kafka(版本:3.2.0)来消费消息。Kafka的消费者没有顺利退订,导致 WakeupException
,不知道为什么会这样。
谢谢你的帮助
下面是流程图
kafka服务使用主题中的事件
呼叫服务器
当服务器在40毫秒内没有响应时,抛出serverunavailableexception
阻止Kafka消费者谈这个主题
取消订阅主题testtopicv1 Error unsubscribing testTopicV1-Thread 0 from kafka topic testTopicV1. Caused by: [org.apache.kafka.common.errors.WakeupException - null]
```
2021-01-31 05:48:23.143+0000 WARN Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing testTopicV1-Thread 0 from kafka topic testTopicV1. Caused by: [org.apache.kafka.common.errors.WakeupException - null]
2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(doRun:397)] Unsubscribing testTopicV1-Thread 1 from topic testTopicV1
org.apache.kafka.common.errors.WakeupException: null
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432) ~[camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455) ~[camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:400) [camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214) [camel-kafka-3.2.0.jar!/:3.2.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
完整堆栈跟踪
2021-01-31 05:48:13.138+0000 ERROR Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.processor.errorhandler.DefaultErrorHandler(log:203)] Failed delivery for (MessageId: ID-TestId-1611647443253-0-25 on ExchangeId: ID-TestId-1611647443253-0-25). Exhausted after delivery attempt: 1 caught: com.example.exception.ServerUnavailableException: RestClientException during rest call
Message History (complete message history is disabled)
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [from[kafka://testTopicV1?allowManualCommit=True&autoCommitEnable=False] [ 40027]
...
[route1 ] [to1 ] [bean:kafkaProcessor ] [ 0]
Stacktrace
com.example.exception.ServerUnavailableException: RestClientException during rest call
at com.example.processor.KafkaProcessor.process(KafkaProcessor.java:46) ~[classes!/:?]
at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:117) ~[camel-bean-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:56) ~[camel-bean-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41) ~[camel-bean-3.2.0.jar!/:3.2.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:166) ~[camel-base-3.2.0.jar!/:3.2.0]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:702) [camel-base-3.2.0.jar!/:3.2.0]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:616) [camel-base-3.2.0.jar!/:3.2.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) [camel-base-3.2.0.jar!/:3.2.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) [camel-base-3.2.0.jar!/:3.2.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) [camel-base-3.2.0.jar!/:3.2.0]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:286) [camel-base-3.2.0.jar!/:3.2.0]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) [camel-base-3.2.0.jar!/:3.2.0]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40) [camel-support-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:338) [camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214) [camel-kafka-3.2.0.jar!/:3.2.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
Caused by: org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://xxxxxxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:746) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:672) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:447) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
... 21 more
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_271]
at java.net.SocketInputStream.socketRead(Unknown Source) ~[?:1.8.0_271]
at java.net.SocketInputStream.read(Unknown Source) ~[?:1.8.0_271]
at java.net.SocketInputStream.read(Unknown Source) ~[?:1.8.0_271]
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[httpcore-4.4.13.jar!/:4.4.13]
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) ~[httpcore-4.4.13.jar!/:4.4.13]
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280) ~[httpcore-4.4.13.jar!/:4.4.13]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) ~[httpcore-4.4.13.jar!/:4.4.13]
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) ~[httpcore-4.4.13.jar!/:4.4.13]
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) ~[httpcore-4.4.13.jar!/:4.4.13]
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) ~[httpcore-4.4.13.jar!/:4.4.13]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[httpclient-4.5.12.jar!/:4.5.12]
at org.springframework.http.client.HttpComponentsClientHttpRequest.executeInternal(HttpComponentsClientHttpRequest.java:87) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:109) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.boot.actuate.metrics.web.client.MetricsClientHttpRequestInterceptor.intercept(MetricsClientHttpRequestInterceptor.java:95) ~[spring-boot-actuator-2.2.11.RELEASE.jar!/:2.2.11.RELEASE]
at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:93) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:77) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:672) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
at org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:447) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
... 21 more
2021-01-31 05:48:13.139+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(doStop:141)] Stopping Kafka consumer on topic: testTopicV1
2021-01-31 05:48:15.140+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 2.000 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
2021-01-31 05:48:17.141+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 4.001 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
2021-01-31 05:48:19.142+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 6.001 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
2021-01-31 05:48:21.147+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 8.007 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
2021-01-31 05:48:23.141+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 10.000 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
2021-01-31 05:48:23.141+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(doShutdown:305)] Forcing shutdown of ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] due first await termination elapsed.
2021-01-31 05:48:23.141+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(doShutdown:314)] Forcing shutdown of ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] due interrupted.
2021-01-31 05:48:23.141+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(doShutdown:322)] Shutdown of ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] is shutdown: true and terminated: false took: 10.001 seconds.
2021-01-31 05:48:23.142+0000 INFO Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(doRun:397)] Unsubscribing testTopicV1-Thread 0 from topic testTopicV1
2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Revoke previously assigned partitions testTopicV1-13, testTopicV1-12, testTopicV1-14, testTopicV1-11, testTopicV1-10
2021-01-31 05:48:23.142+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(log:212)] Error during processing. Exchange[ID-TestId-1611647443253-0-25]. Caused by: [ServerUnavailableException - RestClientException during rest call]
ServerUnavailableException: RestClientException during rest call
2021-01-31 05:48:23.143+0000 INFO kafka-coordinator-heartbeat-thread | TestService [clients.consumer.internals.AbstractCoordinator(markCoordinatorUnknown:808)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Group coordinator KafkaBroker3:9093 (id: 2147482641 rack: null) is unavailable or invalid, will attempt rediscovery
2021-01-31 05:48:23.143+0000 WARN Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing testTopicV1-Thread 0 from kafka topic testTopicV1. Caused by: [org.apache.kafka.common.errors.WakeupException - null]
2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(doRun:397)] Unsubscribing testTopicV1-Thread 1 from topic testTopicV1
org.apache.kafka.common.errors.WakeupException: null
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432) ~[camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455) ~[camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:400) [camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214) [camel-kafka-3.2.0.jar!/:3.2.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)] [Consumer clientId=consumer-TestService-6, groupId=TestService] Revoke previously assigned partitions testTopicV1-24, testTopicV1-21, testTopicV1-20, testTopicV1-23, testTopicV1-22
2021-01-31 05:48:23.143+0000 WARN Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [clients.consumer.internals.AbstractCoordinator(closeHeartbeatThread:367)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Interrupted while waiting for consumer heartbeat thread to close
2021-01-31 05:48:23.144+0000 INFO Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Revoke previously assigned partitions testTopicV1-13, testTopicV1-12, testTopicV1-14, testTopicV1-11, testTopicV1-10
2021-01-31 05:48:23.144+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing testTopicV1-Thread 1 from kafka topic testTopicV1. Caused by: [org.apache.kafka.common.errors.WakeupException - null]
org.apache.kafka.common.errors.WakeupException: null
....
2021-01-31 05:48:23.144+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [clients.consumer.internals.AbstractCoordinator(closeHeartbeatThread:367)] [Consumer clientId=consumer-TestService-6, groupId=TestService] Interrupted while waiting for consumer heartbeat thread to close
2021-01-31 05:48:23.144+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)] [Consumer clientId=consumer-TestService-6, groupId=TestService] Revoke previously assigned partitions testTopicV1-24, testTopicV1-21, testTopicV1-20, testTopicV1-23, testTopicV1-22
2021-01-31 05:48:23.144+0000 ERROR Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [kafka.clients.consumer.KafkaConsumer(close:2297)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Failed to close coordinator
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:517) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:932) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432) ~[camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455) ~[camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:887) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:832) ~[kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2294) [kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2261) [kafka-clients-2.4.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2211) [kafka-clients-2.4.0.jar!/:?]
at org.apache.camel.util.IOHelper.close(IOHelper.java:341) [camel-util-3.2.0.jar!/:3.2.0]
at org.apache.camel.util.IOHelper.close(IOHelper.java:403) [camel-util-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:419) [camel-kafka-3.2.0.jar!/:3.2.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214) [camel-kafka-3.2.0.jar!/:3.2.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
Caused by: java.lang.InterruptedException
... 26 more
暂无答案!
目前还没有任何答案,快来回答吧!