如何通过Java或Python高效地将大量数据写入Cassandra?

1hdlvixo  于 2022-09-27  发布在  Java
关注(0)|答案(1)|浏览(189)

大约有百万行的数据需要写入Cassandra。我尝试了以下方法:
第一:根据GitHub上Datastax java-driverpython-driver给出的参考代码,我的代码类似于:

// The following code is fixed, and this part will be omitted later.
    String cassandraHost = "******";
    String keyspace = "******";
    String table = "******";
    String insertCqlStr = " insert into " + keyspace + "." + table +"( "
            +     "id,date,value)"
            +     " values ( ?, ?, ?) ;";
    CqlSession session = CqlSession.builder()
            .addContactPoint(new InetSocketAddress(cassandraHost, 9042))
            .withLocalDatacenter("datacenter1")
            .withKeyspace(CqlIdentifier.fromCql(keyspace))
            .build();

    PreparedStatement preparedStatement = session.prepare(insertCqlStr);

    // The code below is changed, or just what I think it is.
    for(List<String> row: rows){
        session.execute(
            preparedInsertStatement.bind(row.get(0),     
            row.get(1), row.get(2))
          .setConsistencyLevel(ConsistencyLevel.ANY));
    }
    session.close();

这段代码工作得很好,但效率太低,我无法接受。所以我尝试了驱动程序提供的异步API,代码几乎与上面的代码相同:

for(List<String> row: rows){
        session.executeAsync(
            preparedInsertStatement.bind(row.get(0),     
            row.get(1), row.get(2))
          .setConsistencyLevel(ConsistencyLevel.ANY));
    }
    session.close();

请原谅我缺乏异步编程经验,因为我太粗鲁了。它可以工作,但有一个致命的问题,我发现它并没有将所有数据写入数据库。我想知道调用异步API的正确用法
此外,我还尝试了驱动程序提供的BatchStatement的相关方法。我知道,为了提高性能,这种方法已被正式弃用,而且它有很多局限性。例如,据我所知,批处理中的语句数不能超过65535,在默认配置中,批处理的数据长度警告限制为5kb,错误限制为50kb。但我将语句数保持在65535以下,并修改了上述默认配置:

List<BoundStatement> boundStatements = new ArrayList<>();
    Integer count = 0;
    BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
    for (List<String> row : rows){
    // The actual code here is looping multiple times instead of exiting directly.
        if(count >= 65535){
            break;
        }
        BoundStatement boundStatement = preparedStatement.bind(row.get(0),
                                        row.get(1), row.get(2));
        boundStatements.add(boundStatement);
        count += 1;
    }
    BatchStatement batch = batchStatement.addAll(boundStatements);
    session.execute(batch.setConsistencyLevel(ConsistencyLevel.ANY));
    // session.executeAsync(batch.setConsistencyLevel().ANY);
    session.close();

它也起作用。实际上,它比异步API更高效,使用同步接口可以确保数据完整性。如果使用异步API在此处执行BatchStatement,上述数据也会不完整。但是这个方法仍然不能满足我的要求,我需要用多线程来执行它。当我执行多个线程时,它会给出错误:原因:com.datastax.oss.driver.api.core。驱动器超时异常:查询在PT2S后超时

总结:我尝试了同步和异步写入以及与批处理相关的方法,但有些问题我无法接受。我现在需要知道如何正确使用异步API来避免丢失数据,以及为什么我错了。至于BatchStatement相关的方法,我不认为它有效,如果你能给我一个可行的建议,那就太好了。非常感谢。

n8ghc7c1

n8ghc7c11#

我建议不要自己编写数据加载代码,而是采用一个DSBulk tool,它针对向Cassandra加载/卸载数据进行了高度优化。它是open source,所以您甚至可以将其用作Java库。
这几乎没有什么原因:

  • 编写异步代码并不容易——您需要确保不会在同一连接上发送太多请求(Cassandra对正在处理的请求数量有限制)。对于driver 3.x,您可以使用类似this的内容,driver 4.x具有内置的速率限制功能
  • 如果使用不当,Cassandra中的批处理通常会导致性能下降。批处理应仅用于提交属于同一分区的数据,否则会导致协调节点上的负载增加。此外,您还需要实现自定义路由。

DSBulk正在非常有效地完成所有这些工作,因为它是由每天在大规模设置中与Cassandra一起工作的人编写的。
另外,在您的情况下,一致性级别ANY意味着协调器只确认接收到数据,但不保证它会被写入(例如,如果它崩溃了)。

相关问题