PySpark direct join using DataFrames

Posted by cs7cruho on 2021-05-18 in Spark

I am having trouble getting a direct join between two DataFrames to work with spark-cassandra-connector_2.11 version 2.5.1. I start Spark as follows:

spark-2.4.5-bin-hadoop2.6/bin/spark-submit \
      --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1  \
      --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

I want to join against my table:

CREATE KEYSPACE keyspace WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'realtime': '3'}
AND durable_writes = false;

CREATE TABLE keyspace.table (
    id text,
    id_type int,
    region_type int,
    region_id int,
    foreign_id uuid,
    PRIMARY KEY ((id, id_type), region_type, region_id)
) WITH CLUSTERING ORDER BY (region_type ASC, region_id ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {
        'keys': 'ALL',
        'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {
        'class': 'SizeTieredCompactionStrategy',
        'tombstone_compaction_interval': '86400',
        'unchecked_tombstone_compaction': 'true'}
    AND compression = {
        'chunk_length_in_kb': '16',
        'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.001
    AND default_time_to_live = 0
    AND gc_grace_seconds = 3600
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.9PERCENTILE';

I run this Python test:

log.error(f"-----------TEST METHOD------------")
log.error(f"--------printing config-------------")
conf_dict = dict(spark_session.sparkContext.getConf().getAll())
log.error(f"spark.sql.extensions = {conf_dict['spark.sql.extensions']}")

log.error(f"--------loading cassandra-------------")
cassandra_table = spark_session\
    .read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="table",
             keyspace="keyspace",
             directJoinSetting="on")\
    .load()

log.error(f"--------generating test dataframe-------------")
test_data=[('77ce7199-0dd0-11eb-b419-17c19fe60001', 1, 1, 1)]
test_schema = StructType([
    StructField("id", StringType()),
    StructField("id_type", IntegerType()),
    StructField("region_type", IntegerType()),
    StructField("region_id", IntegerType())])
test_join_df = spark_session.createDataFrame(test_data, schema=test_schema)

log.error(f"-------printing test dataframe----------")
log.error(f"TYPE : {print_type(test_join_df)}")
test_join_df.explain()
log.error(f"Data types {test_join_df.dtypes}")
test_join_df.show(truncate=False)

log.error(f"-------doing join------")
match_join = test_join_df\
    .join(
        cassandra_table,
        on=["id", "id_type", "region_type", "region_id"],
        how="left")

log.error(f"-------printing join results----------")
match_join.explain()
log.error(f"--------------------------------------")
log.error(f"TYPE : {print_type(match_join)}")
log.error(f"Data types {match_join.dtypes}")
log.error(f"------- finished ----------")
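
One part of the setup can be checked without Spark: whether the join columns cover the table's full partition key, which, as far as I understand the connector documentation, is a precondition for a direct join. The following is a minimal sketch, not part of the original test. `missing_partition_key_columns` is a hypothetical helper, and the column lists are copied by hand from the table definition and the join shown here.

```python
def missing_partition_key_columns(join_columns, partition_key):
    """Return the partition key columns that the join does not use.

    An empty list means the join covers the whole partition key.
    """
    join_set = set(join_columns)
    return [col for col in partition_key if col not in join_set]


# Copied by hand from PRIMARY KEY ((id, id_type), region_type, region_id)
partition_key = ["id", "id_type"]
# Copied by hand from the `on=[...]` argument of the join
join_columns = ["id", "id_type", "region_type", "region_id"]

missing = missing_partition_key_columns(join_columns, partition_key)
print(missing)
```

With the columns above the list comes back empty, so under that assumption the join keys themselves do not look like the problem.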

But as the explain output shows:

2020-11-06 11:11:44 ERROR __main__:? - -----------TEST METHOD------------
2020-11-06 11:11:44 ERROR __main__:? - --------printing config-------------
2020-11-06 11:11:45 ERROR __main__:? - spark.sql.extensions = com.datastax.spark.connector.CassandraSparkExtensions
2020-11-06 11:11:45 ERROR __main__:? - --------loading cassandra-------------
2020-11-06 11:11:49 ERROR __main__:? - --------generating test dataframe-------------
2020-11-06 11:11:50 ERROR __main__:? - -------printing test dataframe----------
2020-11-06 11:11:50 ERROR __main__:? - TYPE : DataFrame

== Physical Plan ==
Scan ExistingRDD[id#10,id_type#11,region_type#12,region_id#13]

2020-11-06 11:11:50 ERROR __main__:? - Data types [('id', 'string'), ('id_type', 'int'), ('region_type', 'int'), ('region_id', 'int')]

2020-11-06 11:12:06 WARN  YarnScheduler:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
+------------------------------------+---------------+-----------+---------+
|id                                  |id_type        |region_type|region_id|
+------------------------------------+---------------+-----------+---------+
|77ce7199-0dd0-11eb-b419-17c19fe60001|1              |1          |1        |
+------------------------------------+---------------+-----------+---------+

2020-11-06 11:12:19 ERROR __main__:? - -------doing join------

2020-11-06 11:12:19 ERROR __main__:? - -------printing join results----------
== Physical Plan ==
*(4) Project [id#10, id_type#11, region_type#12, region_id#13, foreign_id#4]
+- SortMergeJoin [id#10, id_type#11, region_type#12, region_id#13], [id#0, id_type#1, region_type#2, region_id#3], Left
   :- *(1) Sort [id#10 ASC NULLS FIRST, id_type#11 ASC NULLS FIRST, region_type#12 ASC NULLS FIRST, region_id#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#10, id_type#11, region_type#12, region_id#13, 200)
   :     +- Scan ExistingRDD[id#10,id_type#11,region_type#12,region_id#13]
   +- *(3) Sort [id#0 ASC NULLS FIRST, id_type#1 ASC NULLS FIRST, region_type#2 ASC NULLS FIRST, region_id#3 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#0, id_type#1, region_type#2, region_id#3, 200)
         +- *(2) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id#0,id_type#1,region_type#2,region_id#3,foreign_id#4] PushedFilters: [], ReadSchema: struct<id:string,id_type:int,region_type:int,region_id:int,foreign_id:string>

2020-11-06 11:12:20 ERROR __main__:? - --------------------------------------
2020-11-06 11:12:20 ERROR __main__:? - TYPE : DataFrame
2020-11-06 11:12:20 ERROR __main__:? - Data types [('id', 'string'), ('id_type', 'int'), ('region_type', 'int'), ('region_id', 'int'), ('foreign_id', 'string')]
2020-11-06 11:12:20 ERROR __main__:? - ------- finished ----------

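It is still doing a full table scan rather than a Cassandra direct join. Is my test setup incorrect, or am I missing something? The foreign_id column in Cassandra is of type uuid, even though it is not part of the key. Is that a problem?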
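
To make this check repeatable instead of reading the plan by eye, the plan text can be classified by the join operator it mentions. This is a rough sketch under stated assumptions: `join_strategy` is a hypothetical helper, and the string "Cassandra Direct Join" is the node name I would expect the connector to print when the optimization applies, which should be verified against the connector version in use. The sample plan is abbreviated from the output above.

```python
def join_strategy(plan_text):
    """Classify a physical plan string by the join operator it mentions."""
    # Assumption: the connector labels its direct-join node with this text.
    if "Cassandra Direct Join" in plan_text:
        return "direct"
    # Standard Spark join operators that indicate no direct join was planned.
    for operator in ("SortMergeJoin", "BroadcastHashJoin", "ShuffledHashJoin"):
        if operator in plan_text:
            return operator
    return "unknown"


# Abbreviated from the explain output in this question
sample_plan = (
    "*(4) Project [id#10, id_type#11, region_type#12, region_id#13, foreign_id#4]\n"
    "+- SortMergeJoin [id#10, id_type#11, region_type#12, region_id#13], "
    "[id#0, id_type#1, region_type#2, region_id#3], Left\n"
)

print(join_strategy(sample_plan))
```

In PySpark 2.4 the plan text can probably be obtained with `match_join._jdf.queryExecution().executedPlan().toString()`. That goes through the private `_jdf` attribute, so treat it as a debugging aid rather than a stable API.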
