NodeJS 在云函数中使用事务和批处理写入

sqxo8psd  于 2023-08-04  发布在  Node.js
关注(0)|答案(2)|浏览(115)

Transactions and batched writes可用于通过原子操作写入多个文档。
文档说明 * 使用Cloud Firestore客户端库,您可以将多个操作分组到单个事务中。*
我不明白这里的 * 客户端库 * 是什么意思,以及在云函数中使用事务和批处理写入是否正确。
给出的示例:假设在数据库中我有3个元素(文档ID是A、B、C)。现在我需要再插入3个元素(文档ID为C、D、E)。云函数应该只添加最新的文档,并向用户发送推送通知,告诉他有2个新文档可用。
文档ID可以是相同的,但由于我需要计算有多少文档是新的(将插入的那些),我需要一种方法来读取文档ID,并检查它的存在。因此,我想知道事务是否适合云函数。
此外,每个事务或一批写操作最多可以写入500个文档。有没有其他方法可以克服云函数中的这个限制?

pkwftd7m

pkwftd7m1#

Firestore Transaction行为在客户端SDK(JS SDK,iOS SDK,Android SDK...)和Admin SDK(一组服务器库)之间是不同的,Admin SDK是我们在云函数中使用的SDK。有关here差异的更多说明,请参阅文档。
由于Admin SDK中使用的数据争用类型,您可以使用getAll()方法从Firestore检索多个文档,并对所有返回的文档持有悲观锁。
所以这就是你需要在交易中调用的方法:你使用getAll()来获取文档C、D和E,并且你检测到只有C存在,所以你知道你只需要添加D和E。
具体地说,它可以是沿着以下路线的东西:

const db = admin.firestore();

exports.lorenzoFunction = functions
    .region('europe-west1')
    .firestore
    .document('tempo/{docId}')  //Just a way to trigger the test Cloud Function!!
    .onCreate(async (snap, context) => {

        const c = db.doc('coltest/C');
        const d = db.doc('coltest/D');
        const e = db.doc('coltest/E');

        const docRefsArray = [c, d, e]

        return db.runTransaction(transaction => {
            return transaction.getAll(...docRefsArray).then(snapsArray => {
                let counter = 0;
                snapsArray.forEach(snap => {
                    if (!snap.exists) {
                        counter++;
                        transaction.set(snap.ref, { foo: "bar" });
                    } else {
                        console.log(snap.id + " exists")
                    }
                });
                console.log(counter);
                return;
            });
        });

    });

字符串
要测试它:在coltest集合中创建一个C,D或E文档,然后在tempo集合中创建一个文档(只是触发此测试云函数的简单方法):CF被触发。然后看看coltest集合:创建了两个缺失的文档;查看CF日志:计数器= 2。
此外,每个事务或一批写操作最多可以写入500个文档。有没有其他方法可以克服云函数中的这个限制?
答案是否定的。

chy5wohz

chy5wohz2#

过去在500个记录块之间也需要一秒的延迟。这是我几年前写的下面的脚本逐行读取CSV文件,为每行创建和设置一个新的批处理对象。计数器每500个对象创建一个新的批处理写入,最后使用async/await将写入速率限制为每秒1次。最后,我们通过控制台日志记录通知用户写入进度。我在这里发表了一篇文章>> https://hightekk.com/articles/firebase-admin-sdk-bulk-import
注意:在我的情况下,我正在阅读一个巨大的平面文本文件(一个制造商的零件号目录)的进口。您可以将其用作工作模板并进行修改以适合您的数据源。此外,您可能需要增加分配给node的内存,以便运行此命令:
第一个月
脚本看起来像:

var admin = require("firebase-admin");
var serviceAccount = require("./your-firebase-project-service-account-key.json");
var fs = require('fs');
var csvFile = "./my-huge-file.csv"
var parse = require('csv-parse');
require('should');
admin.initializeApp({
  credential: admin.credential.cert(serviceAccount),
  databaseURL: "https://your-project.firebaseio.com"
});
var firestore = admin.firestore();
var thisRef;
var obj = {};
var counter = 0;
var commitCounter = 0;
var batches = [];
batches[commitCounter] = firestore.batch();
fs.createReadStream(csvFile).pipe(
    parse({delimiter: '|',relax_column_count:true,quote: ''})
).on('data', function(csvrow) {
    if(counter <= 498){
        if(csvrow[1]){
            obj.family = csvrow[1];
        }
        if(csvrow[2]){
            obj.series = csvrow[2];
        }
        if(csvrow[3]){
            obj.sku = csvrow[3];
        }
        if(csvrow[4]){
            obj.description = csvrow[4];
        }
        if(csvrow[6]){
            obj.price = csvrow[6];    
        }
        thisRef = firestore.collection("your-collection-name").doc();
        batches[commitCounter].set(thisRef, obj);
        counter = counter + 1;            
    } else {
        counter = 0;
        commitCounter = commitCounter + 1;
        batches[commitCounter] = firestore.batch();
    }
}).on('end',function() {
    writeToDb(batches);
});
function oneSecond() {
    return new Promise(resolve => {
        setTimeout(() => {
            resolve('resolved');
        }, 1010);
    });
}
async function writeToDb(arr) {
    console.log("beginning write");
    for (var i = 0; i < arr.length; i++) {
        await oneSecond();
        arr[i].commit().then(function () {
            console.log("wrote batch " + i);
        });
    }
    console.log("done.");
}

字符串

相关问题