从Kafka主题中读取数据并使用spark tempview进行聚合?

ezykj2lf  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(416)

我想从Kafka主题中读取数据,并创建spark tempview以按列分组?

+----+--------------------+
| key|               value|          
+----+--------------------+
|null|{"e":"trade","E":...|
|null|{"e":"trade","E":...|
|null|{"e":"trade","E":...|

但是我不能从tempview聚合数据??值列数据存储为字符串???

Dataset<Row> data = spark
                  .readStream()
                  .format("kafka")
                  .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093")
                  .option("subscribe", "data2-topic")
                  .option("startingOffsets", "latest")
                  .option ("group.id", "test")
                  .option("enable.auto.commit", "true")
                  .option("auto.commit.interval.ms", "1000")          
                  .load();
          data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
          data.createOrReplaceTempView("Tempdata");
          data.show();
Dataset<Row> df2=spark.sql("SELECT e FROM Tempdata group by e");
df2.show();
tyg4sfes

tyg4sfes1#

值列数据存储为字符串???
对。。因为你 CAST(value as STRING) 你会想用 from_json 函数,该函数将行加载到您可以在其中搜索的适当Dataframe中。
有关一些示例,请参见databrick关于kafka上结构化流媒体的博客
如果主要目标只是对一些字段进行分组,那么ksql可能是另一种选择。

相关问题