我正在尝试使用pyspark从启用了kafka兼容性的azure eventhub解析json消息。我找不到任何关于如何建立连接的文档。
import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
sc.stop() # Jupyter somehow created a context already..
sc = SparkContext(appName="PythonTest")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
# my connection string:
# Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=examplekeyname;SharedAccessKey=HERETHEJEY=;EntityPath=examplepathname - has a total of 5 partitions
kafkaStream = KafkaUtils.createStream(HOW DO I STRUCTURE THIS??)
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()
ssc.start()
ssc.awaitTermination()
1条答案
按热度按时间gv8xihay1#
请看我的答案(和问题)。这是关于如何在pyspark中写入支持kafka的事件中心的,但我假设读取配置应该非常类似。最棘手的部分是正确的安全配置。
你可以在这里找到任何关于如何设置消费者的官方教程。它是针对scala而不是pyspark的,但是如果您将其与我的示例进行比较,那么转换代码就相当容易了。