如何使用Azure Event Hub将消息从数据块流式传输到Kafka客户端[已关闭]

2lpgd968  于 2023-01-29  发布在  Apache
关注(0)|答案(1)|浏览(162)

已关闭。此问题需要超过focused。当前不接受答案。
**想要改进此问题吗?**更新此问题,使其仅关注editing this post的一个问题。

9小时前关门了。
Improve this question
我有一个从Kafka队列读取数据并写入DWH的进程。Kafka队列当前正在从Java应用程序接收数据,该应用程序从本地存储读取数据并写入Kafka队列。
我们需要实施以下内容:
1.使用Azure存储帐户替换本地存储(完成)
1.用Azure事件中心替换Kafka队列
1.用Databricks简单作业替换Java应用程序,该作业使用Autloader从Azure DataLake执行readStream,并写入Azure事件中心
约束:不能更改Kafka客户端使用者,而不能更改其连接字符串。
现在,好消息是Azure Event Hubs符合Kafka标准(假设每条消息的json主体小于10Kb),因此我的问题是如何配置此架构。

  • A)应如何配置Azure EH以使其符合其消费者的Kafka标准?
  • B)我应该使用Kafka协议也从Databricks发送消息,或者我可以使用它作为Azure事件中心信任的事实,它暴露自己与Kafka接口的消费者,并与事件中心接口的发送者?
  • C)我可以在哪里从消费者那里获取要使用的Kafka端点,除了新的连接字符串之外,我还应该注意什么?在侦听策略中,我找到了主键、连接字符串和SAS策略ARM ID,但我不确定如何将它们转换为Kafka端点
e5nqia27

e5nqia271#

要使用Kafka协议中的EventHubs,您只需正确配置Kafka选项。您需要:

  • 我们需要获取Shared Access Signatures (SAS)以向事件中心主题进行身份验证-它应类似于Endpoint=sb://<....>.windows.net/;?...,并将用作密码。出于安全原因,建议将其放入Databricks机密作用域(使用您的实际值更新变量secret_scopesecret_name)。
  • 我们需要构造正确的字符串(eh_sasl变量)用于SASL(简单身份验证和安全层)身份验证-我们使用静态值$ConnectionString作为用户名,和事件中心SAS用作密码。SASL字符串在数据块上看起来有点不同-它应该以kafkashaded.作为前缀,而不是org.apache.kafka.common.security.plain.PlainLoginModule...,因为原始Java包被着色以避免与其他包冲突。
  • 您需要提供事件中心名称空间&主题的名称,以便在eh_namespace_nametopic_name变量中从中读取数据。
secret_scope = "scope"
secret_name = "eventhub_sas"
topic_name = "topic1"
eh_namespace_name = "<eh-ns-name>"
readConnectionString = dbutils.secrets.get(secret_scope, secret_name)
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
    + f' required username="$ConnectionString" password="{readConnectionString}";'
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
     "kafka.bootstrap.servers": bootstrap_servers,
     "kafka.sasl.mechanism": "PLAIN",
     "kafka.security.protocol": "SASL_SSL",
     "kafka.request.timeout.ms": "60000",
     "kafka.session.timeout.ms": "30000",
     "startingOffsets": "earliest",
     "kafka.sasl.jaas.config": eh_sasl,
     "subscribe": topic_name,
}
df = spark.readStream.format("kafka") \ 
    .options(**kafka_options).load()

使用类似的配置完成写入。

from pyspark.sql.functions import struct, to_json

# work with your dataframe
kafka_options = {
     "kafka.bootstrap.servers": bootstrap_servers,
     "kafka.sasl.mechanism": "PLAIN",
     "kafka.security.protocol": "SASL_SSL",
     "kafka.sasl.jaas.config": eh_sasl,
     "topic": topic_name,
}
df.select(to_json(struct("*")).alias("value")) \
  .write.format("kafka").options(**kafka_options).save()

有关Spark + Kafka的更多详细信息,请参阅SparkDatabricks文档。

相关问题