使用scalaapark检索存储在dataframe列中的每一行的数组

2o7dmzc5  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(288)

以下Dataframe属于我

+-------------------------------+-------------------------+
|value                          |feeling                  |
+-------------------------------+-------------------------+
|Sam got these marks            |[sad, sad, dissappointed ]|
|I got good marks               |[happy, excited, happy]   |
+-------------------------------+-------------------------+

我想遍历这个Dataframe,得到每行的marks列的数组,并使用marks数组进行一些计算。

def calculationMethod(arrayValue : Array[String]) {
//get averege of words
}

输出Dataframe

+-------------------------------+-----------------------------+--------------
    |value                          |feeling                   |average       |
    +-------------------------------+-----------------------------------------+
    |Sam got these marks            |[sad, sad, dissappointed ]|sad           |
    |I got good marks               |[happy, excited, happy]   |happy         |
    +-------------------------------+-----------------------------------------+

我不确定如何遍历每一行,并在第二列中获得可以传递到我编写的方法中的数组。另外请注意,问题中显示的Dataframe是流Dataframe。
编辑1

val calculateUDF = udf(calculationMethod _)
    val editedDataFrame = filteredDataFrame.withColumn("average", calculateUDF(col("feeling"))) 

def calculationMethod(emojiArray: Seq[String]) : DataFrame {
val existingSparkSession = SparkSession.builder().getOrCreate()
    import existingSparkSession.implicits._
    val df = emojiArray.toDF("feeling")
    val result = df.selectExpr(
      "feeling",
      "'U+' || trim('0' , string(hex(encode(feeling, 'utf-32')))) as unicode"
    )
    result
}

我得到以下错误
不支持org.apache.spark.sql.dataset[org.apache.spark.sql.row]类型的架构
请注意,问题中提到的初始Dataframe是流Dataframe
编辑2
这应该是我期望的最后一个Dataframe

+-------------------+--------------+-------------------------+
    |value              |feeling       |unicode                  |
    +-------------------+--------------+-------------------------+
    |Sam got these marks|[???]     |[U+1F600 U+1F606 U+1F601]|
    |I got good marks   |[??]        | [U+1F604 U+1F643 ]      |
    +-------------------+---------------+-------------------------+
mzmfm0qo

mzmfm0qo1#

你可以 transform 数组而不是使用自定义项:

val df2 = df.withColumn(
    "unicode", 
    expr("transform(feeling, x -> 'U+' || ltrim('0' , string(hex(encode(x, 'utf-32')))))")
)

df2.show(false)
+-------------------+------------+---------------------------+
|value              |feeling     |unicode                    |
+-------------------+------------+---------------------------+
|Sam got these marks|[?, ?, ?]|[U+1F600, U+1F606, U+1F601]|
|I got good marks   |[?, ?]    |[U+1F604, U+1F643]         |
+-------------------+------------+---------------------------+

相关问题