scala生成的代码超过64kib,spark结构化流应用程序中出现编译错误

ckocjqey  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(333)

我运行了下面的spark结构化流应用程序,但是生成的代码超过了64kib并且发生了编译错误。

ERROR CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass" in "generated.java": Code of method "expand_doConsume_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage3;Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;ZJZ)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3" grows beyond 64 KB
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column

但是,如果我将slideduration(从1秒增加到2秒)值,则不会显示上述错误消息。
这是一个简单的代码,但我不明白为什么生成的代码超过64kib。
您可以通过此链接找到整个错误消息(gist)
代码

def inputSchema: StructType = StructType(
    Seq(
      StructField("timestamp", LongType, nullable = false),
      StructField("missingInfo", LongType, nullable = true),
      StructField("jobId", LongType, nullable = false),
      StructField("taskId", LongType, nullable = false),
      StructField("machineId", LongType, nullable = true),
      StructField("eventType", IntegerType, nullable = false),
      StructField("userId", IntegerType, nullable = true),
      StructField("category", IntegerType, nullable = true),
      StructField("priority", IntegerType, nullable = false),
      StructField("cpu", FloatType, nullable = true),
      StructField("ram", FloatType, nullable = true),
      StructField("disk", FloatType, nullable = true),
      StructField("constraints", IntegerType, nullable = true),
    )
  )
ss.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("enable.auto.commit", "false")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .select(col("key"), from_csv(col("value"), inputSchema, Map("delimiter" -> ",")).as("task_event"))
      .withColumn("event_time", (col("task_event.timestamp") / 1000 + startTraceTime).cast(TimestampType))
      .where("task_event.eventType == 1")
      .dropDuplicates("key")
      .withWatermark("event_time", "60 seconds")
      .groupBy(
        window(col("event_time"), "60 seconds", "1 second"),
        col("task_event.jobId")
      ).agg(avg("task_event.cpu").as("avgCpu"))
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()
      .awaitTermination()

环境
操作系统:ubuntu 18.04
java:11.0.9版本
斯卡拉:2.12.12
Spark:3.0.1

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题