I am using Spark to connect to a Kafka topic that carries location data. I want to transform df (the resulting DataFrame) into key-value pairs so that I can write it out to another Kafka topic.
Dataset<Row> df = spark.readStream().format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "cab-location")
        .option("startingOffsets", "earliest")
        .load();

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .map(new MapFunction<Row, Tuple2<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(Row value) throws Exception {
                Gson g = new Gson();
                CabLocationData cabLocationData = g.fromJson(value.getString(1), CabLocationData.class);
                return new Tuple2<String, String>(value.getString(0), cabLocationData.getCabName());
            }
        }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
        .map(new MapFunction<Tuple2<String, String>, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, String> value) throws Exception {
                return RowFactory.create(value._1.toString(), value._2);
            }
        }, Encoders.javaSerialization(Row.class));
When I check df.columns(), it shows only one column, value.
Can you help me convert the Dataset into two columns, one for key and one for value, so that I can push it to another Kafka topic as key-value pairs?
1 Answer
Thanks to @OneCricketeer for the suggestion. Below is the working code.
As suggested, I added .withColumn to create the key and value columns from the tuple columns _1 and _2.
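The answer's actual code is not included in the page above. A minimal sketch of the fix it describes: after the first map with Encoders.tuple(...), the Dataset already has two typed columns with the default names _1 and _2, so the second map (whose Encoders.javaSerialization(Row.class) collapses everything into one binary value column) can be dropped and the tuple columns renamed with .withColumn. The variable tupleDs, the output topic name, and the checkpoint path here are illustrative assumptions, not from the original post.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

// tupleDs is the Dataset<Tuple2<String, String>> produced by the first
// map above; its default column names are "_1" and "_2".
Dataset<Row> keyed = tupleDs
        .withColumn("key", col("_1"))      // key column from tuple._1
        .withColumn("value", col("_2"))    // value column from tuple._2
        .select("key", "value");           // exactly the schema the Kafka sink expects

// The Kafka sink writes the "key" and "value" columns of each row;
// a checkpoint location is required for streaming writes.
keyed.writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("topic", "cab-location-out")
        .option("checkpointLocation", "/tmp/cab-location-checkpoint")
        .start();
```

With this, df.columns() on keyed reports both key and value, and each record lands on the output topic as a key-value pair.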