With spark-cassandra-connector_2.11:2.5.1 I am struggling to get a direct join between two DataFrames. I launch Spark like this:
spark-2.4.5-bin-hadoop2.6/bin/spark-submit \
--packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
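(For completeness, a minimal sketch of the equivalent programmatic setup, assuming pyspark and the same connector version; the app name is just a placeholder. spark.sql.extensions has to be in place before the SparkSession is created, otherwise the connector's Catalyst rules are never installed:)
from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder
    .appName("direct-join-test")  # hypothetical app name
    .config("spark.jars.packages",
            "com.datastax.spark:spark-cassandra-connector_2.11:2.5.1")
    .config("spark.sql.extensions",
            "com.datastax.spark.connector.CassandraSparkExtensions")
    .getOrCreate()
)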
I want to join against my table:
CREATE KEYSPACE keyspace WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'realtime': '3'}
    AND durable_writes = false;

CREATE TABLE keyspace.table (
    id text,
    id_type int,
    region_type int,
    region_id int,
    foreign_id uuid,
    PRIMARY KEY ((id, id_type), region_type, region_id)
) WITH CLUSTERING ORDER BY (region_type ASC, region_id ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'SizeTieredCompactionStrategy',
        'tombstone_compaction_interval': '86400',
        'unchecked_tombstone_compaction': 'true'}
    AND compression = {'chunk_length_in_kb': '16',
        'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.001
    AND default_time_to_live = 0
    AND gc_grace_seconds = 3600
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.9PERCENTILE';
and run my Python test (imports shown for completeness; spark_session, log and print_type come from the surrounding test harness):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
log.error(f"-----------TEST METHOD------------")
log.error(f"--------printing config-------------")
conf_dict = dict(spark_session.sparkContext.getConf().getAll())
log.error(f"spark.sql.extensions = {conf_dict['spark.sql.extensions']}")
log.error(f"--------loading cassandra-------------")
cassandra_table = spark_session\
.read \
.format("org.apache.spark.sql.cassandra") \
.options(table="table",
keyspace="keyspace",
directJoinSetting="on")\
.load()
log.error(f"--------generating test dataframe-------------")
test_data = [('77ce7199-0dd0-11eb-b419-17c19fe60001', 1, 1, 1)]
test_schema = StructType([
StructField("id", StringType()),
StructField("id_type", IntegerType()),
StructField("region_type", IntegerType()),
StructField("region_id", IntegerType())])
test_join_df = spark_session.createDataFrame(test_data, schema=test_schema)
log.error(f"-------printing test dataframe----------")
log.error(f"TYPE : {print_type(test_join_df)}")
test_join_df.explain()
log.error(f"Data types {test_join_df.dtypes}")
test_join_df.show(truncate=False)
log.error(f"-------doing join------")
match_join = test_join_df\
.join(
cassandra_table,
on=["id", "id_type", "region_type", "region_id"],
how="left")
log.error(f"-------printing join results----------")
match_join.explain()
log.error(f"--------------------------------------")
log.error(f"TYPE : {print_type(match_join)}")
log.error(f"Data types {match_join.dtypes}")
log.error(f"------- finished ----------")
But here is the output, with the explain results:
2020-11-06 11:11:44 ERROR __main__:? - -----------TEST METHOD------------
2020-11-06 11:11:44 ERROR __main__:? - --------printing config-------------
2020-11-06 11:11:45 ERROR __main__:? - spark.sql.extensions = com.datastax.spark.connector.CassandraSparkExtensions
2020-11-06 11:11:45 ERROR __main__:? - --------loading cassandra-------------
2020-11-06 11:11:49 ERROR __main__:? - --------generating test dataframe-------------
2020-11-06 11:11:50 ERROR __main__:? - -------printing test dataframe----------
2020-11-06 11:11:50 ERROR __main__:? - TYPE : DataFrame
== Physical Plan ==
Scan ExistingRDD[id#10,id_type#11,region_type#12,region_id#13]
2020-11-06 11:11:50 ERROR __main__:? - Data types [('id', 'string'), ('id_type', 'int'), ('region_type', 'int'), ('region_id', 'int')]
2020-11-06 11:12:06 WARN YarnScheduler:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
+------------------------------------+---------------+-----------+---------+
|id |id_type |region_type|region_id|
+------------------------------------+---------------+-----------+---------+
|77ce7199-0dd0-11eb-b419-17c19fe60001|1 |1 |1 |
+------------------------------------+---------------+-----------+---------+
2020-11-06 11:12:19 ERROR __main__:? - -------doing join------
2020-11-06 11:12:19 ERROR __main__:? - -------printing join results----------
== Physical Plan ==
* (4) Project [id#10, id_type#11, region_type#12, region_id#13, foreign_id#4]
+- SortMergeJoin [id#10, id_type#11, region_type#12, region_id#13], [id#0, id_type#1, region_type#2, region_id#3], Left
:- *(1) Sort [id#10 ASC NULLS FIRST, id_type#11 ASC NULLS FIRST, region_type#12 ASC NULLS FIRST, region_id#13 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#10, id_type#11, region_type#12, region_id#13, 200)
: +- Scan ExistingRDD[id#10,id_type#11,region_type#12,region_id#13]
+- *(3) Sort [id#0 ASC NULLS FIRST, id_type#1 ASC NULLS FIRST, region_type#2 ASC NULLS FIRST, region_id#3 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#0, id_type#1, region_type#2, region_id#3, 200)
+- *(2) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id#0,id_type#1,region_type#2,region_id#3,foreign_id#4] PushedFilters: [], ReadSchema: struct<id:string,id_type:int,region_type:int,region_id:int,foreign_id:string>
2020-11-06 11:12:20 ERROR __main__:? - --------------------------------------
2020-11-06 11:12:20 ERROR __main__:? - TYPE : DataFrame
2020-11-06 11:12:20 ERROR __main__:? - Data types [('id', 'string'), ('id_type', 'int'), ('region_type', 'int'), ('region_id', 'int'), ('foreign_id', 'string')]
2020-11-06 11:12:20 ERROR __main__:? - ------- finished ----------
As the physical plan shows, it is still doing a full table scan (with a shuffle and a SortMergeJoin) rather than a Cassandra direct join. Is my test set up incorrectly, or am I missing something? Could it be a problem that foreign_id is a uuid in Cassandra, even though it is not part of the key?
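One diagnostic I could run (a sketch, not a known fix) is to project the uuid column away before the join, to see whether foreign_id is what blocks the rewrite:
# Keep only the key columns on the Cassandra side before joining.
keys_only = cassandra_table.select("id", "id_type", "region_type", "region_id")
keys_only_join = test_join_df.join(
    keys_only,
    on=["id", "id_type", "region_type", "region_id"],
    how="left")
keys_only_join.explain()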