我想知道如何将Confluent Cloud连接到数据块。我想从confluent读取数据到spark dataframe。
我使用了这个代码:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", confluentBootstrapserver) \
.option("kafka.security.protocol", "SSL") \
.option("subscribe", confluentTopic) \
.option("startingOffsets", "earliest") \
.option("kafka.sasl.jaas.config",
"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required
username\"**********\" password = \"******************************\";").load()
字符串
我在用户名中使用了API密钥,在密码中使用了密钥。并在confluentTopic中提供了主题名称。
我得到各种类型的错误“java.util.concurrent.ExecutionException:kafkashaded.org.apache.kafka.common.errors.TimeoutException:等待节点分配超时。调用:describeTopics”。在此之前,我得到消费者不能创建。我是新来的,所以请详细说明你的答案。
1条答案
按热度按时间nkkqxpd91#
您可以使用下面的代码块。
字符串
在集群选项卡中创建新密钥,如下所示。
的数据
复制并保存API密钥和密码。
接下来,您需要在
kafka.sasl.jaas.config
spark选项中给予这些密钥和秘密,而不是像下面这样提供用户名和密码。我认为你给了同样的,它在我的环境中工作。型
接下来,创建一个函数来从二进制类型中获取结果并执行udf寄存器。
型
显示结果。
型
输出量:
的
在Spark
的
检查您是否提供了正确的端口
9092
,可能是您的批处理数据太大,或者从confluent访问数据块可能有问题。