I'm using Flink 1.13.1, and trying to write data to kafka with an RPS(rate per second) of 10k records. I have a kafka cluster of 30 brokers, my flink job does a filter operator and just sinks data to kafka. Below is my producer setting, Initially I have 5 sink topic partitions, but now 10, still I'm getting the same issue. Also I tried to set request.timeout.ms
to 1 min instead of kafka default 30 sec, but still getting 120001 ms has passed since batch creation
. Due to this error, checkpointing is getting failed. The checkpoint size is just 107kb as flink only committing offset to kafka. Job parallelism is 36. Here is the sink kafka properties and full stacktrace.
private static Properties kafkaSinkProperties() {
val properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.setProperty("bootstrap.servers", KafkaServers.SINK_KAFKA_BOOTSTRAP_SERVER);
properties.setProperty("request.timeout.ms", "60000");
return properties;
}
FlinkKafkaProducer<Map<String, Object>> kafkaProducer =
new FlinkKafkaProducer<Map<String, Object>>(
KafkaTopics.TOPIC, jsonSerde, kafkaSinkProperties());
filterStream.addSink(kafkaProducer);
java.lang.Exception: Could not perform checkpoint 6984 for operator Source: source -> Filter -> Sink: Unnamed (42/48)#3.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:1000)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:960)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 6984 for operator Source: source -> Filter -> Sink: Unnamed (42/48)#3. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1086)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1070)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:988)
... 13 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 20 record(s) for topic-1:120000 ms has passed since batch creation
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1002)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:320)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
... 23 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 20 record(s) for topic-1:120000 ms has passed since batch creation
2条答案
按热度按时间irlmq6kh1#
Kafka is overloaded, the easiest way is to add more resources to your kafka.
There are some minor things you can do:
acks=1
instead ofacks=all
uttx8gqw2#
It is for sure a sign of Flink-Kafka sending problems. It can be that Kafka is overloaded or you have some very important connectivity issues in your setup. For sure you can optimise your software or hardware setup, but also you can consider to fully understand the nature of your error.
I think the philosophy of Flink-Kafka interaction is that most of config should be done in Flink operator itself.
Thus I think it worth to consider next options for Kafka Driver: