(datastax 4.1.0)(cassandra)如何使用session.executeasync收集所有响应?

x33g5p2x  于 2021-06-10  发布在  Cassandra
关注(0)|答案(2)|浏览(501)

我想用execute.async调用manuel中的cassandradb进行异步调用我找到了这段代码,但我不明白如何将所有行收集到任何列表中。非常基本的调用,比如select*from table,我想存储所有的结果。
https://docs.datastax.com/en/developer/java-driver/4.4/manual/core/async/

CompletionStage<CqlSession> sessionStage = CqlSession.builder().buildAsync();

// Chain one async operation after another:
CompletionStage<AsyncResultSet> responseStage =
    sessionStage.thenCompose(
        session -> session.executeAsync("SELECT release_version FROM system.local"));

// Apply a synchronous computation:
CompletionStage<String> resultStage =
    responseStage.thenApply(resultSet -> resultSet.one().getString("release_version"));

// Perform an action once a stage is complete:
resultStage.whenComplete(
    (version, error) -> {
      if (error != null) {
        System.out.printf("Failed to retrieve the version: %s%n", error.getMessage());
      } else {
        System.out.printf("Server version: %s%n", version);
      }
      sessionStage.thenAccept(CqlSession::closeAsync);
    });
wlp8pajw

wlp8pajw1#

下面是4.x的一个示例(您还可以在4.4 btw中找到React式代码的示例)
https://github.com/datastax/cassandra-reactive-demo/blob/master/2_async/src/main/java/com/datastax/demo/async/repository/asyncstockrepository.java

pqwbnv8z

pqwbnv8z2#

您需要参考关于异步分页的部分—您需要提供一个回调,将数据收集到作为外部对象提供的列表中。文档有以下示例:

CompletionStage<AsyncResultSet> futureRs =
    session.executeAsync("SELECT * FROM myTable WHERE id = 1");
futureRs.whenComplete(this::processRows);

void processRows(AsyncResultSet rs, Throwable error) {
  if (error != null) {
    // The query failed, process the error
  } else {
    for (Row row : rs.currentPage()) {
      // Process the row...
    }
    if (rs.hasMorePages()) {
      rs.fetchNextPage().whenComplete(this::processRows);
    }
  }
}

在这种情况下 processRows 可以将数据存储在作为当前对象一部分的列表中,如下所示:

class Abc {
  List<Row> rows = new ArrayList<>();

  // call to executeAsync

  void processRows(AsyncResultSet rs, Throwable error) {
....
    for (Row row : rs.currentPage()) {
      rows.add(row);
    }
....

  }
}

但你得小心点 select * from table 因为它可能会返回很多结果,而且如果数据太多,它可能会超时—在这种情况下,最好执行令牌范围扫描(我有一个驱动程序3.x的示例,但还没有4.x的示例)。

相关问题