我正在使用structured streaming 2.4并尝试使用foreachbatch sink写入单节点cassandra,如:
foreachBatch { (df, batchId) =>
df
.rdd
.repartitionByCassandraReplica("ks", "tbl")
.leftJoinWithCassandraTable("ks", "tbl")
.on(SomeColumns("id"))
.map(...)
.toDF(...)
.write
.cassandraFormat("tbl", "ks")
.mode("Append")
.save()
当df有一百万行时,spark可以将一些行写入cassandra,然后抛出:
WARN QueryExecutor: BusyPoolException ... Retrying
然后:
ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@
Cassandra在那之后停了下来。所以,如果我想配置像poolingoptions这样的东西,有人能帮我举一些例子来说明如何在foreachbatch sink中配置datastax连接器吗?
谢谢
暂无答案!
目前还没有任何答案,快来回答吧!