我正在pyspark中编写一个spark结构化流应用程序,以便在合流云中从kafka读取数据。spark的文档 readstream()
函数太浅,没有在可选参数部分特别是auth机制部分指定太多。我不确定是哪个参数出错并导致连接崩溃。谁能有Spark的经验,帮助我开始这个连接?
必需参数
> Consumer({'bootstrap.servers':
> 'cluster.gcp.confluent.cloud:9092',
> 'sasl.username':'xxx',
> 'sasl.password': 'xxx',
> 'sasl.mechanisms': 'PLAIN',
> 'security.protocol': 'SASL_SSL',
> 'group.id': 'python_example_group_1',
> 'auto.offset.reset': 'earliest' })
这是我的Pypark代码:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "cluster.gcp.confluent.cloud:9092") \
.option("subscribe", "test-topic") \
.option("kafka.sasl.mechanisms", "PLAIN")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.username","xxx")\
.option("kafka.sasl.password", "xxx")\
.option("startingOffsets", "latest")\
.option("kafka.group.id", "python_example_group_1")\
.load()
display(df)
但是,我不断得到一个错误:
kafkashaded.org.apache.kafka.common.kafkaexception:构造kafka使用者失败
databrick笔记本-用于测试
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4673082066872014/3543014086288496/1802788104169533/latest.html
文档
https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/structured-streaming-kafka-integration.html
2条答案
按热度按时间zour9fqk1#
此错误表示您的kafka使用者看不到jaas配置。要解决此问题,请包括基于以下步骤的jass:
step01:为下面的jaas文件创建一个文件:/home/jass/path
step02:根据下面的conf参数调用spark submit中的jass文件路径。
完整提交命令:
Pypark结构化流示例代码:
gdrx4gfi2#
我们需要详细说明
kafka.sasl.jaas.config
为合流kafka sasl ssl auth方法添加用户名和密码。它的参数看起来有点奇怪,但它在工作。