我已经编写了一个小程序,它将继续从hdfs目录中读取传入的新文件,并将其流式传输到kafka主题。在继续读取hdfs文件的同时,我想将这些文件记录到我的日志文件中,但我没有找到任何方法。我还想记录文件,一旦它完全进入Kafka主题。在将数据写入kafka主题时,我想将文件名作为头传递给kafka主题。
下面是我用于从hdfs读取文件并将其推送到kafka主题的示例代码。
StructType sch = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("cat", DataTypes.StringType, true)
});
Dataset<Row> jsonlines = spark.readStream().schema(sch).json("/data/json/") ;
StreamingQuery query = jsonlines.selectExpr("to_json(struct(*)) AS value")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/checkpoint/topic2/")
.option("batch.size", 10)
.option("topic", "topic2").start();
query.awaitTermination();
你能帮忙吗,我真的很感激。
暂无答案!
目前还没有任何答案,快来回答吧!