这里是初学者的问题
将Spark Dataframe推送到ElasticSearch的工作流程或步骤是什么?
根据研究,我认为我需要使用spark.newAPIHadoopFile()方法。
然而,通过Elastic Search Documentation和other Stack Q/A's的挖掘,我仍然对参数需要采用什么格式以及为什么这样做感到有点困惑
注意,我使用的是pyspark,这是ES的一个新表(不存在索引),而df是5列(2个字符串类型、2个LONG类型、1个int列表),大约有3.5M行。
2条答案
按热度按时间x7rlezfr1#
这对我很有效--我的数据存储在
df
中。我使用此命令提交了我的作业-
/path/to/spark-submit --master spark://master:7077 --jars ./jar_files/elasticsearch-hadoop-5.6.4.jar --driver-class-path ./jar_files/elasticsearch-hadoop-5.6.4.jar main_df.py
。uxh89sit2#
我设法找到了答案,所以我来分享。Spark df(来自pyspk.sql)目前不支持
newAPIHadoopFile()
方法;但是,df.rdd.saveAsNewAPIHadoopFile()
也给了我错误。诀窍是通过以下函数将df转换为字符串因此,我的JSON工作流程是:
1:
df = spark.read.json('XXX.json')
2:
rdd_mapped = df.rdd.map(lambda y: y.asDict())
3:
final_rdd = rdd_mapped.map(transform)
4:
更多information on ES arguments can be found here(滚动到“配置”)