I am trying to convert a Kafka stream into RDDs and insert those RDDs into an Elasticsearch database. Here is my code:
import sys

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("ola")
sc = SparkContext(conf=conf)

# Connection settings for elasticsearch-hadoop
es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "pipe/word"
}

ssc = StreamingContext(sc, 2)  # 2-second batches

brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic],
                                    {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])

# Word count over each batch
value_counts = lines.flatMap(lambda line: line.split(" ")) \
                    .map(lambda word: (word, 1)) \
                    .reduceByKey(lambda a, b: a + b)

value_counts.transform(lambda rdd: rdd.map(f))
value_counts.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

ssc.start()
ssc.awaitTermination()
The saveAsNewAPIHadoopFile call should write these RDDs to ES. But I get this error:
value_counts.saveAsNewAPIHadoopFile(
AttributeError: 'TransformedDStream' object has no attribute 'saveAsNewAPIHadoopFile'
The transform function should be able to convert the stream into a Spark DataFrame. How can I write these RDDs to Elasticsearch? Thanks!
2 Answers
wgeznvg71#
(In my setup, rawuser is a DataFrame and new is a PipelinedRDD.)
The most important thing here is to download a compatible jar from https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop : check your Elasticsearch version and download the matching jar.
Command to make pyspark pick up the jar:
pyspark --jars elasticsearch-hadoop-6.2.4.jar
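For a non-interactive streaming job like the one in the question, the same flag works with spark-submit. The script name and Kafka arguments below are placeholders, and the jar version must match your Elasticsearch cluster:

spark-submit --jars elasticsearch-hadoop-6.2.4.jar stream_to_es.py localhost:9092 my-topic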
dbf7pr2w2#
You can use foreachRDD:
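A minimal sketch of that pattern, reusing the es_write_conf dict from the question; the write_to_es name and the {'word': ..., 'count': ...} document shape are illustrative assumptions. saveAsNewAPIHadoopFile is defined on RDDs, not on DStreams (which is exactly what the AttributeError says), so the write has to happen inside foreachRDD on each batch's RDD:

def write_to_es(rdd):
    # EsOutputFormat ignores the key; the value must be a dict-like document,
    # so turn each (word, count) pair into a small document first.
    docs = rdd.map(lambda wc: ('ignored-key', {'word': wc[0], 'count': wc[1]}))
    docs.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)

# Replaces the failing value_counts.saveAsNewAPIHadoopFile(...) call:
value_counts.foreachRDD(write_to_es)

foreachRDD hands you a plain RDD for every batch, so any RDD method, including saveAsNewAPIHadoopFile, is available inside it.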