大约有百万行的数据需要写入Cassandra。我尝试了以下方法:
第一:根据GitHub上Datastax java-driver或python-driver给出的参考代码,我的代码类似于:
// The following code is fixed, and this part will be omitted later.
String cassandraHost = "******";
String keyspace = "******";
String table = "******";
String insertCqlStr = " insert into " + keyspace + "." + table +"( "
+ "id,date,value)"
+ " values ( ?, ?, ?) ;";
CqlSession session = CqlSession.builder()
.addContactPoint(new InetSocketAddress(cassandraHost, 9042))
.withLocalDatacenter("datacenter1")
.withKeyspace(CqlIdentifier.fromCql(keyspace))
.build();
PreparedStatement preparedStatement = session.prepare(insertCqlStr);
// The code below is changed, or just what I think it is.
for(List<String> row: rows){
session.execute(
preparedInsertStatement.bind(row.get(0),
row.get(1), row.get(2))
.setConsistencyLevel(ConsistencyLevel.ANY));
}
session.close();
这段代码工作得很好,但效率太低,我无法接受。所以我尝试了驱动程序提供的异步API,代码几乎与上面的代码相同:
for(List<String> row: rows){
session.executeAsync(
preparedInsertStatement.bind(row.get(0),
row.get(1), row.get(2))
.setConsistencyLevel(ConsistencyLevel.ANY));
}
session.close();
请原谅我缺乏异步编程经验,因为我太粗鲁了。它可以工作,但有一个致命的问题,我发现它并没有将所有数据写入数据库。我想知道调用异步API的正确用法。
此外,我还尝试了驱动程序提供的BatchStatement的相关方法。我知道,为了提高性能,这种方法已被正式弃用,而且它有很多局限性。例如,据我所知,批处理中的语句数不能超过65535,在默认配置中,批处理的数据长度警告限制为5kb,错误限制为50kb。但我将语句数保持在65535以下,并修改了上述默认配置:
List<BoundStatement> boundStatements = new ArrayList<>();
Integer count = 0;
BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
for (List<String> row : rows){
// The actual code here is looping multiple times instead of exiting directly.
if(count >= 65535){
break;
}
BoundStatement boundStatement = preparedStatement.bind(row.get(0),
row.get(1), row.get(2));
boundStatements.add(boundStatement);
count += 1;
}
BatchStatement batch = batchStatement.addAll(boundStatements);
session.execute(batch.setConsistencyLevel(ConsistencyLevel.ANY));
// session.executeAsync(batch.setConsistencyLevel().ANY);
session.close();
它也起作用。实际上,它比异步API更高效,使用同步接口可以确保数据完整性。如果使用异步API在此处执行BatchStatement,上述数据也会不完整。但是这个方法仍然不能满足我的要求,我需要用多线程来执行它。当我执行多个线程时,它会给出错误:原因:com.datastax.oss.driver.api.core。驱动器超时异常:查询在PT2S后超时
总结:我尝试了同步和异步写入以及与批处理相关的方法,但有些问题我无法接受。我现在需要知道如何正确使用异步API来避免丢失数据,以及为什么我错了。至于BatchStatement相关的方法,我不认为它有效,如果你能给我一个可行的建议,那就太好了。非常感谢。
1条答案
按热度按时间n8ghc7c11#
我建议不要自己编写数据加载代码,而是采用一个DSBulk tool,它针对向Cassandra加载/卸载数据进行了高度优化。它是open source,所以您甚至可以将其用作Java库。
这几乎没有什么原因:
DSBulk正在非常有效地完成所有这些工作,因为它是由每天在大规模设置中与Cassandra一起工作的人编写的。
另外,在您的情况下,一致性级别ANY意味着协调器只确认接收到数据,但不保证它会被写入(例如,如果它崩溃了)。