pyspark 如何将Confluent Cloud连接到数据砖

amrnrhlw  于 2023-08-02  发布在  Spark
关注(0)|答案(1)|浏览(160)

我想知道如何将Confluent Cloud连接到数据块。我想从confluent读取数据到spark dataframe。
我使用了这个代码:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", confluentBootstrapserver) \
  .option("kafka.security.protocol", "SSL") \
  .option("subscribe", confluentTopic) \
  .option("startingOffsets", "earliest") \
  .option("kafka.sasl.jaas.config",
  "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required
   username\"**********\" password = \"******************************\";").load()

字符串
我在用户名中使用了API密钥,在密码中使用了密钥。并在confluentTopic中提供了主题名称。
我得到各种类型的错误“java.util.concurrent.ExecutionException:kafkashaded.org.apache.kafka.common.errors.TimeoutException:等待节点分配超时。调用:describeTopics”。在此之前,我得到消费者不能创建。我是新来的,所以请详细说明你的答案。

nkkqxpd9

nkkqxpd91#

您可以使用下面的代码块。

from pyspark.sql.functions import *
from pyspark.sql.types import *
confluentBootstrapserver = "your_bootstrao_server"
confluentTopic = "topic_0"
apiKey="api_key"
apiSecret="api_secret"

字符串
在集群选项卡中创建新密钥,如下所示。


的数据
复制并保存API密钥和密码。
接下来,您需要在kafka.sasl.jaas.config spark选项中给予这些密钥和秘密,而不是像下面这样提供用户名和密码。我认为你给了同样的,它在我的环境中工作。

df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", confluentBootstrapserver) \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{apiKey}" password="{apiSecret}";') \
        .option("subscribe", confluentTopic) \
        .option("startingOffsets", "earliest") \
        .load()


接下来,创建一个函数来从二进制类型中获取结果并执行udf寄存器。

def conv_bin(x):
    import base64
    import json
    j = json.loads(base64.b64decode(str(x)).decode('utf-8'))
    return j
    
get_string = udf(conv_bin, StringType())


显示结果。

display(df.withColumn("key",get_string(base64(col("key")))).withColumn("result",get_string(base64(col("value")))).select("key","result","topic"))


输出量:



在Spark



检查您是否提供了正确的端口9092,可能是您的批处理数据太大,或者从confluent访问数据块可能有问题。

相关问题