我现在正在使用Kafka流图书馆。
我要做的是:(使用kafka流)我试图从主题中消费,操纵消息值,确认消息并将结果传输到另一个主题(下面的代码)
Properties properties = new Properties();
.
.
properties.put("enable.auto.commit", false);
StreamBuilder builder = new StreamBuilder();
KStream kStream = builder.stream("MyTopic");
KafkaStream kafkaStream = new KafkaStream(builder.build(), properties)
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
kStream.forEach(new ForeachAction<String, String>(){
@Override
public void apply(Strings arg, String value){
//Just doing some simple data manipulation
String myValue = value + new Date().toString();
//Sending result to new topic
producer.send(new ProducerRecord<String,String>("MyTopicWithTimeStamp", myValue)):
// Problem (1) Here -> How do I acknowledge this from here manually
// Problem (2) How should I properly handle/close my producer (if at all)
}
});
kafkaStram.start();
我不知道怎么做:确认消息正确使用Kafka流库
2条答案
按热度按时间46scxncf1#
代替使用手动方法,您可以直接通过kafka流api编写@amethystic。
关于acknowlegdement,您可以在stream configuration中设置属性,以确保producer在发布到主题时接收到确认。
您可以参考流配置:https://kafka.apache.org/21/documentation/streams/developer-guide/config-streams.html#acks
erhoui1w2#
您可以使用kstream#直接写入kafka输出主题,而不是创建和维护自己的producer示例,如下所示: