我们需要开发一个代码,在这个代码中,一个消费者运行时聆听一个特定的Kafka生产者,然后在同一个函数中产生一个处理过的数据,从当前消费的一个到另一个生产者主题。这是将flinks代码与java集成,其中java生成一条到一个主题的消息,flink使用该消息并生成一个到另一个主题的新数据,以便java进一步处理它。请告诉我们是否有其他方法来完成这个过程。
lpwwtiir1#
这听起来像是“读过程写”模式。您可以使用kafka的事务功能使这个过程原子化(或者不原子化,这取决于您,但下面的示例使用事务):
KafkaProducer producer = createKafkaProducer( "bootstrap.servers", "localhost:9092", "transactional.id", "my-transactional-id"); producer.initTransactions(); KafkaConsumer consumer = createKafkaConsumer( "bootstrap.servers", "localhost:9092", "group.id", "my-group-id", "isolation.level", "read_committed"); consumer.subscribe(singleton("inputTopic")); while (true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE); producer.beginTransaction(); for (ConsumerRecord record : records) producer.send(producerRecord("outputTopic", record)); producer.sendOffsetsToTransaction(currentOffsets(consumer), group); producer.commitTransaction(); }
这是将flinks代码与java集成,其中java生成一条到一个主题的消息,flink使用该消息并生成一个到另一个主题的新数据,以便java进一步处理它。你可以考虑Kafka流:https://docs.confluent.io/current/streams/developer-guide/index.html
ukdjmx9f2#
flink与kafka很好地集成,如果需要,可以利用kafka事务。这样的应用程序如下所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkKafkaConsumer011<> consumer = new FlinkKafkaConsumer011<IN>(topic_in, serializer_in, kafkaProperties); FlinkKafkaProducer011<> producer = new FlinkKafkaProducer011<OUT>(broker, topic_out, serializer_out) env.addSource(consumer) .map(new SuitableTransformation()) .addSink(producer) .execute()
hm2xizp93#
这听起来像是使用 akka 河的好地方, val done = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) .map(msg => ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic2", msg.record.value), msg.committableOffset)) .via(Producer.flow(producerSettings)) .map(_.message.passThrough) .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) => batch.updated(elem) } .mapAsync(3)(_.commitScaladsl()) .runWith(Sink.ignore)
val done = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) .map(msg => ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic2", msg.record.value), msg.committableOffset)) .via(Producer.flow(producerSettings)) .map(_.message.passThrough) .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) => batch.updated(elem) } .mapAsync(3)(_.commitScaladsl()) .runWith(Sink.ignore)
3条答案
按热度按时间lpwwtiir1#
这听起来像是“读过程写”模式。您可以使用kafka的事务功能使这个过程原子化(或者不原子化,这取决于您,但下面的示例使用事务):
这是将flinks代码与java集成,其中java生成一条到一个主题的消息,flink使用该消息并生成一个到另一个主题的新数据,以便java进一步处理它。
你可以考虑Kafka流:https://docs.confluent.io/current/streams/developer-guide/index.html
ukdjmx9f2#
flink与kafka很好地集成,如果需要,可以利用kafka事务。这样的应用程序如下所示:
hm2xizp93#
这听起来像是使用 akka 河的好地方,
val done = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) .map(msg => ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic2", msg.record.value), msg.committableOffset)) .via(Producer.flow(producerSettings)) .map(_.message.passThrough) .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) => batch.updated(elem) } .mapAsync(3)(_.commitScaladsl()) .runWith(Sink.ignore)