在线上有很多从kafka读取json的例子(写入parquet),但我不知道如何将模式应用于kafka的csv字符串。
流数据:
customer_1945,cusaccid_995,27999941
customer_1459,cusaccid_1102,27999942
架构:
schema = StructType() \
.add("customer_id",StringType()) \
.add("customer_acct_id",StringType()) \
.add("serv_acct_id",StringType())
读取流:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") \
.option("subscribe", "test") \
.load()
我用这个来表示json:
interval=df \
.select(from_json(col("value").cast("string"), schema).alias("json")) \
.select("json.*")
在使用指定的模式将其写入parquet之前:
query=interval \
.writeStream \
.format("parquet") \
.option("checkpointLocation", "/user/whatever/checkpoint24") \
.start("/user/ehatever/interval24")
因为我不能将from_json()用于csv-我不知道如何将模式应用于Dataframe,以便使用类似的writestream()命令。
1条答案
按热度按时间gblwokeq1#
我就是这样做的。不使用from\u json,提取csv字符串:
然后把它分成几列。可以使用上面相同的语句将其写入Parquet文件