Mongodb更快地执行数百万条记录的数组,仅需$in

vlf7wbxs  于 2023-03-17  发布在  Go
关注(0)|答案(1)|浏览(101)

我有两个mongodb集合:1).第一次采集为:sourcequeuemanualupload。它包含状态为“未处理”或“已分配”的所有记录。它包含上载数据时的记录。数据库中的一个已分配项目记录的示例:

{
    "_id" : ObjectId("63e0e46a6047d75b9c20d8ec"),
    "Properties: Name" : "Hangman - Guess Words",
    "Appstore URL" : "https://itunes.apple.com/app/id1375993101?hl=None",
    "Region" : "na",
    "Create Date" : "na",
    "AHT" : "1",
    "sourceId" : "63e0e3719b4f812ba5333a31",
    "type" : "Manual",
    "uploadTime" : "2023-02-06T11:28:42.533+0000",
    "status" : "Assigned",
    "batchId" : "63e0e3719b4f812ba5333a31_746f22e4319b4d81b8ab255f5e653c2c_612023112842"
}

2).第二个集合是queuedata。它包含所处理项目(来自sourcequeuemanualupload集合)的数据(问题、用户对工具的响应),这些数据均保存在此集合中,状态为Completedsourcequeuemanualupload中记录的_id在此处存储为“id”字段作为标识符。已完成项目的示例:

{
    "_id" : ObjectId("63e0e4b19b4f812ba5333a34"),
    "templateId" : "63e0e28e9b4f812ba5333a30",
    "id" : "63e0e46a6047d75b9c20d8ec",
    "moderator" : "kodaga",
    "startTime" : "2023-02-06T11:29:46.048Z",
    "endTime" : "2023-02-06T11:29:52.438Z",
    "status" : "Completed",
    "AHT" : NumberLong(6),
    "userInput" : [
            {
                    "question" : "Is the URL leading to the desired store page link?",
                    "response" : "yes"
            },
            {
                    "question" : "Comments, if any.",
                    "response" : "test 1"
            }
    ]
}

DBA最初犯了一个错误,因为他没有更新sourcequeuemanualupload集合中已完成到Completed的分配项的状态。因此,分配项保留为“已分配”本身。我们正在解决此问题,以将所有分配项(已处理)标记为“已完成”。
当前数据量:

> db.sourcequeuemanualupload.count()
414781
> db.sourcequeuemanualupload.count({"status":"Assigned"})
306418
> db.queuedata.count()
298128

我目前编写的用于识别已完成记录并将其标记为已完成的脚本:

var assigned_tasks_arr = [];

db.sourcequeuemanualupload.find({ status: "Assigned" }).forEach(function (rec) {
  assigned_tasks_arr.push(rec._id.str);
});

print(assigned_tasks_arr.length)
> 306418

db.queuedata.count({ id: { $in: assigned_tasks_arr }, status: "Completed" }); // <------------- STEP 1

var completed_items = db.queuedata
  .find(
    { id: { $in: assigned_tasks_arr }, status: "Completed" },
    { _id: 0, id: 1 }
  )
  .toArray(); //    <------------- STEP 2

var completed_items_ids = [];

completed_items.forEach(function (rec) {
  completed_items_ids.push(rec.id);
});

var completed_items_unique_objectid = [];

completed_items.forEach(function (item) {
  completed_items_unique_objectid.push(new ObjectId(item));
});

db.sourcequeuemanualupload.updateMany(
  { _id: { $in: completed_items_unique_objectid } },
  { $set: { status: "Completed" } }
); // <------------- STEP 3

基本上,我从sourcequeuemanualupload集合中获取Assigned状态项的所有_id,并将它们存储在一个数组中。接下来,为了查看它们是否实际上已经完成,并且queuedata中是否存在记录,我使用$in和assigned_tasks_arr执行搜索,以获得实际完成的项。一旦我获得了完成的id列表,我将继续更新sourcequeuemanualuload中的状态。
但是,我的步骤1和步骤2的执行时间超过30分钟,肯定超过一个小时或更长(但我无法获得确切的时间,因为我的会话被终止)。我认为$in with array find查询执行需要(N*log(M)),其中N是输入数组的长度,M是集合的大小。如您所见,我的N值以百万为单位,M也以百万为单位,这个查询将花费更长的时间.我如何优化这个查询或写一个更快的查询来找到id并立即更新?

我需要帮助以更快的速度执行上述脚本中的STEP 1、STEP 2和STEP 3,这样我就不必等待数小时。我们能否在此处编写一个更好的连接查询或任何其他方法来优化查询?

谢谢
我试着在线搜索更快的优化方法。我将索引作为“id”添加到queuedata集合,但仍然需要很长时间:(需要有关上述查询的帮助
注:使用AWS documentDB 4.0.0时,如果您提供任何查询建议,请检查链接:https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html,因为某些命令不受支持

7d7tgy0s

7d7tgy0s1#

我会这样尝试:

var completed_ids = []
db.sourcequeuemanualupload.aggregate([
   { $match: { status: "Assigned" } },
   {
      $lookup: {
         from: "queuedata",
         let: { id: "$_id" },
         pipeline: [
            { $mach: { status: "Completed" } },
            { $match: { $expr: { $eq: ["$$id", { $toObjectId: "$id" }] } } }
         ],
         as: "queuedata"
      }
   },
   { $match: { queuedata: { $ne: [] } } },
   { $project: { _id: 1 } }
]).forEach(rec => {
   completed_ids.push(rec._id);
   if (completed_ids.length > 10000) {
      db.sourcequeuemanualupload.updateMany(
         { _id: { $in: completed_ids } },
         { $set: { status: "Completed" } }
      );
      completed_ids = [];
   }
})
if (completed_ids.length > 0) {
   db.sourcequeuemanualupload.updateMany(
      { _id: { $in: completed_ids } },
      { $set: { status: "Completed" } }
   );
}

MongoDB版本4.0是End of Life,您可能会寻找一个现代的MongoDB托管服务。
另一个非常简单的方法是:

db.queuedata.aggregate([
   { $match: { status: "Completed" } },
   {
      $project: {
         _id: { $toObjectId: "$id" },
         status: 1
      }
   },
   {
      $merge: {
         into: "sourcequeuemanualupload",
         whenMatched: "merge",
         whenNotMatched: "discard"
      }
   }
])

但这将更新sourcequeuemanualupload中的所有文档,无论是否为status: 'Assigned'
一个解决方案是这样的:

var completed_ids = [];
db.queuedata.aggregate([
   { $match: { status: "Completed" } },
   { $project: { _id: { $toObjectId: "$id" } } }
]).forEach(rec => {
   completed_ids.push(rec._id);
   if (completed_ids.length > 10000) {
      db.sourcequeuemanualupload.updateMany(
         {
            _id: { $in: completed_ids },
            status: "Assigned"
         },
         { $set: { status: "Completed" } }
      );
      completed_ids = [];
   }
})
if (completed_ids.length > 0) {
   db.sourcequeuemanualupload.updateMany(
      {
         _id: { $in: completed_ids },
         status: "Assigned"
      },
      { $set: { status: "Completed" } }
   );
}

当您要修复数据时,id应该转换为ObjectId,而不是普通字符串。

相关问题