将kafka消费者和生产者集成在一个函数中

xvw2m8pv  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(431)

我们需要开发一个代码,在这个代码中,一个消费者运行时聆听一个特定的Kafka生产者,然后在同一个函数中产生一个处理过的数据,从当前消费的一个到另一个生产者主题。
这是将flinks代码与java集成,其中java生成一条到一个主题的消息,flink使用该消息并生成一个到另一个主题的新数据,以便java进一步处理它。
请告诉我们是否有其他方法来完成这个过程。

lpwwtiir

lpwwtiir1#

这听起来像是“读过程写”模式。您可以使用kafka的事务功能使这个过程原子化(或者不原子化,这取决于您,但下面的示例使用事务):

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id", "my-transactional-id");

producer.initTransactions();

KafkaConsumer consumer = createKafkaConsumer(
  "bootstrap.servers", "localhost:9092",
  "group.id", "my-group-id",
  "isolation.level", "read_committed");

consumer.subscribe(singleton("inputTopic"));

while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  for (ConsumerRecord record : records)
    producer.send(producerRecord("outputTopic", record));
  producer.sendOffsetsToTransaction(currentOffsets(consumer), group);  
  producer.commitTransaction();
}

这是将flinks代码与java集成,其中java生成一条到一个主题的消息,flink使用该消息并生成一个到另一个主题的新数据,以便java进一步处理它。
你可以考虑Kafka流:https://docs.confluent.io/current/streams/developer-guide/index.html

ukdjmx9f

ukdjmx9f2#

flink与kafka很好地集成,如果需要,可以利用kafka事务。这样的应用程序如下所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<> consumer = new FlinkKafkaConsumer011<IN>(topic_in, serializer_in, kafkaProperties);
FlinkKafkaProducer011<> producer = new FlinkKafkaProducer011<OUT>(broker, topic_out, serializer_out)

env.addSource(consumer)
   .map(new SuitableTransformation())
   .addSink(producer)
   .execute()
hm2xizp9

hm2xizp93#

这听起来像是使用 akka 河的好地方, val done = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) .map(msg => ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic2", msg.record.value), msg.committableOffset)) .via(Producer.flow(producerSettings)) .map(_.message.passThrough) .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) => batch.updated(elem) } .mapAsync(3)(_.commitScaladsl()) .runWith(Sink.ignore)

相关问题