DataFrame from Kafka streaming data in PySpark

zbq4xfa0  posted on 2021-07-14 in Spark

Is it possible to "store" or process the streaming output as a DataFrame, in order to create live analytics/summaries of the data?
I have the following code to capture data from a Kafka consumer:

import findspark
findspark.init("/home/apps/spark")

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Stop any SparkContext left over from a previous session before creating a new one
SparkContext.getOrCreate().stop()
sc = SparkContext(appName="KafkaStreaming-0")
ssc = StreamingContext(sc, 5)  # 5-second batch interval

kafkaParams = {"metadata.broker.list": "localhost:9090"}
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["MyTopic"], kafkaParams)

def update_func(new_val, last_sum):
    # Running total per key across all batches seen so far
    return sum(new_val) + (last_sum or 0)

# updateStateByKey requires a checkpoint directory
checkpointDir = "file:///home/spark/checkpoint"
ssc.checkpoint(checkpointDir)

lines = directKafkaStream.map(lambda x: x[1])  # keep only the message value
counts = lines.flatMap(lambda line: line.split("\t")) \
              .map(lambda word: (word, 1)) \
              .updateStateByKey(update_func)

counts.pprint()
ssc.start()
ssc.awaitTermination()  # keep the streaming application running

This returns the following:

-------------------------------------------
Time: 2021-04-17 15:47:10
-------------------------------------------
('551GEF,Category_A', 1)
('558PSX,Category_B', 1)
('512SED,Category_B', 1)
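
These are (key, running-count) pairs emitted for each micro-batch. One way to work with them as DataFrames is to convert each batch inside foreachRDD. The following is a minimal sketch, assuming Spark 2.x with the DStream API; the SparkSession, the column names and the to_dataframe helper are illustrative rather than part of the original code, and the call has to be registered on counts before ssc.start():

from pyspark.sql import SparkSession

# Build a SparkSession on top of the SparkContext created above
spark = SparkSession.builder.getOrCreate()

def to_dataframe(rdd):
    # Each element is a (key, running_count) tuple produced by updateStateByKey
    if not rdd.isEmpty():
        df = spark.createDataFrame(rdd, ["Key", "Total_Count"])
        df.show(truncate=False)
        # The DataFrame could also be persisted here (e.g. appended to Parquet)
        # to build up a queryable history of the stream.

counts.foreachRDD(to_dataframe)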

I would like to create a count summary grouped by "Category":

+-----------+------------+
|Category   |Total_Count |
+-----------+------------+
|Category_A |   1        |
+-----------+------------+
|Category_B |   2        |
+-----------+------------+
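
One way to get this summary from the existing pipeline is to pull the category out of each record before counting, so that the running state is keyed by the category rather than by the full '551GEF,Category_A' string. A minimal sketch, assuming each tab-separated record has the form ID,Category (the extract_category helper is illustrative and, like the sketch above, must be wired up before ssc.start()):

def extract_category(record):
    # "551GEF,Category_A" -> ("Category_A", 1)
    return (record.split(",")[1], 1)

category_counts = lines.flatMap(lambda line: line.split("\t")) \
                       .map(extract_category) \
                       .updateStateByKey(update_func)  # running total per category

category_counts.pprint()  # e.g. ('Category_A', 1), ('Category_B', 2)

The same foreachRDD pattern sketched above can then turn each batch of (Category, Total_Count) pairs into the two-column DataFrame shown in the desired summary.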
