最近,我将我们的流媒体应用程序从spark streaming 2.1切换到kafka streaming new api(1.0)和kafka broker server 0.11.0.0
我已经实现了自己的处理器类,在process方法中,我只打印了消息内容。
我有一个由3台机器组成的kafka集群,我正在研究的主题有300个分区。
我在一台有32gb内存和8核的机器上运行了100线程的流媒体应用程序。
我的问题是,在某些情况下,当它到达Kafka主题/分区时,我就收到了消息,而在其他情况下,我在它到达主题后10-15分钟收到了消息,不知道为什么!
我使用下面的命令行跟踪流应用程序group.id的kafka主题的延迟。 ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --new-consumer --describe --group kf_streaming_gp_id
但不幸的是,它并没有始终如一地给出准确的结果,甚至根本不给出结果,任何机构知道为什么吗?
流媒体应用程序中是否有我遗漏的东西,以便我可以在到达分区后一致地读取消息?任何消费者属性都可以解决这样的问题。
我的Kafka流媒体应用程序结构如下:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.CLIENT_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, DocumentSerde.class);
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimeExtractor.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 100);
KStream<String, Document> topicStreams = builder.stream(sourceTopic);
topicStreams.process(() -> new DocumentProcessor(appName, environment, dimensions, vector, sinkTopic));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
1条答案
按热度按时间zc0qhyus1#
我知道我的案子有什么问题。
结果发现,有线程一直在做高cpu密集型的工作,这导致阻止其他线程消耗消息,这就是为什么我看到这样的突发事件,当我停止这种cpu密集型逻辑时,一切都非常快,消息到达kafka主题后就进入流式处理作业。