Spark: Converting a Dataset<Row> into key and value columns to write to Kafka

cbwuti44  posted on 2023-05-28 in Apache

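I am trying to connect Spark to a Kafka topic that carries location data. I want to convert df (the resulting DataFrame) into key-value pairs so that I can write it to another Kafka topic.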

Dataset<Row> df = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "cab-location")
        .option("startingOffsets", "earliest").load();
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .map(new MapFunction<Row, Tuple2<String, String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(Row value) throws Exception {
                Gson g = new Gson();
                CabLocationData cabLocationData = g.fromJson(value.getString(1), CabLocationData.class);
                return new Tuple2<String, String>(value.getString(0), cabLocationData.getCabName());
            }
        }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
        .map(new MapFunction<Tuple2<String, String>, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, String> value) throws Exception {
                return RowFactory.create(value._1.toString(), value._2);
            }
        }, Encoders.javaSerialization(Row.class));

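When I check df.columns(), it shows only one column, value.
Can you help me convert the dataset into two columns, one for the key and one for the value, so that I can push it to another Kafka topic as key-value pairs?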

juzqafwq  #1

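Thanks to @OneCricketeer for the suggestion. Here is the working code.
As suggested, I added .withColumn calls to create the key and value columns from the tuple columns _1 and _2.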

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import com.google.gson.Gson;
import scala.Tuple2;

// Read the source topic as a streaming DataFrame.
Dataset<Row> df = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "cab-location")
        .option("startingOffsets", "earliest").load();

// Parse the JSON value and keep (key, cabName) as a typed tuple.
// The tuple encoder produces two string columns named _1 and _2.
Dataset<Tuple2<String, String>> df2 = df
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .map(new MapFunction<Row, Tuple2<String, String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(Row value) throws Exception {
                Gson g = new Gson();
                CabLocationData cabLocationData = g.fromJson(value.getString(1), CabLocationData.class);
                return new Tuple2<String, String>(value.getString(0), cabLocationData.getCabName());
            }
        }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

// Add the key and value columns that the Kafka sink looks for, then write to the output topic.
df2
        .withColumn("key", df2.col("_1"))
        .withColumn("value", df2.col("_2"))
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("checkpointLocation", "C:\\Workspace\\Java\\app\\ch")
        .option("topic", "location-output")
        .outputMode("append").start().awaitTermination();
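
A possible shorter variant of the same idea is to rename the tuple columns in one step with toDF("key", "value") instead of adding two extra columns. The sketch below is not part of the original answer: it uses a small batch Dataset in local mode only to show the resulting column names, and the class name KeyValueColumns, the helper renamedColumns, and the sample data are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

final class KeyValueColumns {

    // Hypothetical helper: builds a tiny Dataset of (key, cabName) tuples
    // and returns the column names after renaming _1/_2 to key/value.
    static String[] renamedColumns(SparkSession spark) {
        List<Tuple2<String, String>> sample = Arrays.asList(
                new Tuple2<>("cab-1", "Alpha"),   // made-up sample rows
                new Tuple2<>("cab-2", "Beta"));

        // Same tuple encoder as in the answer: columns are named _1 and _2.
        Dataset<Tuple2<String, String>> pairs = spark.createDataset(
                sample, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // toDF renames the columns positionally and yields a Dataset<Row>.
        Dataset<Row> keyValue = pairs.toDF("key", "value");
        return keyValue.columns();
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("key-value-columns")
                .getOrCreate();
        try {
            System.out.println(Arrays.toString(renamedColumns(spark)));
        } finally {
            spark.stop();
        }
    }
}
```

In the streaming job, the same call would presumably slot in as df2.toDF("key", "value").writeStream()... in place of the two withColumn calls. As far as I can tell, the single value column in the question comes from Encoders.javaSerialization(Row.class), which stores each Row as one serialized binary column rather than as separate fields.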
