如何在spark join中配置pooligoption

htzpubme  于 2021-06-10  发布在  Cassandra
关注(0)|答案(0)|浏览(207)

我正在使用structured streaming 2.4并尝试使用foreachbatch sink写入单节点cassandra,如:

foreachBatch { (df, batchId) =>
        df
          .rdd
          .repartitionByCassandraReplica("ks", "tbl")
          .leftJoinWithCassandraTable("ks", "tbl")
          .on(SomeColumns("id"))
          .map(...)
          .toDF(...)
          .write
          .cassandraFormat("tbl", "ks")
          .mode("Append")
          .save()

当df有一百万行时,spark可以将一些行写入cassandra,然后抛出:

WARN QueryExecutor: BusyPoolException ... Retrying

然后:

ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@

Cassandra在那之后停了下来。所以,如果我想配置像poolingoptions这样的东西,有人能帮我举一些例子来说明如何在foreachbatch sink中配置datastax连接器吗?
谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题