I have a join between cleanDF and sentiment_df using array_contains that works fine (from solution 61687997). I need to include in the Result df a new column ('Year') from cleanDF.
This is the join:
from pyspark.sql.functions import expr, first, collect_list, mean, col
Result = cleanDF.join(sentiment_df, expr("""array_contains(MeaningfulWords,word)"""), how='left')\
.groupBy("ID")\
.agg(first("MeaningfulWords").alias("MeaningfulWords")\
,collect_list("score").alias("ScoreList")\
,mean("score").alias("MeanScore"))
This is the structure of Result:
Result.show(5)
# +------------------+--------------------+--------------------+-----------------+
# | ID| MeaningfulWords| ScoreList| MeanScore|
# +------------------+--------------------+--------------------+-----------------+
# |a0U3Y00000p1IzjUAE|[buen, servicio, ...| [6.39, 1.82]| 4.105|
# |a0U3Y00000p1KhGUAU| [mala]| [2.02]| 2.02|
# |a0U3Y00000p1M1oUAE|[cliente, content...| [6.39, 8.41]| 7.4|
# |a0U3Y00000p1OnTUAU|[positivo, trato,...| [8.2]| 8.19|
# |a0U3Y00000p1R5DUAU|[momento, sido, g...| [6.0]| 6.0|
# +------------------+--------------------+--------------------+-----------------+
I added a .select (36132322) to include Year from cleanDF:
Result1 = cleanDF.alias('a').join(sentiment_df.alias('b'), expr("""array_contains(a.MeaningfulWords,b.word)"""), how='left')\
.select(col('a.ID'),col('a.Year'),col('a.MeaningfulWords'),col('b.word'),col('b.score'))\
.groupBy("ID")\
.agg(first("a.MeaningfulWords").alias("MeaningfulWords")\
,collect_list("score").alias("ScoreList")\
,mean("score").alias("MeanScore"))
But Result1 ends up with the same columns as **Result**:
display(Result1)
# DataFrame[ID: string, MeaningfulWords: array<string>, ScoreList: array<double>, MeanScore: double]
When I try to include Year in the .agg function, I get an error:
Result2 = cleanDF.join(sentiment_df, expr("""array_contains(MeaningfulWords,word)"""), how='left')\
.groupBy("ID")\
.agg(first("MeaningfulWords").alias("MeaningfulWords"),first("Year").alias("Year")\
,collect_list("score").alias("ScoreList")\
,mean("score").alias("MeanScore"))
Result2.show()
Py4JJavaError: An error occurred while calling o3205.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
...
...
...
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array<string>)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1066)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:109)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:107)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063)
...
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 411.0 failed 1 times, most recent failure: Lost task 2.0 in stage 411.0 (TID 9719, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (array<string>) => array<string>)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1066)
at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction$class.eval(higherOrderFunctions.scala:208)
at org.apache.spark.sql.catalyst.expressions.ArrayFilter.eval(higherOrderFunctions.scala:296)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
...
...
... 20 more
Caused by: java.lang.NullPointerException
I am using pyspark on Spark 2.4.5.
Thanks in advance for your help.
1 Answer
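The Year column may contain null values, which would explain why it fails with the
Caused by: java.lang.NullPointerException
exception. Filter out all null values from the Year column before the join.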