我刚开始在这里使用spark/scala,在这里重构我的一些代码时遇到了问题。我使用pyspark运行scala2.11,并在spark/yarn设置中运行。以下是工作,但我想清理它,并获得最大的性能从这个。我在别处读到,pyspark udf和lambdas会对性能造成巨大影响,所以我试图减少或删除它们是可能的。
# Reduce ingest df1 data by joining on allowed table df2
to_process = df2\
.join(
sf.broadcast(df1),
df2.secondary_id == df1.secondary_id,
how="inner")\
.rdd\
.map(lambda r: Row(tag=r['tag_id'], user_uuid=r['user_uuid']))
# Type column fixed to type=2, and tag==key
ready_to_join = to_process.map(lambda r: (r[0], 2, r[1]))
# Join with cassandra table to find matches
exists_in_cass = ready_to_join\
.joinWithCassandraTable(keyspace, table3)\
.on("user_uuid", "type")\
.select("user_uuid")
log.error(f"TEST PRINT - [{exists_in_cass.count()}]")
Cassandratable是这样的
CREATE TABLE keyspace.table3 (
user_uuid uuid,
type int,
key text,
value text,
PRIMARY KEY (user_uuid, type, key)
) WITH CLUSTERING ORDER BY (type ASC, key ASC)
我现在有
to_process = df2\
.join(
sf.broadcast(df1),
df2.secondary_id == df1.secondary_id,
how="inner")\
.select(col("user_uuid"), col("tag_id").alias("tag"))
ready_to_join = to_process\
.withColumn("type", sf.lit(2))\
.select('user_uuid', 'type', col('tag').alias("key"))\
.rdd\
.map(lambda x: Row(x))
# planning on using repartitionByCassandraReplica here after I get it logically working
exists_in_cass = ready_to_join\
.joinWithCassandraTable(keyspace, table3)\
.on("user_uuid", "type")\
.select("user_uuid")
log.error(f"TEST PRINT - [{exists_in_cass.count()}]")
但我会犯这样的错误
2020-10-30 15:10:42 WARN TaskSetManager:66 - Lost task 148.0 in stage 22.0 (TID ----, ---, executor 9): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
从任何星火大师那里寻求帮助,指出我在这里做的任何蠢事。
更新
由于alex的建议,使用spark-cassandra连接器v2.5+提供了直接连接Dataframe的能力。我更新了代码,改用这个。
to_process = df2\
.join(
sf.broadcast(df1),
df2.secondary_id == df1.secondary_id,
how="inner")\
.select(col("user_uuid"), col("tag_id").alias("tag"))
ready_to_join = to_process\
.withColumn("type", sf.lit(2))\
.select(col('user_uuid').alias('c1_user_uuid'), 'type', col('tag').alias("key"))\
cass_table = spark_session
.read \
.format("org.apache.spark.sql.cassandra") \
.options(table=config.table, keyspace=config.keyspace) \
.load()
exists_in_cass = ready_to_join\
.join(
cass_table,
[(cass_table["user_uuid"] == ready_to_join["c1_user_uuid"]) &
(cass_table["key"] == ready_to_join["key"]) &
(cass_table["type"] == ready_to_join["type"])])\
.select(col("c1_user_uuid").alias("user_uuid"))
exists_in_cass.explain()
log.error(f"TEST PRINT - [{exists_in_cass.count()}]")
据我所知,理论上这应该快得多!但是我在运行时遇到了错误,数据库超时了。
WARN TaskSetManager:66 - Lost task 827.0 in stage 12.0 (TID 9946, , executor 4): java.io.IOException: Exception during execution of SELECT "user_uuid", "key" FROM "keyspace"."table3" WHERE token("user_uuid") > ? AND token("user_uuid") <= ? AND "type" = ? ALLOW FILTERING: Query timed out after PT2M
TaskSetManager:66 - Lost task 125.0 in stage 12.0 (TID 9215, , executor 7): com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT2M
etc
我有Spark设置配置,以允许Spark扩展
--packages mysql:mysql-connector-java:5.1.47,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
spark的dag显示所有节点都已完全耗尽。我应该在运行join之前对数据进行分区吗?
对此的解释也没有显示直接连接(解释的代码比上面的代码片段多)
== Physical Plan ==
* (6) Project [c1_user_uuid#124 AS user_uuid#158]
+- *(6) SortMergeJoin [c1_user_uuid#124, key#125L], [user_uuid#129, cast(key#131 as bigint)], Inner
:- *(3) Sort [c1_user_uuid#124 ASC NULLS FIRST, key#125L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c1_user_uuid#124, key#125L, 200)
: +- *(2) Project [id#0 AS c1_user_uuid#124, tag_id#101L AS key#125L]
: +- *(2) BroadcastHashJoin [secondary_id#60], [secondary_id#100], Inner, BuildRight
: :- *(2) Filter (isnotnull(secondary_id#60) && isnotnull(id#0))
: : +- InMemoryTableScan [secondary_id#60, id#0], [isnotnull(secondary_id#60), isnotnull(id#0)]
: : +- InMemoryRelation [secondary_id#60, id#0], StorageLevel(disk, memory, deserialized, 1 replicas)
: : +- *(7) Project [secondary_id#60, id#0]
: : +- Generate explode(split(secondary_ids#1, \|)), [id#0], false, [secondary_id#60]
: : +- *(6) Project [id#0, secondary_ids#1]
: : +- *(6) SortMergeJoin [id#0], [guid#46], Inner
: : :- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(id#0, 200)
: : : +- *(1) Filter (isnotnull(id#0) && id#0 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12})
: : : +- InMemoryTableScan [id#0, secondary_ids#1], [isnotnull(id#0), id#0 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}]
: : : +- InMemoryRelation [id#0, secondary_ids#1], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : +- Exchange RoundRobinPartitioning(3840)
: : : +- *(1) Filter AtLeastNNulls(n, id#0,secondary_ids#1)
: : : +- *(1) FileScan csv [id#0,secondary_ids#1] Batched: false, Format: CSV, Location: InMemoryFileIndex[inputdata_file, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,secondary_ids:string>
: : +- *(5) Sort [guid#46 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(guid#46, 200)
: : +- *(4) Filter (guid#46 RLIKE [0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12} && isnotnull(guid#46))
: : +- Generate explode(set_guid#36), false, [guid#46]
: : +- *(3) Project [set_guid#36]
: : +- *(3) Filter (isnotnull(allowed#39) && (allowed#39 = 1))
: : +- *(3) FileScan orc whitelist.whitelist1[set_guid#36,region#39,timestamp#43] Batched: false, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://file, PartitionCount: 1, PartitionFilters: [isnotnull(timestamp#43), (timestamp#43 = 18567)], PushedFilters: [IsNotNull(region), EqualTo(region,1)], ReadSchema: struct<set_guid:array<string>,region:int>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
FROM TAG as T
JOIN MAP as M
ON T.tag_id = M.tag_id
WHERE (expire >= NOW() OR expire IS NULL)
ORDER BY T.tag_id) AS subset) [numPartitions=1] [secondary_id#100,tag_id#101L] PushedFilters: [*IsNotNull(secondary_id), *IsNotNull(tag_id)], ReadSchema: struct<secondary_id:string,tag_id:bigint>
+- *(5) Sort [user_uuid#129 ASC NULLS FIRST, cast(key#131 as bigint) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(user_uuid#129, cast(key#131 as bigint), 200)
+- *(4) Project [user_uuid#129, key#131]
+- *(4) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user_uuid#129,key#131] PushedFilters: [*EqualTo(type,2)], ReadSchema: struct<user_uuid:string,key:string>
我没有得到直接连接工作,这是造成超时。
更新2
我认为这并不是解决直接连接的问题,因为Dataframe中的数据类型已关闭。特别是uuid类型
1条答案
按热度按时间7uzetpgm1#
我建议使用spark cassandra connector(scc)2.5.x或3.0.x(发布公告),其中包含dataframe与cassandra连接的实现,而不是将rdd api与Pypark一起使用—在这种情况下,您不需要深入到rdd,只需使用普通的dataframe api连接即可。
请注意,这在默认情况下未启用,因此您需要启动
pyspark
或者spark-submit
特殊配置,如:您可以在我最近关于这个主题的博客文章中找到更多关于与cassandra连接的信息(尽管它使用scala,但dataframe部分应该被翻译成几乎一对一的pyspark)