我试图从kafka主题推断动态json模式。在blog中找到这段代码,它使用pyspark推断模式。
def read_kafka_topic(topic):
df_json = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", kafka_broker)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
.withColumn("value", expr("string(value)"))
.filter(col("value").isNotNull())
.select("key", expr("struct(offset, value) r"))
.groupBy("key").agg(expr("max(r) r"))
.select("r.value"))
df_read = spark.read.json(
df_json.rdd.map(lambda x: x.value), multiLine=True)**
试用scala:
**val df_read = spark.read.json(df_json.rdd.map(x=>x))**
但我的错误正在减少。
无法应用于(org.apache.spark.rdd.rdd[org.apache.spark.sql.row])val df琰read=spark.read.json(df琰json.rdd.map(x=>x))
有什么办法吗?请帮忙。
1条答案
按热度按时间wnavrhmk1#
结构化流媒体不支持rdd。
结构化流不允许模式推断。
需要定义架构。
e、 g.对于文件源
e、 g.Kafka的情况就像数据瑞克教你的那样