有人能提供我的样本代码推记录Kafka从Spark流?
rmbxnbpk1#
使用spark流,您可以使用来自kafka主题的数据。如果要将记录发布到kafka主题,可以使用kafka producer[https://cwiki.apache.org/confluence/display/kafka/0.8.0+producer+example]或者可以使用kafka connect将数据发布到使用多个源连接器的kafka主题[http://www.confluent.io/product/connectors/]有关spark流媒体和kafka集成的更多信息,请参见下面的链接。http://spark.apache.org/docs/latest/streaming-kafka-integration.html
frebpwbc2#
我用java做的。您可以在 JavaDStream<String> 作为 .foreachRDD() . 这不是最好的方式,因为它创造了 KafkaProducer 对于每个rdd,可以使用 KafkaProducers 就像spark文档中的socket示例。这是我的密码:
JavaDStream<String>
.foreachRDD()
KafkaProducer
KafkaProducers
public static class KafkaPublisher implements VoidFunction<JavaRDD<String>> { private static final long serialVersionUID = 1L; public void call(JavaRDD<String> rdd) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "loca192.168.0.155lhost:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1000); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); rdd.foreachPartition(new VoidFunction<Iterator<String>>() { private static final long serialVersionUID = 1L; public void call(Iterator<String> partitionOfRecords) throws Exception { Producer<String, String> producer = new KafkaProducer<>(props); while(partitionOfRecords.hasNext()) { producer.send(new ProducerRecord<String, String>("topic", partitionOfRecords.next())); } producer.close(); } }); } }
2条答案
按热度按时间rmbxnbpk1#
使用spark流,您可以使用来自kafka主题的数据。
如果要将记录发布到kafka主题,可以使用kafka producer[https://cwiki.apache.org/confluence/display/kafka/0.8.0+producer+example]
或者可以使用kafka connect将数据发布到使用多个源连接器的kafka主题[http://www.confluent.io/product/connectors/]
有关spark流媒体和kafka集成的更多信息,请参见下面的链接。
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
frebpwbc2#
我用java做的。您可以在
JavaDStream<String>
作为.foreachRDD()
. 这不是最好的方式,因为它创造了KafkaProducer
对于每个rdd,可以使用KafkaProducers
就像spark文档中的socket示例。这是我的密码: