spark3.0和cassandraspark/python conenctors:在写之前没有创建表

9udxz4iz  于 2021-06-09  发布在  Cassandra
关注(0)|答案(2)|浏览(361)

我正在尝试将我的应用程序升级到spark 3.0.1。对于表的创建,我使用cassandra驱动程序python cassandra连接器拖放并创建一个表。然后我使用spark cassandra连接器将Dataframe写入表中。仅仅使用spark-cassandra连接器来创建和删除表并不是一个好的选择。
对于spark 2.4,drop create write流没有问题。但是在Spark3.0中,应用程序似乎没有按照特定的顺序来做这些事情,通常是在删除和创建之前先写。我不知道如何确保先删除和创建表。我知道即使应用程序在写的时候出错,删除和创建也会发生,因为当我通过cqlsh查询cassandra时,我可以看到表被删除和重新创建。对spark 3.0中的这种行为有什么想法吗?
注意:由于模式发生了变化,需要删除并重新创建这个表,而不是直接覆盖。
请求的代码段:

session = self._get_python_cassandra_session(self.env_conf, self.database)
        # build drop table query
        drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database, tablename)
        session.execute(drop_table_query)

        df, table_columns, table_keys = self._create_table_metadata(df, keys=keys)
        # build create query
        create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY({}), );'.format(self.database, tablename, table_columns, table_keys)
        # execute table creation
        session.execute(create_table_query)
        session.shutdown()

        # spark-cassandra connection options
        copts = _cassandra_cluster_spark_options(self.env_conf)
        # set write mode
        copts['confirm.truncate'] = overwrite
        mode = 'overwrite' if overwrite else 'append'
        # write dataframe to cassandra
        get_dataframe_writer(df, 'cassandra', keyspace=self.database, 
        table=tablename, mode=mode, copts=copts).save()
fzsnzjdm

fzsnzjdm1#

我最终建立了一个time.sleep(5)延迟和100秒超时,周期性地ping cassandra查找表,然后写入是否找到了表。

qgzx9mmu

qgzx9mmu2#

在spark cassandra connector 3.0+中,您可以使用新功能—通过catalogs api操纵键空间和表。您可以使用sparksql创建/更改/删除键空间和表。例如,可以使用以下命令在cassandra中创建表:

CREATE TABLE casscatalog.ksname.table_name (
  key_1 Int, 
  key_2 Int, 
  key_3 Int, 
  cc1 STRING, 
  cc2 String, 
  cc3 String, 
  value String) 
USING cassandra
PARTITIONED BY (key_1, key_2, key_3)
TBLPROPERTIES (
    clustering_key='cc1.asc, cc2.desc, cc3.asc',
    compaction='{class=SizeTieredCompactionStrategy,bucket_high=1001}'
)

正如您在这里看到的,您可以指定非常复杂的主键,还可以指定表选项。这个 casscatalog piece是一个前缀,链接到特定的cassandra集群(您可以同时使用多个)-它是在启动spark作业时指定的,例如:

spark-shell --packages com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 \
  --conf spark.sql.catalog.casscatalog=com.datastax.spark.connector.datasource.CassandraCatalog

更多的例子可以在文档中找到:

相关问题