akka 如何从 cassandra 流所有记录?

w3nuxt5m  于 2022-11-05  发布在  Cassandra
关注(0)|答案(2)|浏览(152)

我需要从Cassandra流所有记录。目前我使用akka-persistence-cassandra流数据:

val querier =
        PersistenceQuery(system)
          .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

      val selectDistinctPersistenceIds = new SimpleStatement(
      "SELECT DISTINCT persistence_id, partition_nr FROM messages")
        .setFetchSize(100000)

        querier.session.select(selectDistinctPersistenceIds).map { row =>
          val id = row.getString(0)
          id
        }

当记录数在150万左右时,这个方法可以正常工作,但是当记录数超过150万时,就会出现read timeout错误。
我正在使用:

"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.58"
"com.typesafe.akka" %% "akka-persistence" % "2.6.12"
"com.typesafe.akka" %% "akka-persistence-query" % "2.6.12"

编辑:错误日志:

com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response", exceptionStackTrace="java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
    at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:25)
...
pkwftd7m

pkwftd7m1#

问题是与您的驱动程序会话设置调整它根据您的需要。
可能与gap-timeout或增加重试次数和timeout设置有关。

hlswsv35

hlswsv352#

我可以通过为cassandra-journal.socket.read-timeout-millis设置一个比默认值12000ms更高的值来解决这个问题。

cassandra-journal {
  ...

  socket {
    # the per-host read timeout in milliseconds. Should be higher than the timeout settings
    # used on the Cassandra side.
    read-timeout-millis = 30000
}

相关问题