I have the following DataFrame:
+-------------------+-------------------------+
|value              |feeling                  |
+-------------------+-------------------------+
|Sam got these marks|[sad, sad, disappointed] |
|I got good marks   |[happy, excited, happy]  |
+-------------------+-------------------------+
I want to iterate over this DataFrame, get the array from the feeling column of each row, and use that array for some calculation:
def calculationMethod(arrayValue: Array[String]) {
  // get average of words
}
Expected output DataFrame:
+-------------------+-------------------------+-------+
|value              |feeling                  |average|
+-------------------+-------------------------+-------+
|Sam got these marks|[sad, sad, disappointed] |sad    |
|I got good marks   |[happy, excited, happy]  |happy  |
+-------------------+-------------------------+-------+
I am not sure how to iterate over each row and get the array from the second column so that I can pass it into the method I wrote. Also note that the DataFrame shown in the question is a streaming DataFrame.
Edit 1
val calculateUDF = udf(calculationMethod _)
val editedDataFrame = filteredDataFrame.withColumn("average", calculateUDF(col("feeling")))

def calculationMethod(emojiArray: Seq[String]): DataFrame = {
  val existingSparkSession = SparkSession.builder().getOrCreate()
  import existingSparkSession.implicits._
  val df = emojiArray.toDF("feeling")
  val result = df.selectExpr(
    "feeling",
    "'U+' || trim('0', string(hex(encode(feeling, 'utf-32')))) as unicode"
  )
  result
}
I get the following error:

Schema for type org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] is not supported
Note that the initial DataFrame mentioned in the question is a streaming DataFrame.
Edit 2
This is the final DataFrame I expect:
+-------------------+--------------+-------------------------+
|value              |feeling       |unicode                  |
+-------------------+--------------+-------------------------+
|Sam got these marks|[???]         |[U+1F600 U+1F606 U+1F601]|
|I got good marks   |[??]          |[U+1F604 U+1F643]        |
+-------------------+--------------+-------------------------+
1 Answer
You can use transform on the array instead of a UDF:
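The answer's code was cut off here. A minimal sketch of the transform approach it describes, reusing the filteredDataFrame name and the hex/encode expression from Edit 1 (the exact snippet in the original answer may differ):

```scala
import org.apache.spark.sql.functions.expr

// The higher-order SQL function transform (Spark 2.4+) applies the
// per-element expression to every entry of the `feeling` array and
// returns a new array column. No UDF is involved, so this also works
// on streaming DataFrames.
val withUnicode = filteredDataFrame.withColumn(
  "unicode",
  expr("transform(feeling, f -> 'U+' || trim('0', string(hex(encode(f, 'utf-32')))))")
)
```

Because the lambda runs inside Catalyst rather than in a Scala UDF, there is no attempt to build a nested DataFrame per row, which is what caused the "Schema for type ... is not supported" error.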