使用httppost,以下脚本可以插入一个新字段 createtime
或更新 lastupdatetime
:
curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc": {
"lastupdatetime": "2015-09-16T18:00:00"
}
"upsert" : {
"createtime": "2015-09-16T18:00:00"
"lastupdatetime": "2015-09-16T18:00",
}
}'
但在星火剧本里,设定之后 "es.write.operation": "upsert"
,我不知道如何插入 createtime
完全。只有 es.update.script.*
在官方文件中。。。有谁能给我举个例子吗?
更新:在我的例子中,我想将android设备的信息从logo保存到一个elasticsearch类型中,并将它的首次出现时间设置为 createtime
. 如果设备再次出现,我只更新 lastupdatetime
,但是离开 createtime
就这样。
所以文件 id
是android,如果id存在,更新 lastupdatetime
,否则插入 createtime
以及 lastupdatetime
。所以这里的设置是(在python中):
conf = {
"es.resource.write": "stats-device/activation",
"es.nodes": "NODE1:9200",
"es.write.operation": "upsert",
"es.mapping.id": "id"
# ???
}
rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf
)
我只是不知道如果 id
不存在。
2条答案
按热度按时间ruyhziif1#
如果没有看到你的Spark脚本,将很难给出一个详细的答案。但一般来说,您会希望使用elasticsearch hadoop(例如,您需要将该依赖项添加到build.sbt文件中),然后在脚本中您可以:
根据官方文件。savetoes的第二个参数指定要用作elasticsearch文档id的Maprdd中的哪个键。
当然,如果您使用spark执行此操作,则意味着您拥有的行数超过了手动键入的行数,因此对于您的情况,您需要将数据从脚本中的key->value转换为Map的rdd。但是如果不知道数据来源,我就不能更详细地说了。
t3psigkw2#
最后,我得到了一个并不完美的解决方案:
添加
createtime
所有来源单据;保存到es
create
方法和忽略已创建的错误;删除
createtime
字段;再次保存到es
update
方法;目前(2015-09-27),步骤2可以通过此修补程序实现。