我已经写了一个代码,每天将数据推送到Kafka主题,但有几个问题,我不确定这段代码是否能够处理。我的职责是从保存1天数据的活动表中推送完整的数据(每天早上刷新)
我的代码将查询“select*from mytable”并将它一个接一个地推送到kafka主题,就像推之前一样,我需要验证/更改每一行并推送到主题。
下面是我的制作人发送代码。
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sBOOTSTRAP_SERVERS_CONFIG);
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configProperties.put("acks", "all");
configProperties.put("retries", 0);
configProperties.put("batch.size", 15000);
configProperties.put("linger.ms", 1);
configProperties.put("buffer.memory", 30000000);
@SuppressWarnings("resource")
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
System.out.println("Starting Kafka producer job " + new Date());
producer.send(new ProducerRecord<String, String>(eventName, jsonRec.toString()), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
});
现在,我不知道如何在失败的情况下将数据再次推回到主题中。因为我已经从表中选择了所有的记录,但很少有失败的记录,我不知道是哪一个。
下面是我想说的
如何只处理那些没有被推送的记录,以避免重复记录被推送(避免冗余)。
如何验证推送的记录与表中的完全相同。我是说数据的完整性。比如数据的大小和被推送的记录数。
1条答案
按热度按时间wlp8pajw1#
你可以用
configProperties.put("enable.idempotence", true);
-它将尝试重试失败的消息,但确保每个记录中只有一条保存在kafka中。请注意,这意味着retries>0
acks=all
以及max.in.flight.requests.per.connection
>=0. 详情请查收https://kafka.apache.org/documentation/.对于第二个问题-如果你的意思是你需要保存所有记录或没有,那么你必须使用Kafka事务,这带来了更多的问题,我建议阅读https://www.confluent.io/blog/transactions-apache-kafka/