结构化流kafka先写最新文件,即使禁用了latestfirst

zbq4xfa0  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(282)

我有一份工作,在那里我阅读了一些来自s3/alluxio的parquets,转换成avro,从schema registry捕获schema,并将其放入kafka。为此,在我的writer中,我使用trigger.once()。
但由于某种原因,当我检查我的Kafka主题时,第一次写的记录来自最后一次谈判。
即使禁用最新的第一个选项,也会发生这种情况。
我怎样才能解决这个问题?在窗口点菜?
求求你,救命!

val dataFrame = spark.readStream
              .option("mergeSchema", "true")
              .option("maxFilesPerTrigger", 5)
              .schema(dfSchema)
            //  .parquet(s"alluxio://tenants/table/*")
              .parquet(s"s3a://tenants/table/*")
    dataFrame.withColumn("cdcTime", to_timestamp(col("cdcTime")))

val avroDF = dataFrame.
  .withColumn("key", to_confluent_avro(col("key"), getSchemaRegistryConfigKey(table)))
  .withColumn("value", when(col("Op") === "D", lit(null))
  .otherwise(to_confluent_avro(col("value"),
  valueJsonAvroSchema,
  getSchemaRegistryConfigValue(table))))
  .drop("Op")

   avroDF.writeStream
      .format("kafka")
      .queryName(s"${table.schema}/${table.name}")
      .option("kafka.bootstrap.servers", "mybrokerUrl")
      .option("topic", table.topic)
      .trigger(Trigger.Once())
      .option("checkpointLocation", s"checkpoints/${table.schema}/${table.name}")
      .start()
      .awaitTermination()

我用abris把我的parquert转换成avro

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题