使用c语言在cassandra中批量插入#

9fkzdhlc  于 2021-06-10  发布在  Cassandra
关注(0)|答案(2)|浏览(429)

我是新来Cassandra的。我必须用c#在cassandra中一次性插入50000行。我用的是Cassandrac#驱动程序。我使用下面的代码在cassandra中插入数据。请帮帮我

string tableName = "" + ConfigurationManager.AppSettings["tableName"];
            string keySpace = "test";// "" + ConfigurationManager.AppSettings["Keyspace"];
            //string query = "INSERT INTO " + tableName + " (emp_name, emp_position,emp_firstname,uniqueid) VALUES (?, ?,?,?)"; //; "SELECT * FROM "+ tableName + " limit 2 ";
            string query = string.Empty;
            query =
"BEGIN BATCH " +
"INSERT INTO " + tableName + " (emp_name, emp_position,emp_firstname,uniqueid) VALUES (?, ?,?,?);" +
"INSERT INTO " + tableName + " (emp_name, emp_position,emp_firstname,uniqueid) VALUES (?, ?,?,?);" +
"INSERT INTO " + tableName + " (emp_name, emp_position,emp_firstname,uniqueid) VALUES (?, ?,?,?);" +
" APPLY BATCH";

BatchStatement(keySpace, query,
                    Convert.ToString("ashish" + i), 2, Convert.ToString("Mohan" + i), System.Guid.NewGuid(),
                    Convert.ToString("ashish" + i), 2, Convert.ToString("Mohan" + i), System.Guid.NewGuid(),
                    Convert.ToString("ashish" + i), 2, Convert.ToString("Mohan" + i), System.Guid.NewGuid()
public void BatchStatement(string keySpace, string query, params object[] parameter)
    {
        try
        {
            BatchStatement objBatchStatement = new BatchStatement();
            PreparedStatement statement = PrepareQueryStatement(keySpace, query);
            objBatchStatement.Add(statement.Bind(parameter));
            var session = cluster.Connect(keySpace);
            //objBatchStatement.ConsistencyLevel
            // Execute the batch
            //RowSet row = session.Execute(query);
            RowSet row = session.Execute(objBatchStatement);

        }
        catch (Exception ex)
            {
            Console.WriteLine("Excpetion occured during batch operation method Name BatchStatement error : " + ex.ToString());
        }

    }

我得到错误:批处理中的语句无效:只允许update、insert和delete语句。

n7taea2i

n7taea2i1#

要回答具体问题。。。
批处理中的语句无效:只允许update、insert和delete语句。

"BEGIN BATCH " +
...
" APPLY BATCH";
``` `BEGIN BATCH` 以及 `APPLY BATCH` 是不允许的语句。这个 `BatchStatement` object会帮你处理的。
至于这个说法:
我要在Cassandra一次插入50000行
乔ão是绝对正确的,因为你所做的是对Cassandra理论的误用 `BATCH` 功能。批处理(在cassandra中)被设计成原子地将一次写入应用于多个表。它不是为支持50000次写入一个表而构建的。我见过开发团队在执行此操作时无意中导致集群节点崩溃。
这不起作用的原因是,cassandra意识到它没有一个单独的分区来发送批处理。因此,它选择了一个协调节点,负责从所有其他节点提取和组装50000个结果。该节点很快就会变得不知所措,并崩溃。
再说一遍,乔ão的建议是异步编写行。我要做的唯一调整是添加一种机制来限制在任何时候活动的线程数,以保护您的节点免受50k写操作的反压力。
ca1c2owp

ca1c2owp2#

我不太了解你的代码示例,所以我不知道你是否在使用 SELECT 似乎是问题所在的批中的查询。不管怎样,你可能误用了 Batch 因为cassandra中的批处理用于原子性而不是性能(大多数情况下)。
如果您希望尽可能快地插入这50000行,那么使用异步方式执行50000个插入会更快(建议这样做) session.ExecuteAsync . 尝试以下操作:

var tasks = new List<Task>();

foreach (BoundStatement query in queries) 
{
    tasks.Add(session.ExecuteAsync(query));
}

await Task.WhenAll(tasks).ConfigureAwait(false);

对于需要插入更多行的更高级用例,可能需要控制并发执行的请求数,但是标准任务并行库已经为您完成了其中的一些工作。
唯一一个 Batch 当您可以使用属于同一分区的语句创建微批处理时,要比单独异步执行所有这些请求快得多,但这要困难得多,而且您可能不需要这样的性能增益。

更新

在你编辑你的问题后,我可以理解代码示例。如果你还想用 Batch 与我上面建议的方法不同,您要做的是:

var batch = new BatchStatement();
foreach (BoundStatement query in queries) 
{
    batch.Add(query); // you can also add SimpleStatements instead of BoundStatements
}

await session.ExecuteAsync(batch).ConfigureAwait(false);

相关问题