如何在spark中插入elasticsearch?

baubqpgj  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(386)

使用httppost,以下脚本可以插入一个新字段 createtime 或更新 lastupdatetime :

curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc": {
    "lastupdatetime": "2015-09-16T18:00:00"
}
"upsert" : {
    "createtime": "2015-09-16T18:00:00"
    "lastupdatetime": "2015-09-16T18:00",
}
}'

但在星火剧本里,设定之后 "es.write.operation": "upsert" ,我不知道如何插入 createtime 完全。只有 es.update.script.* 在官方文件中。。。有谁能给我举个例子吗?
更新:在我的例子中,我想将android设备的信息从logo保存到一个elasticsearch类型中,并将它的首次出现时间设置为 createtime . 如果设备再次出现,我只更新 lastupdatetime ,但是离开 createtime 就这样。
所以文件 id 是android,如果id存在,更新 lastupdatetime ,否则插入 createtime 以及 lastupdatetime 。所以这里的设置是(在python中):

conf = {
    "es.resource.write": "stats-device/activation",
    "es.nodes": "NODE1:9200",
    "es.write.operation": "upsert",
    "es.mapping.id": "id"
    # ???
}

rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf
)

我只是不知道如果 id 不存在。

ruyhziif

ruyhziif1#

如果没有看到你的Spark脚本,将很难给出一个详细的答案。但一般来说,您会希望使用elasticsearch hadoop(例如,您需要将该依赖项添加到build.sbt文件中),然后在脚本中您可以:

import org.elasticsearch.spark._ 
val documents = sc.parallelize(Seq(Map(
                                   "id" -> 1, 
                                   "createtime" -> "2015-09-16T18:00:00"
                                   "lastupdatetime" -> "2015-09-16T18:00"),
                                  Map(<next document>), ...)
                   .saveToEs("test/type1", Map("es.mapping.id" -> "id"))

根据官方文件。savetoes的第二个参数指定要用作elasticsearch文档id的Maprdd中的哪个键。
当然,如果您使用spark执行此操作,则意味着您拥有的行数超过了手动键入的行数,因此对于您的情况,您需要将数据从脚本中的key->value转换为Map的rdd。但是如果不知道数据来源,我就不能更详细地说了。

t3psigkw

t3psigkw2#

最后,我得到了一个并不完美的解决方案:
添加 createtime 所有来源单据;
保存到es create 方法和忽略已创建的错误;
删除 createtime 字段;
再次保存到es update 方法;
目前(2015-09-27),步骤2可以通过此修补程序实现。

相关问题