如何在spark streaming 2.3.1中为多个Kafka主题编写每条记录?

v1uwarro  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(269)

如何在spark streaming 2.3.1中为多个Kafka主题编写每条记录?换句话说,假设我有5个记录和两个输出Kafka主题,我希望所有5个记录都在两个输出主题中。
这里的问题不涉及结构化流媒体案例。我正在寻找特定的结构化流媒体。

mzmfm0qo

mzmfm0qo1#

不确定您使用的是java还是scala。下面是生成两个不同主题的消息的代码。你得打电话

dataset.foreachPartition(partionsrows => {
      val props = new util.HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      partionsrows.foreach(row => {
        val offerId = row.get(0).toString.replace("[", "").replace("]", "")
        val message1 = new ProducerRecord[String, String]("topic1", "message")
        producer.send(message1)
        val message2 = new ProducerRecord[String, String]("topic2",  "message")
        producer.send(message2)
      })
    })

相关问题