我正在运行一个算法来标记mongo字段,并在此基础上向该文档添加新字段。由于我的收藏数量在100万左右,因此更新和插入要花很多时间。
样本数据:
{id:'a1',content:'some text1'}
{id:'a2',content:'some text2'}
python代码:
docs= db.col.find({})
for doc in docs:
out = do_operation(doc['content']) //do_operation is my algorithm
doc["tag"]=out
db.col.update(id:doc['id'],$set:{'Tag_flag':TRUE})
db.col2.insert(doc)
虽然我使用了sparkDataframe来提高速度,但是sparkDataframe占用了大量内存并抛出内存错误(配置:4核和16gb ram(在hadoop的单个集群上)
df = //loading mongodata to a dataframe
df1 = df.withColumn('tag',df.content)
output = []
for doc in df.rdd.collect():
out = do_operation(doc['content'])
output.append(out)
df2 = spark.createDataFrame(output)
final_df = df1.join(df2, df1._id == df2._id , 'inner')
//and finally inserting this dataframe into new collection.
我需要优化我的sparkcode,这样我可以用更少的内存加速。
我可以在mongo和spark之间使用任何消息代理,比如kafka、rabbitmq或reddis。
会有帮助吗?
暂无答案!
目前还没有任何答案,快来回答吧!