FlinkKafkaException: Failed to send data to Kafka: Expiring 19 record(s) for 'topic': 120001 ms has passed since batch creation

f45qwnt8 posted on 2022-12-09 in Apache

I'm using Flink 1.13.1 and trying to write data to Kafka at a rate of 10k records per second (RPS). I have a Kafka cluster of 30 brokers; my Flink job applies a filter operator and simply sinks the data to Kafka. Below are my producer settings. Initially the sink topic had 5 partitions; it now has 10, but I still get the same error. I also tried setting request.timeout.ms to 1 minute instead of the Kafka default of 30 seconds, but I still get "120001 ms has passed since batch creation". Because of this error, checkpointing fails. The checkpoint size is only 107 KB, since Flink only commits offsets to Kafka. The job parallelism is 36. Here are the sink Kafka properties and the full stack trace.

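```java
import java.util.Map;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Producer properties for the Kafka sink
private static Properties kafkaSinkProperties() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServers.SINK_KAFKA_BOOTSTRAP_SERVER);
    // Raised from the Kafka default of 30 s to 60 s
    properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
    return properties;
}

// Sink the filtered stream to Kafka
FlinkKafkaProducer<Map<String, Object>> kafkaProducer =
        new FlinkKafkaProducer<>(KafkaTopics.TOPIC, jsonSerde, kafkaSinkProperties());
filterStream.addSink(kafkaProducer);
```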
```
java.lang.Exception: Could not perform checkpoint 6984 for operator Source: source -> Filter -> Sink: Unnamed (42/48)#3.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:1000)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:960)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 6984 for operator Source: source -> Filter -> Sink: Unnamed (42/48)#3. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1086)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1070)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:988)
    ... 13 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 20 record(s) for topic-1:120000 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1002)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:320)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
    ... 23 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 20 record(s) for topic-1:120000 ms has passed since batch creation
```
Answer 1# by irlmq6kh

Kafka is overloaded; the easiest fix is to add more resources to your Kafka cluster.
There are also a few smaller things you can do:

  • decrease parallelism (less data will be sent to Kafka at once)
  • perform Kafka I/O operations less often
  • set Kafka acks=1 instead of acks=all
  • increase the checkpointing interval
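The producer-side parts of these suggestions map onto ordinary Kafka producer properties. Below is a minimal sketch, assuming the same `java.util.Properties` approach as the question's `kafkaSinkProperties()`. The class name `AckTunedSinkProperties`, the placeholder broker address and the `linger.ms`/`batch.size` values are hypothetical, and reading "perform Kafka I/O less often" as "send fewer, larger producer batches" is an assumption. The Flink-side suggestions are set on the job instead, for example with `setParallelism(...)` on the sink returned by `addSink` and a larger interval passed to `env.enableCheckpointing(...)`.

```java
import java.util.Properties;

// Hypothetical helper for illustration: sink producer properties applying the
// producer-side suggestions from this answer. Values are illustrative only,
// not tuned for any particular cluster.
final class AckTunedSinkProperties {

    private AckTunedSinkProperties() {}

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait only for the partition leader's acknowledgement instead of
        // all in-sync replicas (acks=all): less durable, but lower latency.
        props.setProperty("acks", "1");
        // Assumed reading of "perform Kafka I/O less often": let the producer
        // accumulate bigger batches so it sends fewer, larger requests.
        props.setProperty("linger.ms", "50");
        props.setProperty("batch.size", "65536");
        // Kept from the question.
        props.setProperty("request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker-1:9092"); // placeholder address
        // Print the resulting settings in a stable (sorted) order.
        props.stringPropertyNames().stream().sorted()
            .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The returned `Properties` would be passed to the `FlinkKafkaProducer` constructor in place of `kafkaSinkProperties()`; whether `acks=1` is acceptable depends on how much durability the sink topic needs.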
Answer 2# by uttx8gqw

This is certainly a sign of problems sending data from Flink to Kafka. Kafka may be overloaded, or there may be serious connectivity issues in your setup. You can certainly optimise your software or hardware setup, but you should also try to fully understand the nature of the error.
I think the philosophy of Flink-Kafka interaction is that most of the configuration should be done in the Flink operator itself.
Thus I think it is worth considering the following options for the Kafka client (producer):

  • delivery.timeout.ms - default 2 minutes (120000 ms, the same value as in the "ms has passed since batch creation" error; raising request.timeout.ms to 60000 alone does not change it)
  • retries - default Integer.MAX_VALUE; you probably do not want to touch it
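A minimal sketch of the `delivery.timeout.ms` suggestion, assuming the question's `request.timeout.ms=60000`. The Kafka producer requires `delivery.timeout.ms >= linger.ms + request.timeout.ms` and rejects an explicitly set value that violates this, so the helper checks that constraint before raising the batch-expiry limit. The class name `DeliveryTimeoutConfig` and the 300000 ms (5 minute) value are hypothetical choices for illustration, and the default constants assume the 2.x Kafka clients used with Flink 1.13.

```java
import java.util.Properties;

// Hypothetical helper: raises the producer's batch-expiry limit
// (delivery.timeout.ms) and checks the constraint the Kafka producer enforces:
//   delivery.timeout.ms >= linger.ms + request.timeout.ms
final class DeliveryTimeoutConfig {

    // Assumed producer defaults of the 2.x Kafka clients.
    static final long DEFAULT_LINGER_MS = 0L;
    static final long DEFAULT_REQUEST_TIMEOUT_MS = 30_000L;

    private DeliveryTimeoutConfig() {}

    static long longProperty(Properties props, String key, long defaultValue) {
        String value = props.getProperty(key);
        return value == null ? defaultValue : Long.parseLong(value.trim());
    }

    // Returns a copy of props with delivery.timeout.ms set, failing fast if
    // the producer would reject the value.
    static Properties withDeliveryTimeout(Properties props, long deliveryTimeoutMs) {
        long linger = longProperty(props, "linger.ms", DEFAULT_LINGER_MS);
        long request = longProperty(props, "request.timeout.ms", DEFAULT_REQUEST_TIMEOUT_MS);
        if (deliveryTimeoutMs < linger + request) {
            throw new IllegalArgumentException("delivery.timeout.ms (" + deliveryTimeoutMs
                + ") must be >= linger.ms + request.timeout.ms (" + (linger + request) + ")");
        }
        Properties copy = new Properties();
        copy.putAll(props);
        copy.setProperty("delivery.timeout.ms", Long.toString(deliveryTimeoutMs));
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("request.timeout.ms", "60000"); // as in the question
        Properties tuned = withDeliveryTimeout(base, 300_000L); // 5 minutes, illustrative
        System.out.println("delivery.timeout.ms=" + tuned.getProperty("delivery.timeout.ms"));
    }
}
```

A longer delivery timeout only gives slow brokers more time before batches expire; it does not remove the underlying throughput or connectivity problem.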
