Kafka主题中丢失的消息

6qftjkof  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(550)

在生产记录中尝试时间戳时;我发现了一些奇怪的东西。在从生产者发送了几条消息之后,我运行kafka-console-consumer.sh并验证这些消息是否在主题中。我拦住制片人等了一会儿。当我重新运行kafka-console-consumer.sh时,它没有显示我以前生成的消息。我还添加了producer.flush()和producer.close(),但结果还是一样的。
现在,当我停止使用timestamp字段时,一切都正常工作,这让我相信,对于带有timestamp的消息有一些挑剔的地方。
我正在使用Kafka2.11-2.0.0(于2018年7月30日发布)
下面是示例代码。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internal.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import static java.lang.Thread.sleep;
public class KafkaProducerSample{
    public static void main(String[] args){
        String kafkaHost="sample:port";
        String notificationTopic="test";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaHost);
        props.put(ProducerConfig.ACKS_CONFIG, 1);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        Producer<String, String> producer = new KafkaProducer(props, new StringSerialize(), new StringSerializer);

        RecordHeaders recordHeaders = new RecordHeader();
        ProducerRecord<String, String> record = new ProducerRecord(notificationTopic, null, 1574443515L, sampleKey, SampleValue);
        producer.send(record);
        sleep(1000);
    }
}

我运行console consumer如下

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap.server KAFKA_HOST:PORT --topic test --from-beginning

# output after running producer

test

# output 5mins after shutting down producer
8oomwypt

8oomwypt1#

您只异步发送一条记录,但不确认或刷新缓冲区。
你需要发送更多的记录,
或者

producer.send(record).get();

或者

producer.send(record);
producer.flush();

或(首选),做 Runtime.addShutdownHook() 在你的主要方法冲洗和关闭生产者

相关问题