scala结构的spark流多写

cyvaqqii  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(410)

我正在使用一个数据流来写入kafka主题以及hbase。对于Kafka,我使用如下格式:

dataset.selectExpr("id as key", "to_json(struct(*)) as value")
        .writeStream.format("kafka")
        .option("kafka.bootstrap.servers", Settings.KAFKA_URL)
        .option("topic", Settings.KAFKA_TOPIC2)
        .option("checkpointLocation", "/usr/local/Cellar/zookeepertmp")
        .outputMode(OutputMode.Complete())
        .start()

然后对于hbase,我会这样做:

dataset.writeStream.outputMode(OutputMode.Complete())
    .foreach(new ForeachWriter[Row] {
      override def process(r: Row): Unit = {
        //my logic
      }

      override def close(errorOrNull: Throwable): Unit = {}

      override def open(partitionId: Long, version: Long): Boolean = {
        true
      }
    }).start().awaitTermination()

这会像预期的那样写入hbase,但并不总是写入kafka主题。我不知道为什么会这样。

ukqbszuj

ukqbszuj1#

使用 foreachBatch Spark中:
如果要将流式查询的输出写入多个位置,则只需多次写入输出dataframe/dataset即可。但是,每次尝试写入都可能导致重新计算输出数据(包括可能重新读取输入数据)。为了避免重新计算,应该缓存输出dataframe/dataset,将其写入多个位置,然后取消缓存。这是一个提纲。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    batchDF.write.format(…).save(…) // location 1
    batchDF.write.format(…).save(…) // location 2
    batchDF.unpersist()
}

相关问题