在使用spark readstream读取hdfs文件时记录hdfs文件

w46czmvw  于 2021-05-31  发布在  Hadoop
关注(0)|答案(0)|浏览(310)

我已经编写了一个小程序,它将继续从hdfs目录中读取传入的新文件,并将其流式传输到kafka主题。在继续读取hdfs文件的同时,我想将这些文件记录到我的日志文件中,但我没有找到任何方法。我还想记录文件,一旦它完全进入Kafka主题。在将数据写入kafka主题时,我想将文件名作为头传递给kafka主题。
下面是我用于从hdfs读取文件并将其推送到kafka主题的示例代码。

StructType sch =  DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("id", DataTypes.StringType, true),
            DataTypes.createStructField("cat", DataTypes.StringType, true)
    });

    Dataset<Row> jsonlines = spark.readStream().schema(sch).json("/data/json/") ;

    StreamingQuery query = jsonlines.selectExpr("to_json(struct(*)) AS value")
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("checkpointLocation", "/checkpoint/topic2/")
            .option("batch.size", 10)
            .option("topic", "topic2").start();

    query.awaitTermination();

你能帮忙吗,我真的很感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题