NodeJS 如何使用AWS JavaScript SDK(dynamoDB)处理未处理的项目?

oxosxuxt  于 12个月前  发布在  Node.js
关注(0)|答案(6)|浏览(97)

我正在尝试使用AWS Lambda函数处理SendGrid中的事件。据我所知,事件将是一个数组,其中包含可变数量的JSON对象,每个对象表示一个给定的事件。我想使用batchWriteItem将这些事件写入DynamoDB,并循环该过程,直到没有返回任何UnprocessedItems。然而,我陷入了无限循环。下面是我的代码:

console.log('Loading function');

var aws = require('aws-sdk');
var dynamo = new aws.DynamoDB();
params = {};

exports.handler = function(sg_event, context) {

    var items = [];
    for(var i = 0; i < sg_event.length; i++) {
        var obj = sg_event[i];
        var request = {
            PutRequest: {
                Item: {
                    email: { S: obj.email },
                    timestamp: { S: obj.timestamp.toString() },
                    sg_message_id: { S: obj.sg_message_id },
                    event: { S: obj.event }
                }
            }
        };
        items.push(request);
    }

    params = {
        RequestItems: {
            sendgrid_response: items
        }
    }

    do {
        dynamo.batchWriteItem( params, function(err, data) {
            if(err)
                context.fail(err);
            else
                params.RequestItems = data.UnprocessedItems;
        });
    } while(!isEmpty(params.RequestItems));
};

function isEmpty(obj) {
    return (Object.keys(obj).length === 0);
}

字符串
我想问题出在试图在回调函数中设置参数,但我不知道我应该怎么做.我知道我可以在原始函数的回调函数中使用UnprocessedItems调用另一个batchWriteItem,但我仍然需要能够根据需要多次运行该函数,以确保所有UnprocessedItems都被写入。

xoefb8l8

xoefb8l81#

@Daniela Miao,感谢分享解决方案。
我们可以在您发布的代码中添加一个代码块,以避免DynamoDB异常。这将在再次请求DynamoDB批量写入之前检查params.RequestItems是否有未处理的数据。

//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data) {
  if (err) { 
     //fail
  } else {
    var params = {};
    params.RequestItems = data.UnprocessedItems;
    /*
    * Added Code block 
    */
    if(Object.keys(params.RequestItems).length != 0) {
      db.batchWriteItem(params, processItemsCallback);
    }
  }
};

db.batchWriteItem(/*initial params*/, processItemsCallback);

字符串

bweufnob

bweufnob2#

这是我使用“await”语法的代码示例。所以这段代码必须在一个javascript函数中。它在重试之前会随机延迟。

do {
    batchWriteResp = await dynamo.batchWriteItem({RequestItems:batchWriteItems}).promise()
    if (Object.keys(batchWriteResp.UnprocessedItems).length>0) {
        batchWriteItems = batchWriteResp.UnprocessedItems
        // delay a random time between 0.5~2.5 seconds
        const delay = Math.floor(Math.random() * 2000 + 500)
        await new Promise(resolve => setTimeout(resolve, delay));
    } else {
        break
    }
} while (true)

字符串

eqzww0vc

eqzww0vc3#

Nodejs是单线程的,它首先执行所有的主函数,所以你的while循环永远不会结束,回调永远不会执行。
你应该这样做:

//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data) {
  if (err) { 
     //fail
  } else {
    var params = {};
    params.RequestItems = data.UnprocessedItems;
    db.batchWriteItem(params, processItemsCallback);
  }
};

db.batchWriteItem(/*initial params*/, processItemsCallback);

字符串

8mmmxcuj

8mmmxcuj4#

我发现公认的答案不必要地递归,因此很难阅读和维护。

const isEmptyObj = obj => {
    for (const k of obj) return false;
    return true;
};

async writeBatchToDynamodbAndHandleUnprocessedItems(db, itemsToWrite) => {
    
    // Note that `itemsToWrite` should look like:
    // { [tableName]: [ item1, item2, item3, ... ] }    
    
    while (!isEmptyObj(itemsToWrite)) {
        
        const result = await db.batchWriteItem({ RequestItems: itemsToWrite }).promise();
        itemsToWrite = result.UnprocessedItems;
        
    }
    
};

字符串

nr9pn0ug

nr9pn0ug5#

一个非常简洁的解决方案,使用递归的BatchGetItem(其工作方式与BatchWrite完全相同):

public async batchGet(params: BatchGetItemInput, output: BatchGetResponseMap): Promise<BatchGetResponseMap> {
    const batchGetItemOutput: BatchGetItemOutput = await this.documentClient.batchGet(params).promise();

    Object.keys(batchGetItemOutput.Responses).forEach(tableName => {
        if (output[tableName]) {
            output[tableName] = output[tableName].concat(batchGetItemOutput.Responses[tableName]);
        } else {
            output[tableName] = batchGetItemOutput.Responses[tableName];
        }
    });

    if (Object.keys(batchGetItemOutput.UnprocessedKeys).length !== 0) {
        output = await this.batchGet({ RequestItems: batchGetItemOutput.UnprocessedKeys }, output);
    }

    return output;
}

字符串

zfciruhq

zfciruhq6#

根据AWS文档,SDK会自动处理重试逻辑:
注意AWS SDK实现了自动重试逻辑和指数回退。
最大重试次数可通过以下方式配置:

var dynamodb = new AWS.DynamoDB({apiVersion: '2012-08-10', maxRetries:5});

字符串

相关问题