I am writing the following function with PySpark in Databricks:
from pyspark.sql import DataFrame, Row
from pyspark.sql import functions as F

def expand_category(input_df: DataFrame, category_list: list, column_list: list) -> DataFrame:
    '''
    @input_df: DataFrame to expand
    @category_list: list of categories every device should be expanded to
    @column_list: columns that identify the uniqueness of a record
    '''
    # Distinct devices present in the input
    input_device_id_df = input_df.select(F.col('device_id')).distinct()
    # One-column DataFrame with one row per category
    # (`spark` is the ambient SparkSession provided by Databricks notebooks)
    category_rows = [{'category': i} for i in category_list]
    category_df = spark.createDataFrame(Row(**x) for x in category_rows)
    # Every (device_id, category) combination
    cross_join_df = input_device_id_df.crossJoin(category_df)
    # Left join keeps missing combinations as nulls, which fill(0) then zeroes out
    output_df_with_null = cross_join_df.join(input_df, on=column_list, how='left')
    output_df = output_df_with_null.na.fill(0)
    return output_df
Edit:
input_df:pyspark.sql.dataframe.DataFrame
device_id:string
category:string
active_photos:long
attentiont_milliseconds:long
dwell_milliseconds:long
dwell_ots:long
ots:long
total_photos:long
views:long
category_list = ['PERSON', 'VEHICLE']
column_list = ['device_id', 'category']
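The expansion the function is meant to produce can be illustrated without Spark. Below is a minimal pure-Python sketch of the same cross-join-then-fill logic (the sample row and the helper name `expand_category_plain` are hypothetical, for illustration only); it mirrors the intended left-join-and-fill semantics:

```python
from itertools import product

def expand_category_plain(rows, category_list, key_cols):
    """Pure-Python sketch of the expand_category logic: every
    (device_id, category) pair appears; missing metrics become 0."""
    devices = sorted({r['device_id'] for r in rows})
    # Index existing rows by their uniqueness key
    by_key = {tuple(r[c] for c in key_cols): r for r in rows}
    metric_cols = [c for c in rows[0] if c not in key_cols]
    out = []
    for device, cat in product(devices, category_list):
        base = {'device_id': device, 'category': cat}
        found = by_key.get((device, cat))
        for m in metric_cols:
            # fill combinations absent from the input with 0
            base[m] = found[m] if found else 0
        out.append(base)
    return out

rows = [{'device_id': 'd1', 'category': 'PERSON', 'views': 5}]
result = expand_category_plain(rows, ['PERSON', 'VEHICLE'],
                               ['device_id', 'category'])
# d1/PERSON keeps its views; d1/VEHICLE is added with views = 0
```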
But when the join is executed, it gives me an error:
java.lang.IllegalStateException: Unexpected partitioning: DataSourcePartitioning
Below is my Databricks configuration:
- Databricks Runtime: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)
- Databricks cluster: Standard_DS3_v2, 14 GB, dual-core
Below is the full error trace:
Py4JJavaError: An error occurred while calling o27515.showString.
: java.lang.IllegalStateException: Unexpected partitioning: DataSourcePartitioning
at org.apache.spark.sql.catalyst.plans.physical.Partitioning.createShuffleSpec(partitioning.scala:245)
at org.apache.spark.sql.catalyst.plans.physical.Partitioning.createShuffleSpec$(partitioning.scala:244)
at org.apache.spark.sql.execution.datasources.v2.DataSourcePartitioning.createShuffleSpec(DataSourcePartitioning.scala:27)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$5(EnsureRequirements.scala:129)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$5$adapted(EnsureRequirements.scala:124)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:124)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:471)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:441)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:629)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:629)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
at org.apache.spark.sql.execution.ProjectExec.mapChildren(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1253)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1252)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.mapChildren(SortMergeJoinExec.scala:39)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
at org.apache.spark.sql.execution.ProjectExec.mapChildren(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
at org.apache.spark.sql.execution.CollectLimitExec.mapChildren(limit.scala:49)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:602)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:441)
at com.databricks.sql.optimizer.EnsureRequirementsDP.apply(EnsureRequirementsDP.scala:718)
at com.databricks.sql.optimizer.EnsureRequirementsDP.apply(EnsureRequirementsDP.scala:550)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$3(AdaptiveSparkPlanExec.scala:1089)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:1089)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.applyPhysicalRules(AdaptiveSparkPlanExec.scala:1088)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$initialPlan$1(AdaptiveSparkPlanExec.scala:293)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.<init>(AdaptiveSparkPlanExec.scala:292)
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:83)
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:48)
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:42)
at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$2(QueryExecution.scala:596)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:596)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:595)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:232)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:268)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:265)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:265)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:228)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:222)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:298)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:361)
at org.apache.spark.sql.execution.QueryExecution.explainStringLocal(QueryExecution.scala:325)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:202)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2877)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:293)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:332)
at sun.reflect.GeneratedMethodAccessor670.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
1 Answer

xcitsw881:

I have tried similar code with different data, and it executed successfully.
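Since the trace runs through AdaptiveSparkPlanExec and the Databricks-specific EnsureRequirementsDP rule, one hedged diagnostic step (not a confirmed fix) is to rerun the failing join with adaptive query execution disabled, to check whether the planner error is AQE-related:

```python
# Diagnostic only: disable adaptive query execution for this session,
# retry the failing join, and re-enable it afterwards.
spark.conf.set("spark.sql.adaptive.enabled", "false")
```

If the join succeeds with AQE off, that narrows the problem to the adaptive planning path; the setting should not be left disabled permanently, as AQE generally improves query performance.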