spark batch在2个cassandra群集之间迁移数据

n9vozmp4  于 2021-06-10  发布在  Cassandra
关注(0)|答案(2)|浏览(361)

我使用spark将一些数据从一个cassandra表移动到另一个集群上的另一个cassandra表。
我为其中一个源集群指定了cassandra配置,如下所示:

/*
spark.cassandra.connection.host: 
spark.cassandra.connection.port:
spark.cassandra.auth.username:
spark.cassandra.auth.password:
spark.cassandra.connection.ssl.clientAuth.enabled: true
spark.cassandra.connection.ssl.enabled: true
spark.cassandra.connection.ssl.trustStore.path: 
spark.cassandra.connection.ssl.trustStore.password: 
spark.cassandra.connection.timeout_ms: */

SparkSession spark = SparkSession.builder()
            .config(conf)
            .getOrCreate();

Dataset<Row> df = spark.read()
            .format("org.apache.spark.sql.cassandra")
            .options(config.getSourceTable())
            .load();
df.show();

//***How/Where do I specify cassandra config in destination cluster?***
df.write()
        .mode(SaveMode.Append)
        .format("org.apache.spark.sql.cassandra")
        .options(destinationTbl);

如何/在何处指定目标集群中的cassandra配置(java perferred)?
谢谢!

lymgl2op

lymgl2op1#

我有一个类似的用例,但是在我的例子中,由于一些连接器问题,我无法使用alex建议的方法建立到第二个集群的连接。所以,我必须将这个Dataframe转换成rdd,并使用rdd方法将其写入第二个cassandra集群
将所有cassandra连接器详细信息传递给另一个sparkconfig文件,并使用cassandraconnector对其进行解析。

{    
val cluster: CassandraConnector = CassandraConnector(sparkConfig)

      implicit val c: CassandraConnector = cluster

      dataFrame
        .rdd
        .saveToCassandra(keySpaceName, tableName, SomeColumns(ListOfColumns)
}
xtfmy6hx

xtfmy6hx2#

我还没有测试过它,但是根据russel spitzer的博客文章,您可以做以下工作(没有用java测试,但应该可以工作):
设置2个配置选项(或在创建 spark 示例):

spark.setConf("ClusterSource/spark.cassandra.connection.host", "127.0.0.1");
spark.setConf("ClusterDestination/spark.cassandra.connection.host", "127.0.0.2");

添加到 options 将相应集群的名称称为 cluster 进入。
p、 另外,请记住,如果需要迁移数据并在数据上保留writetime和/或ttl,则需要使用rddapi,因为dataframeapi不支持这些内容。

相关问题