如何使用spark和mongohadoop连接器更新mongo集合中的多个字段?

hjqgdpho  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(398)

我正在使用mongohadoop连接器将mongo与spark和scala连接起来。
我想使用mongo和spark更新(upsert)mongo集合的多个值。
我试着跟着--

val query = new BasicBSONObject()
query.append("customerId", customerId)

val update = new BasicBSONObject()
val bson = new BasicBSONObject()
bson.put("minimumUtilization", some value)
bson.put("maximumUtilization", some value)
bson.put("averageUtilization", some value)
bson.put("customerId", customerId)

现在我如何使用上面的对象来更新使用spark和mongo hadoop连接器的集合??

zf9nrax1

zf9nrax11#

请看这里:https://github.com/mongodb/mongo-hadoop/wiki/spark-usage
使用mongoupdatewriteable。创建mongoupdatewriteable的rdd(query、update、upsert、multiupdate、replace),然后

updates.map(f =>(null,f)).saveAsNewAPIHadoopFile(
      "file:///this-is-completely-unused",
      classOf[Object],
      classOf[MongoUpdateWritable],
      classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
      outputConfig)

相关问题