pyspark使用saveasnewapihadoopfile将数据流数据写入elasticsearch

xfb7svmp  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(497)

我正在尝试将kafka流转换为RDD,并将这些RDD插入elasticsearch数据库。这是我的密码:

conf = SparkConf().setAppName("ola")
sc = SparkContext(conf=conf) 
es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "pipe/word"
}

ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
lines = kvs.map(lambda x: x[1])  
value_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

value_counts.transform(lambda rdd: rdd.map(f))
value_counts.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

ssc.start()  
ssc.awaitTermination()

saveasnewapihadoopfile函数应该将这些rdd写入es。但是我得到这个错误:

value_counts.saveAsNewAPIHadoopFile(
   AttributeError: 'TransformedDStream' object has no attribute 'saveAsNewAPIHadoopFile'

转换函数应该能够将流转换为sparkDataframe。如何将这些rdd写入elasticsearch?谢谢!

wgeznvg7

wgeznvg71#

new = rawUser.rdd.map(lambda item: ('key', {'id': item['entityId'],'targetEntityId': item['targetEntityId']}))

rawuser是dataframe,new是pipelinedrdd

new.saveAsNewAPIHadoopFile(
    path='/home/aakash/test111/', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={ "es.resource" : "index/test" ,"es.mapping.id":"id","es.nodes" : "localhost","es.port" : "9200","es.nodes.wan.only":"false"})

这里最重要的是下载合适的兼容jarhttps://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop 检查elastic的版本并下载合适的jar。
命令使pyspark使用jar。 pyspark --jars elasticsearch-hadoop-6.2.4.jar

dbf7pr2w

dbf7pr2w2#

你可以用 foreachRDD :

value_counts.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(...))

相关问题