PySpark: refactoring joinWithCassandraTable without map

aiazj4mn, posted 2021-05-18 in Spark

I'm fairly new to Spark/Scala and I'm having trouble refactoring some of my code. I'm running PySpark against Scala 2.11 in a Spark/YARN setup. The following works, but I'd like to clean it up and get the maximum performance out of it. I've read elsewhere that PySpark UDFs and lambdas can have a big performance impact, so I'm trying to reduce or remove them where possible.


# Reduce ingest df1 data by joining on allowed table df2

to_process = df2\
    .join(
        sf.broadcast(df1),
        df2.secondary_id == df1.secondary_id,
        how="inner")\
    .rdd\
    .map(lambda r: Row(tag=r['tag_id'], user_uuid=r['user_uuid']))

# Type column fixed to type=2, and tag==key

ready_to_join = to_process.map(lambda r: (r[0], 2, r[1]))

# Join with cassandra table to find matches

exists_in_cass = ready_to_join\
    .joinWithCassandraTable(keyspace, table3)\
    .on("user_uuid", "type")\
    .select("user_uuid")

log.error(f"TEST PRINT - [{exists_in_cass.count()}]")

The Cassandra table looks like this:

CREATE TABLE keyspace.table3 (
    user_uuid uuid,
    type int,
    key text,
    value text,
    PRIMARY KEY (user_uuid, type, key)
) WITH CLUSTERING ORDER BY (type ASC, key ASC)

What I have now is:

to_process = df2\
    .join(
        sf.broadcast(df1),
        df2.secondary_id == df1.secondary_id,
        how="inner")\
    .select(col("user_uuid"), col("tag_id").alias("tag"))

ready_to_join = to_process\
        .withColumn("type", sf.lit(2))\
        .select('user_uuid', 'type', col('tag').alias("key"))\
        .rdd\
        .map(lambda x: Row(x))

# planning on using repartitionByCassandraReplica here after I get it logically working

exists_in_cass = ready_to_join\
        .joinWithCassandraTable(keyspace, table3)\
        .on("user_uuid", "type")\
        .select("user_uuid")

log.error(f"TEST PRINT - [{exists_in_cass.count()}]")

But I'm getting this error:

2020-10-30 15:10:42 WARN  TaskSetManager:66 - Lost task 148.0 in stage 22.0 (TID ----, ---, executor 9): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)

Looking for help from any Spark gurus to point out anything silly I'm doing here.
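
For reference, the only functional difference from the working version is the extra Row wrapping: lambda x: Row(x) builds a Row whose single field is itself a Row, and the JVM-side unpickler cannot reconstruct that nested pyspark.sql.types.Row, which is what the _create_row PickleException is complaining about. Below is a minimal sketch of the same step without the nesting, assuming the pyspark-cassandra wrapper accepts plain dicts keyed by column name:

# Sketch: map to plain dicts keyed by the Cassandra column names instead of
# wrapping the existing Row in another Row.
ready_to_join = to_process\
        .withColumn("type", sf.lit(2))\
        .select('user_uuid', 'type', col('tag').alias("key"))\
        .rdd\
        .map(lambda x: {"user_uuid": x["user_uuid"],
                        "type": x["type"],
                        "key": x["key"]})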

Update

Thanks to Alex's suggestion: Spark Cassandra Connector v2.5+ can join DataFrames directly. I've updated the code to use this instead.

to_process = df2\
    .join(
        sf.broadcast(df1),
        df2.secondary_id == df1.secondary_id,
        how="inner")\
    .select(col("user_uuid"), col("tag_id").alias("tag"))

ready_to_join = to_process\
        .withColumn("type", sf.lit(2))\
        .select(col('user_uuid').alias('c1_user_uuid'), 'type', col('tag').alias("key"))

cass_table = spark_session\
    .read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table=config.table, keyspace=config.keyspace) \
    .load()

exists_in_cass = ready_to_join\
        .join(
            cass_table,
            [(cass_table["user_uuid"] == ready_to_join["c1_user_uuid"]) &
             (cass_table["key"]  == ready_to_join["key"]) &
             (cass_table["type"] == ready_to_join["type"])])\
        .select(col("c1_user_uuid").alias("user_uuid"))

exists_in_cass.explain()
log.error(f"TEST PRINT - [{exists_in_cass.count()}]")

As I understand it, in theory this should be much faster! But I'm getting errors at runtime, with the database timing out:

WARN  TaskSetManager:66 - Lost task 827.0 in stage 12.0 (TID 9946, , executor 4): java.io.IOException: Exception during execution of SELECT "user_uuid", "key" FROM "keyspace"."table3" WHERE token("user_uuid") > ? AND token("user_uuid") <= ? AND "type" = ?   ALLOW FILTERING: Query timed out after PT2M

TaskSetManager:66 - Lost task 125.0 in stage 12.0 (TID 9215, , executor 7): com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT2M

etc

I have the Spark setup configured to enable the connector's Spark extensions:

--packages mysql:mysql-connector-java:5.1.47,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1  \

--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
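
One quick sanity check is to confirm that the extensions setting actually made it into the running session (assuming the session is called spark_session, as above), since the direct-join optimization can never apply without it:

# Should print com.datastax.spark.connector.CassandraSparkExtensions;
# if it is unset or different, the direct join cannot be planned.
print(spark_session.conf.get("spark.sql.extensions", "not set"))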

Spark's DAG shows all the nodes completely maxed out. Should I be partitioning the data before running the join?
The explain output also doesn't show a direct join (the code that was explained covers more than the snippet above):

== Physical Plan ==

* (6) Project [c1_user_uuid#124 AS user_uuid#158]

+- *(6) SortMergeJoin [c1_user_uuid#124, key#125L], [user_uuid#129, cast(key#131 as bigint)], Inner
   :- *(3) Sort [c1_user_uuid#124 ASC NULLS FIRST, key#125L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(c1_user_uuid#124, key#125L, 200)
   :     +- *(2) Project [id#0 AS c1_user_uuid#124, tag_id#101L AS key#125L]
   :        +- *(2) BroadcastHashJoin [secondary_id#60], [secondary_id#100], Inner, BuildRight
   :           :- *(2) Filter (isnotnull(secondary_id#60) && isnotnull(id#0))
   :           :  +- InMemoryTableScan [secondary_id#60, id#0], [isnotnull(secondary_id#60), isnotnull(id#0)]
   :           :        +- InMemoryRelation [secondary_id#60, id#0], StorageLevel(disk, memory, deserialized, 1 replicas)
   :           :              +- *(7) Project [secondary_id#60, id#0]
   :           :                 +- Generate explode(split(secondary_ids#1, \|)), [id#0], false, [secondary_id#60]
   :           :                    +- *(6) Project [id#0, secondary_ids#1]
   :           :                       +- *(6) SortMergeJoin [id#0], [guid#46], Inner
   :           :                          :- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
   :           :                          :  +- Exchange hashpartitioning(id#0, 200)
   :           :                          :     +- *(1) Filter (isnotnull(id#0) && id#0 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12})
   :           :                          :        +- InMemoryTableScan [id#0, secondary_ids#1], [isnotnull(id#0), id#0 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}]
   :           :                          :              +- InMemoryRelation [id#0, secondary_ids#1], StorageLevel(disk, memory, deserialized, 1 replicas)
   :           :                          :                    +- Exchange RoundRobinPartitioning(3840)
   :           :                          :                       +- *(1) Filter AtLeastNNulls(n, id#0,secondary_ids#1)
   :           :                          :                          +- *(1) FileScan csv [id#0,secondary_ids#1] Batched: false, Format: CSV, Location: InMemoryFileIndex[inputdata_file, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,secondary_ids:string>
   :           :                          +- *(5) Sort [guid#46 ASC NULLS FIRST], false, 0
   :           :                             +- Exchange hashpartitioning(guid#46, 200)
   :           :                                +- *(4) Filter (guid#46 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12} && isnotnull(guid#46))
   :           :                                   +- Generate explode(set_guid#36), false, [guid#46]
   :           :                                      +- *(3) Project [set_guid#36]
   :           :                                         +- *(3) Filter (isnotnull(allowed#39) && (allowed#39 = 1))
   :           :                                            +- *(3) FileScan orc whitelist.whitelist1[set_guid#36,region#39,timestamp#43] Batched: false, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://file, PartitionCount: 1, PartitionFilters: [isnotnull(timestamp#43), (timestamp#43 = 18567)], PushedFilters: [IsNotNull(region), EqualTo(region,1)], ReadSchema: struct<set_guid:array<string>,region:int>
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
    FROM TAG as T
    JOIN MAP as M
    ON T.tag_id = M.tag_id
    WHERE (expire >= NOW() OR expire IS NULL)
    ORDER BY T.tag_id) AS subset) [numPartitions=1] [secondary_id#100,tag_id#101L] PushedFilters: [*IsNotNull(secondary_id), *IsNotNull(tag_id)], ReadSchema: struct<secondary_id:string,tag_id:bigint>
   +- *(5) Sort [user_uuid#129 ASC NULLS FIRST, cast(key#131 as bigint) ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(user_uuid#129, cast(key#131 as bigint), 200)
         +- *(4) Project [user_uuid#129, key#131]
            +- *(4) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user_uuid#129,key#131] PushedFilters: [*EqualTo(type,2)], ReadSchema: struct<user_uuid:string,key:string>

I'm not getting the direct join to happen, and that is what is causing the timeouts: the plan above still shows a SortMergeJoin against a full CassandraSourceRelation scan, with only EqualTo(type,2) pushed down.

Update 2

I think the reason this wasn't resolving to a direct join is that the data types in the DataFrame were off, in particular the uuid type.
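
For illustration, a sketch of what aligning the join-key types with the Cassandra schema (user_uuid uuid, type int, key text) might look like; the plan above shows the Spark-side key as a bigint (key#125L), which forces a cast onto the Cassandra column and is the kind of mismatch that blocks the direct join (whether this alone is enough to trigger it is an assumption):

# Cast the Spark-side columns so they line up with the Cassandra column types
# and no cast is needed on the Cassandra side of the join.
ready_to_join = to_process\
        .withColumn("type", sf.lit(2).cast("int"))\
        .select(
            col("user_uuid").cast("string").alias("c1_user_uuid"),
            "type",
            col("tag").cast("string").alias("key"))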


7uzetpgm (answer 1)

I recommend using Spark Cassandra Connector (SCC) 2.5.x or 3.0.x (see the release announcement), which contains an implementation of DataFrame joins with Cassandra, instead of using the RDD API from PySpark. In that case you don't need to drop down to RDDs at all; just use the normal DataFrame API join.
Note that this is not enabled by default, so you need to start pyspark or spark-submit with special configuration, like:

pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 \
   --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

You can find more information about joins with Cassandra in my recent blog post on this topic (although it uses Scala, the DataFrame part should translate almost one-to-one to PySpark).
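
A quick way to verify that the optimization is being applied is to look at the physical plan: with the extensions enabled and compatible join-key types, the plan should contain a Cassandra direct join operator instead of a SortMergeJoin over a full Cassandra table scan.

# If the direct join is planned, explain() should print a "Cassandra Direct Join"
# node; a SortMergeJoin over CassandraSourceRelation means it did not apply.
exists_in_cass.explain()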
