如何在spark streaming 2.3.1中为多个Kafka主题编写每条记录?换句话说,假设我有5个记录和两个输出Kafka主题,我希望所有5个记录都在两个输出主题中。这里的问题不涉及结构化流媒体案例。我正在寻找特定的结构化流媒体。
mzmfm0qo1#
不确定您使用的是java还是scala。下面是生成两个不同主题的消息的代码。你得打电话
dataset.foreachPartition(partionsrows => { val props = new util.HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String, String](props) partionsrows.foreach(row => { val offerId = row.get(0).toString.replace("[", "").replace("]", "") val message1 = new ProducerRecord[String, String]("topic1", "message") producer.send(message1) val message2 = new ProducerRecord[String, String]("topic2", "message") producer.send(message2) }) })
1条答案
按热度按时间mzmfm0qo1#
不确定您使用的是java还是scala。下面是生成两个不同主题的消息的代码。你得打电话