从spark流媒体向kafka推送数据

anauzrmj  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(382)

有人能提供我的样本代码推记录Kafka从Spark流?

rmbxnbpk

rmbxnbpk1#

使用spark流,您可以使用来自kafka主题的数据。
如果要将记录发布到kafka主题,可以使用kafka producer[https://cwiki.apache.org/confluence/display/kafka/0.8.0+producer+example]
或者可以使用kafka connect将数据发布到使用多个源连接器的kafka主题[http://www.confluent.io/product/connectors/]
有关spark流媒体和kafka集成的更多信息,请参见下面的链接。
http://spark.apache.org/docs/latest/streaming-kafka-integration.html

frebpwbc

frebpwbc2#

我用java做的。您可以在 JavaDStream<String> 作为 .foreachRDD() . 这不是最好的方式,因为它创造了 KafkaProducer 对于每个rdd,可以使用 KafkaProducers 就像spark文档中的socket示例。
这是我的密码:

public static class KafkaPublisher implements VoidFunction<JavaRDD<String>> {
    private static final long serialVersionUID = 1L;

    public void call(JavaRDD<String> rdd) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "loca192.168.0.155lhost:9092");
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1000);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
            private static final long serialVersionUID = 1L;

            public void call(Iterator<String> partitionOfRecords) throws Exception {
                Producer<String, String> producer = new KafkaProducer<>(props);
                while(partitionOfRecords.hasNext()) {
                    producer.send(new ProducerRecord<String, String>("topic", partitionOfRecords.next()));
                }
                producer.close();
            }
        });
    }
}

相关问题