我们将kafkas3连接器用于日志管道,因为它保证了一次语义。但是,我们在不同的主题中遇到了两个数据丢失事件。我们在kafka connect worker的日志中发现一条可疑的错误消息,如下所示。
[2019-04-10 08:56:22,388] ERROR WorkerSinkTask{id=s3-sink-common-log-4} Commit of offsets threw an unexpected exception for sequence number 2721: {logging_common_log-9=OffsetAndMetadata{offset=4485661604, metadata=''}, logging_common_log-8=OffsetAndMetadata{offset=4485670359, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:260)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:808)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:641)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:608)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1486)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:352)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:363)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:432)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:209)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
worker和connector的配置为:
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"flush.size": "999999999",
"rotate.schedule.interval.ms": "60000",
"retry.backoff.ms": "5000",
"s3.part.retries": "3",
"s3.retry.backoff.ms": "200",
"s3.part.size": "26214400",
"tasks.max": "3",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"schema.compatibility": "NONE",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor": "Record",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timezone": "America/Los_Angeles",
"locale": "US",
"append.late.data": "false",
...
},
以及
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
rest.port=8083
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java
问题是:1。根本原因是什么?2如何预防?三。如何复制?
非常感谢您的任何提示/建议/类似体验!
暂无答案!
目前还没有任何答案,快来回答吧!