我能够读取Kafka主题的数据,并能够打印在控制台上使用Spark流的数据。
我希望数据是Dataframe格式。
这是我的密码:
spark = SparkSession \
.builder \
.appName("StructuredSocketRead") \
.getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","********") \
.option("subscribe","******") \
.option("startingOffsets", "earliest") \
.load()
readable = lines.selectExpr("CAST(value AS STRING)")
query = readable \
.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", "False") \
.start()
query.awaitTermination()
输出为json文件格式。如何将其转换为Dataframe?请查看以下输出:
{"items": [{"SKU": "23565", "title": "EGG CUP MILKMAID HELGA ", "unit_price": 2.46, "quantity": 2}], "type": "ORDER", "country": "United Kingdom", "invoice_no": 154132541847735, "timestamp": "2020-11-02 20:56:01"}
1条答案
按热度按时间dzjeubhm1#
iicu,请用
explode()
以及getItems()
为了从json中创建Dataframe。。在此处创建Dataframe
逻辑在这里