JOIN in Databricks pyspark gives "IllegalStateException: Unexpected partitioning: DataSourcePartitioning" error

Asked by oknrviil on 2022-11-30, tagged Apache

I am writing the following function in pyspark on Databricks:

from pyspark.sql import DataFrame, Row
from pyspark.sql import functions as F


def expand_category(input_df: DataFrame, category_list: list, column_list: list) -> DataFrame:
    '''
    @input_df: dataframe to expand
    @category_list: list of categories
    @column_list: columns that identify the uniqueness of a record
    '''

    # all distinct device ids present in the input
    input_device_id_df = input_df.select(F.col('device_id')).distinct()
    # build a one-column dataframe with one row per category
    category_list = [{'category': i} for i in category_list]
    category_df = spark.createDataFrame(Row(**x) for x in category_list)
    # every (device_id, category) combination
    cross_join_df = input_device_id_df.crossJoin(category_df)
    # join back onto the original data; this is the step that fails
    output_df_with_null = cross_join_df.join(input_df, on=column_list)
    output_df = output_df_with_null.na.fill(0)
    return output_df

Edit:

input_df:pyspark.sql.dataframe.DataFrame
  device_id:string
  category:string
  active_photos:long
  attentiont_milliseconds:long
  dwell_milliseconds:long
  dwell_ots:long
  ots:long
  total_photos:long
  views:long

category_list = ['PERSON', 'VEHICLE']
column_list = ['device_id', 'category']
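
With these inputs, the call that fails looks roughly like this (a reconstruction rather than a quote from the original post; the .show() action matches the showString call in the trace below):

output_df = expand_category(input_df, category_list, column_list)
output_df.show()   # evaluating the join here raises the IllegalStateException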

But when the join is executed, it gives me this error:
java.lang.IllegalStateException: Unexpected partitioning: DataSourcePartitioning
Below is my Databricks configuration:

  • Databricks Runtime version: 10.4 LTS (includes Apache Spark 3.2.1 and Scala 2.12; see the version check below)
  • Databricks cluster: Standard_DS3_v2, 14 GB, 2 cores
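
As a quick sanity check (not part of the original post), the Spark version the cluster is actually running can be printed from the notebook:

print(spark.version)   # expected to print 3.2.1 on Databricks Runtime 10.4 LTS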

Below is the full stack trace of the error:

Py4JJavaError: An error occurred while calling o27515.showString.
: java.lang.IllegalStateException: Unexpected partitioning: DataSourcePartitioning
    at org.apache.spark.sql.catalyst.plans.physical.Partitioning.createShuffleSpec(partitioning.scala:245)
    at org.apache.spark.sql.catalyst.plans.physical.Partitioning.createShuffleSpec$(partitioning.scala:244)
    at org.apache.spark.sql.execution.datasources.v2.DataSourcePartitioning.createShuffleSpec(DataSourcePartitioning.scala:27)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$5(EnsureRequirements.scala:129)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$5$adapted(EnsureRequirements.scala:124)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:124)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:471)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:441)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:629)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:629)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
    at org.apache.spark.sql.execution.ProjectExec.mapChildren(basicPhysicalOperators.scala:45)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1253)
    at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1252)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.mapChildren(SortMergeJoinExec.scala:39)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
    at org.apache.spark.sql.execution.ProjectExec.mapChildren(basicPhysicalOperators.scala:45)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1225)
    at org.apache.spark.sql.execution.CollectLimitExec.mapChildren(limit.scala:49)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:626)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:602)
    at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:441)
    at com.databricks.sql.optimizer.EnsureRequirementsDP.apply(EnsureRequirementsDP.scala:718)
    at com.databricks.sql.optimizer.EnsureRequirementsDP.apply(EnsureRequirementsDP.scala:550)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$3(AdaptiveSparkPlanExec.scala:1089)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:1089)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.applyPhysicalRules(AdaptiveSparkPlanExec.scala:1088)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$initialPlan$1(AdaptiveSparkPlanExec.scala:293)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.<init>(AdaptiveSparkPlanExec.scala:292)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:83)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:48)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:42)
    at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$2(QueryExecution.scala:596)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:596)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:595)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:232)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:268)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:265)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:265)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:228)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:222)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:298)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:361)
    at org.apache.spark.sql.execution.QueryExecution.explainStringLocal(QueryExecution.scala:325)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:202)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2877)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:293)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:332)
    at sun.reflect.GeneratedMethodAccessor670.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

Answer from xcitsw88:

I have tried similar code with different data:

  • Databricks configuration:
      • Databricks Runtime version: 11.3 LTS ML (includes Apache Spark 3.3.0 and Scala 2.12)
      • Databricks cluster: Standard_DS3_v2, 14 GB, 4 cores
  • Random data for dataframe df (a sketch is shown after this answer)
  • expand_category function definition:
from pyspark.sql import DataFrame, Row
from pyspark.sql import functions as F


def expand_category(input_df: DataFrame, category_list: list, column_list: list) -> DataFrame:
    '''
    @input_df: dataframe to expand
    @category_list: list of categories
    @column_list: columns that identify the uniqueness of a record
    '''
    input_device_id_df = input_df.select(F.col('device_id')).distinct()
    category_list = [{'category': i} for i in category_list]
    category_df = spark.createDataFrame(Row(**x) for x in category_list)
    cross_join_df = input_device_id_df.crossJoin(category_df)
    output_df_with_null = cross_join_df.join(input_df, on=column_list)
    output_df = output_df_with_null.na.fill(0)
    return output_df

It executed successfully.
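
The random data itself was not posted; below is a minimal sketch of how comparable test data could be generated and the function exercised, assuming the schema from the question (the generator and values are illustrative, not from the answer):

import random

devices = [f'dev-{i:03d}' for i in range(5)]
categories = ['PERSON', 'VEHICLE']
# 7 random metric columns per (device_id, category) pair; values are illustrative
rows = [
    (d, c, *[random.randint(0, 100) for _ in range(7)])
    for d in devices for c in categories
]
columns = ['device_id', 'category', 'active_photos', 'attentiont_milliseconds',
           'dwell_milliseconds', 'dwell_ots', 'ots', 'total_photos', 'views']
df = spark.createDataFrame(rows, columns)

expand_category(df, categories, ['device_id', 'category']).show()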
