在spark结构化流媒体中,是否可以使用foreachbatch将两个不相交的数据集写入数据同步?

okxuctiv  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(427)

我正在尝试将数据从单个源写入多个数据链接(mongo和postgres dbs)。传入数据

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic1")
        .load();

Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");

personalDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI").save();
    }).start();

Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "SAL").save();
    }).start();

问题是,我可以看到Spark正在打开两个流并读取相同的事件两次。是否可以读取一次并应用不同的转换和写入不同的集合?

bvjxkvbb

bvjxkvbb1#

你应该缓存Dataframe。请看这里:
向多个位置写入—如果要将流式查询的输出写入多个位置,则只需多次写入输出Dataframe/数据集即可。但是,每次尝试写入都可能导致重新计算输出数据(包括可能重新读取输入数据)。为了避免重新计算,应该缓存输出dataframe/dataset,将其写入多个位置,然后取消缓存。
以及他们的例子:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

你可以把所有的代码放在一个 foreachBatch 并将Dataframe写入两个接收器。可以通过缓存Dataframe并执行 selectExpr 并保存它。
作为旁注-请注意,在任何情况下,如果您想要“全部或全部”(即,您不希望出现您写信给mongo而不是postgres的情况),您只能使用一个 foreachBatch ,否则(如果你有2个 foreachBatch ,如您的问题所示)您有两个独立的批处理—对于相同的数据,一个批处理可能会失败,而另一个批处理成功。

相关问题