Apache Spark exception: GenericRowWithSchema cannot be cast

dgenwo3n  posted on 2022-11-16  in Apache
I am trying to use the agg function with type-safe checks. I created a case class for the dataset and defined its schema:
case class JosmSalesRecord(orderId: Long,
                           totalOrderSales : Double,
                           totalOrderCount: Long)
object JosmSalesRecord extends SparkSessionWrapper {
  import sparkSession.implicits._

  val schema: Option[StructType] = Some(StructType(Seq(
    StructField("order_id", IntegerType, true),
    StructField("total_order_sales", DoubleType, true),
    StructField("total_order_count", IntegerType, true)
  )))

}

The dataset:

+----------+------------------+---------------+
|   orderId|   totalOrderSales|totalOrderCount|
+----------+------------------+---------------+
|1071131089|            433.68|              8|
|1071386263|  8848.42000000001|            343|
|1071439146|108.39999999999999|              8|
|1071349454|34950.400000000074|            512|
|1071283654|349.65000000000003|             27|

root
 |-- orderId: long (nullable = false)
 |-- totalOrderSales: double (nullable = false)
 |-- totalOrderCount: long (nullable = false)

I apply the following function to the dataset:

val pv = aggregateSum.agg(typed.sum[JosmSalesRecord](_.totalOrderSales),
          typed.sumLong[JosmSalesRecord](_.totalOrderCount)).toDF("totalOrderSales", "totalOrderCount")

When I call pv.show(), I get:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to com.pipeline.dataset.JosmSalesRecord
at com.FirstApp$$anonfun$7.apply(FirstApp.scala:78)
at org.apache.spark.sql.execution.aggregate.TypedSumDouble.reduce(typedaggregators.scala:32)
at org.apache.spark.sql.execution.aggregate.TypedSumDouble.reduce(typedaggregators.scala:30)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.agg_doConsume_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.serializefromobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.mapelements_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.deserializetoobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.agg_doAggregateWithKeysOutput_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Note: I strictly need to use the type-safe functions from org.apache.spark.sql.expressions.scalalang.typed (typed.sum). I do get the expected result when I use sum from org.apache.spark.sql.functions instead.

2hh7jdfx  #1

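Assuming aggregateSum is a DataFrame (that is, a Dataset[Row]), the lambdas passed to typed.sum are handed untyped Row objects (GenericRowWithSchema) instead of JosmSalesRecord instances, which is what the ClassCastException reports. For example: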

val aggregateSum = Seq((1071131089L, 433.68, 8),(1071386263L,8848.42000000001,343)).toDF("orderId", "totalOrderSales", "totalOrderCount")
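With a DataFrame like this, one way to keep typed.sum is to convert to Dataset[JosmSalesRecord] first and pass the typed columns to the typed select overload, which binds them to the Dataset's own encoder. The following is a minimal sketch, not part of the original answer; the object name TypedSelectSketch and the local master setting are assumptions made for illustration, and on Spark 3.x the typed object should compile with a deprecation warning.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.expressions.scalalang.typed

// Same shape as the case class in the question; top-level so an encoder can be derived.
case class JosmSalesRecord(orderId: Long, totalOrderSales: Double, totalOrderCount: Long)

// Hypothetical driver object, for illustration only.
object TypedSelectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run
      .appName("typed-select-sketch")
      .getOrCreate()
    import spark.implicits._

    val aggregateSum = Seq(
      (1071131089L, 433.68, 8L),
      (1071386263L, 8848.42000000001, 343L)
    ).toDF("orderId", "totalOrderSales", "totalOrderCount")

    // Convert the untyped rows into JosmSalesRecord instances before aggregating.
    val records: Dataset[JosmSalesRecord] = aggregateSum.as[JosmSalesRecord]

    // Dataset.select with TypedColumn arguments keeps the input type,
    // so the lambdas below receive JosmSalesRecord values.
    val pv = records
      .select(
        typed.sum[JosmSalesRecord](_.totalOrderSales),
        typed.sumLong[JosmSalesRecord](_.totalOrderCount))
      .toDF("totalOrderSales", "totalOrderCount")

    pv.show()
    spark.stop()
  }
}
```

The column names in as[JosmSalesRecord] must match the case class fields, which is why the DataFrame uses orderId, totalOrderSales and totalOrderCount rather than the snake_case names from the question's schema.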

In current versions of Spark (e.g. 3.x), typed is deprecated. You can still keep type safety, for example (Spark 3):

// Result type of the aggregation.
case class OutputRecord(totalOrderSales: Double, totalOrderCount: Long)

val pv = aggregateSum.as[JosmSalesRecord]
  .groupByKey(_ => 1) // constant key: all rows fall into a single group
  .mapGroups { (_, vs) =>
    vs.map(x => (x.totalOrderSales, x.totalOrderCount))
      .reduceOption((x, y) => (x._1 + y._1, x._2 + y._2)) // sum both fields pairwise
      .map { case (sales, count) => OutputRecord(sales, count) }
      .getOrElse(OutputRecord(0.0, 0L)) // an empty group yields zeros
  }
pv.show()

which gives:

+----------------+---------------+
| totalOrderSales|totalOrderCount|
+----------------+---------------+
|9282.10000000001|            351|
+----------------+---------------+
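Another type-safe option on Spark 3 is a custom org.apache.spark.sql.expressions.Aggregator, which is not deprecated. The sketch below is an illustration under assumptions rather than the answer's own method: the names SalesTotals and AggregatorSketch are hypothetical, and OutputRecord is reused as both the buffer and the result type to keep it short.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class JosmSalesRecord(orderId: Long, totalOrderSales: Double, totalOrderCount: Long)
case class OutputRecord(totalOrderSales: Double, totalOrderCount: Long)

// Hypothetical aggregator: sums both fields in one pass.
object SalesTotals extends Aggregator[JosmSalesRecord, OutputRecord, OutputRecord] {
  // Neutral element of the sum.
  def zero: OutputRecord = OutputRecord(0.0, 0L)

  // Fold one input record into the running buffer.
  def reduce(acc: OutputRecord, r: JosmSalesRecord): OutputRecord =
    OutputRecord(acc.totalOrderSales + r.totalOrderSales,
                 acc.totalOrderCount + r.totalOrderCount)

  // Combine two partial buffers (for example, from different partitions).
  def merge(a: OutputRecord, b: OutputRecord): OutputRecord =
    OutputRecord(a.totalOrderSales + b.totalOrderSales,
                 a.totalOrderCount + b.totalOrderCount)

  // The buffer already has the output shape.
  def finish(acc: OutputRecord): OutputRecord = acc

  def bufferEncoder: Encoder[OutputRecord] = Encoders.product[OutputRecord]
  def outputEncoder: Encoder[OutputRecord] = Encoders.product[OutputRecord]
}

// Hypothetical driver object, for illustration only.
object AggregatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run
      .appName("aggregator-sketch")
      .getOrCreate()
    import spark.implicits._

    val records = Seq(
      JosmSalesRecord(1071131089L, 433.68, 8L),
      JosmSalesRecord(1071386263L, 8848.42000000001, 343L)
    ).toDS()

    // toColumn turns the aggregator into a TypedColumn usable with the typed select.
    val pv = records.select(SalesTotals.toColumn)
    pv.show()
    spark.stop()
  }
}
```

Compared with groupByKey plus mapGroups, an Aggregator lets Spark combine partial sums per partition through merge instead of materializing one group under a constant key.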

If you already have Scala cats as a dependency, you can also do something like this.
