如何使用kafka为azure eventhub格式化pyspark连接字符串

1sbrub3j  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(567)

我正在尝试使用pyspark从启用了kafka兼容性的azure eventhub解析json消息。我找不到任何关于如何建立连接的文档。

import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

sc.stop() # Jupyter somehow created a context already.. 
sc = SparkContext(appName="PythonTest")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)

# my connection string:

# Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=examplekeyname;SharedAccessKey=HERETHEJEY=;EntityPath=examplepathname - has a total of 5 partitions

kafkaStream = KafkaUtils.createStream(HOW DO I STRUCTURE THIS??)
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()
ssc.start()
ssc.awaitTermination()
gv8xihay

gv8xihay1#

请看我的答案(和问题)。这是关于如何在pyspark中写入支持kafka的事件中心的,但我假设读取配置应该非常类似。最棘手的部分是正确的安全配置。

EH_SASL = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'
// Source: https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/spark#running-spark

dfKafka \
.write  \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()

你可以在这里找到任何关于如何设置消费者的官方教程。它是针对scala而不是pyspark的,但是如果您将其与我的示例进行比较,那么转换代码就相当容易了。

相关问题