Spark Streaming is reading a Kafka topic — how to convert the nested JSON into a DataFrame?

oxiaedzo · published 2021-05-18 in Spark

I am able to read data from a Kafka topic and print it to the console using Spark Structured Streaming.
I want the data in DataFrame format.
Here is my code:

spark = SparkSession  \
    .builder  \
    .appName("StructuredSocketRead")  \
    .getOrCreate()
spark.sparkContext.setLogLevel('ERROR') 

lines = spark  \
    .readStream  \
    .format("kafka")  \
    .option("kafka.bootstrap.servers","********")  \
    .option("subscribe","******")  \
    .option("startingOffsets", "earliest")  \
    .load()

readable = lines.selectExpr("CAST(value AS STRING)")

query = readable  \
    .writeStream  \
    .outputMode("append")  \
    .format("console")  \
    .option("truncate", "False")  \
    .start()

query.awaitTermination()

The output is in JSON format. How can I convert it into a DataFrame? Please see the output below:

{"items": [{"SKU": "23565", "title": "EGG CUP MILKMAID HELGA ", "unit_price": 2.46, "quantity": 2}], "type": "ORDER", "country": "United Kingdom", "invoice_no": 154132541847735, "timestamp": "2020-11-02 20:56:01"}

dzjeubhm1#

IIUC, please use explode() and getItem() to build the DataFrame from the JSON.

Create the DataFrame here:

a_json = {"items": [{"SKU": "23565", "title": "EGG CUP MILKMAID HELGA ", "unit_price": 2.46, "quantity": 2}], "type": "ORDER", "country": "United Kingdom", "invoice_no": 154132541847735, "timestamp": "2020-11-02 20:56:01"}
df = spark.createDataFrame([a_json])  # schema inferred from the dict; "items" becomes an array of maps
df.show(truncate=False)
+--------------+---------------+-------------------------------------------------------------------------------------+-------------------+-----+
|country       |invoice_no     |items                                                                                |timestamp          |type |
+--------------+---------------+-------------------------------------------------------------------------------------+-------------------+-----+
|United Kingdom|154132541847735|[[quantity -> 2, unit_price -> 2.46, title -> EGG CUP MILKMAID HELGA , SKU -> 23565]]|2020-11-02 20:56:01|ORDER|
+--------------+---------------+-------------------------------------------------------------------------------------+-------------------+-----+

The extraction logic is here:

from pyspark.sql import functions as F

df = df.withColumn("items_array", F.explode("items"))
df = (df
      .withColumn("quantity", df.items_array.getItem("quantity"))
      .withColumn("unit_price", df.items_array.getItem("unit_price"))
      .withColumn("title", df.items_array.getItem("title"))
      .withColumn("SKU", df.items_array.getItem("SKU")))
df.select("country", "invoice_no", "quantity", "unit_price", "title", "SKU", "timestamp").show(truncate=False)
+--------------+---------------+--------+----------+-----------------------+-----+-------------------+
|country       |invoice_no     |quantity|unit_price|title                  |SKU  |timestamp          |
+--------------+---------------+--------+----------+-----------------------+-----+-------------------+
|United Kingdom|154132541847735|2       |2.46      |EGG CUP MILKMAID HELGA |23565|2020-11-02 20:56:01|
+--------------+---------------+--------+----------+-----------------------+-----+-------------------+
