spark structured stream在我停止作业之前不会写入文件

5hcedyr0  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(323)

我在一个经典用例中使用spark结构化流:我想从一个kafka主题中读取数据,并将数据流以parquet格式写入hdfs。
这是我的密码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}

object TestKafkaReader extends  App{
  val spark = SparkSession
    .builder
    .appName("Spark-Kafka-Integration")
    .master("local")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._

  val kafkaDf = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","KAFKA_BROKER_IP:PORT")
    //.option("subscribe", "test")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()

  val moviesJsonDf = kafkaDf.selectExpr("CAST(value AS STRING)")

  // movie struct
  val struct = new StructType()
    .add("title", DataTypes.StringType)
    .add("year", DataTypes.IntegerType)
    .add("cast", ArrayType(DataTypes.StringType))
    .add("genres", ArrayType(DataTypes.StringType))

  val moviesNestedDf = moviesJsonDf.select(from_json($"value", struct).as("movie"))
  // json flatten
  val movieFlattenedDf = moviesNestedDf.selectExpr("movie.title", "movie.year", "movie.cast","movie.genres")

  // convert to parquet and save to hdfs
  val query = movieFlattenedDf
    .writeStream
    .outputMode("append")
    .format("parquet")
    .queryName("movies")
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .start("src/main/resources/output")
    .awaitTermination()
  }

上下文:
我直接从intellij运行这个(安装了本地spark)
我能够毫无问题地从Kafka那里读到内容,并在控制台中编写(使用控制台模式)
目前我想在本地机器上写这个文件(但是我在hdfs集群上试过,问题是一样的)
我的问题是:
在作业期间,它不会在文件夹中写入任何内容,我必须手动停止作业才能最终看到文件。
我想可能和 .awaitTermination() 作为参考,我试图删除这个选项,但如果没有,我会得到一个错误,作业根本无法运行。
也许我没有设置正确的选项,但在阅读了很多次的文件和谷歌搜索后,我没有找到任何东西。
你能帮我一下吗?
谢谢您
编辑:
我用的是spark 2.4.0
我尝试了64/128mb格式=>在停止作业之前,不会更改任何文件

62lalag4

62lalag41#

是的,问题解决了
我的问题是,我有太少的数据和Spark是等待更多的数据来写Parquet文件。
为了实现这一点,我使用@alexandrosbiratsis的注解(更改块大小)
再次感谢@alexandrosbiratsis非常感谢

相关问题