Spark Structured Streaming connectivity problem with Confluent Cloud Kafka

w8f9ii69  posted 2021-05-18  in Spark

I am writing a Spark Structured Streaming application in PySpark to read data from Kafka in Confluent Cloud. Spark's documentation for the readStream() function is shallow and doesn't specify much in the optional-parameters section, especially around the auth mechanism. I'm not sure which parameter is wrong and causes the connection to fail. Can anyone with Spark experience help me get this connection working?

Required parameters (a working confluent-kafka Python Consumer config):

> Consumer({
>     'bootstrap.servers': 'cluster.gcp.confluent.cloud:9092',
>     'sasl.username': 'xxx',
>     'sasl.password': 'xxx',
>     'sasl.mechanisms': 'PLAIN',
>     'security.protocol': 'SASL_SSL',
>     'group.id': 'python_example_group_1',
>     'auto.offset.reset': 'earliest'
> })
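For reference, these consumer properties can be mapped mechanically onto the `kafka.`-prefixed options that Spark's Kafka source expects. The following is a minimal sketch (the helper function is illustrative, not a real API): note that the Kafka Java client used by Spark takes the singular `sasl.mechanism`, and that username/password are not standalone options but are folded into `sasl.jaas.config`.

```python
consumer_conf = {
    "bootstrap.servers": "cluster.gcp.confluent.cloud:9092",
    "sasl.username": "xxx",
    "sasl.password": "xxx",
    "sasl.mechanisms": "PLAIN",
    "security.protocol": "SASL_SSL",
}

def to_spark_kafka_options(conf):
    """Translate confluent-kafka (librdkafka) consumer properties into
    the 'kafka.'-prefixed options Spark's Kafka source expects."""
    # username/password are expressed through sasl.jaas.config in the
    # Java client (on Databricks the module is shaded under 'kafkashaded.')
    jaas = (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="{u}" password="{p}";'
    ).format(u=conf["sasl.username"], p=conf["sasl.password"])
    return {
        "kafka.bootstrap.servers": conf["bootstrap.servers"],
        "kafka.security.protocol": conf["security.protocol"],
        # librdkafka spells it 'sasl.mechanisms'; the Java client uses
        # the singular 'sasl.mechanism'
        "kafka.sasl.mechanism": conf["sasl.mechanisms"],
        "kafka.sasl.jaas.config": jaas,
    }
```

Each key/value pair from the returned dict would then be passed to readStream via `.option(key, value)`.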

Here is my PySpark code:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "cluster.gcp.confluent.cloud:9092") \
  .option("subscribe", "test-topic") \
  .option("kafka.sasl.mechanisms", "PLAIN")\
  .option("kafka.security.protocol", "SASL_SSL")\
  .option("kafka.sasl.username","xxx")\
  .option("kafka.sasl.password", "xxx")\
  .option("startingOffsets", "latest")\
  .option("kafka.group.id", "python_example_group_1")\
  .load()
display(df)

However, I keep getting an error:

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Databricks notebook used for testing:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4673082066872014/3543014086288496/1802788104169533/latest.html
Documentation:
https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/structured-streaming-kafka-integration.html

zour9fqk1#

This error indicates that your Kafka consumer cannot see a JAAS configuration. To resolve it, supply the JAAS file following the steps below:
Step 1: Create a file for the JAAS configuration below at a path such as /home/jass/path:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=true
    serviceName="kafka";
};
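Note that the KafkaClient section above uses Kerberos (Krb5LoginModule). For Confluent Cloud's SASL/PLAIN auth, the JAAS file would instead reference the plain login module, roughly like this (a sketch; substitute your Confluent Cloud API key and secret for the placeholders):

```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xxx"
    password="xxx";
};
```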

Step 2: Reference the JAAS file path in spark-submit via the conf parameter below.

--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/jass/path"

Full submit command:

/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --packages com.databricks:spark-avro_2.11:3.2.0,org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 --conf spark.ui.port=4055 --files /home/jass/path,/home/bdpda/bdpda.headless.keytab --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/jass/path" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/jass/path" pysparkstructurestreaming.py

PySpark Structured Streaming sample code:

from pyspark.sql import SparkSession

# Spark session :

spark = SparkSession.builder.appName('PythonStreamingDirectKafkaWordCount').getOrCreate()

# Kafka Topic Details :

KAFKA_TOPIC_NAME_CONS = "topic_name"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'kafka_server:9093'

# Creating readStream DataFrame :

df = spark.readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
     .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
     .option("startingOffsets", "earliest") \
     .option("kafka.security.protocol", "SASL_SSL") \
     .option("kafka.client.id", "client_id") \
     .option("kafka.sasl.kerberos.service.name", "kafka") \
     .option("kafka.ssl.truststore.location", "/home/path/kafka_trust.jks") \
     .option("kafka.ssl.truststore.password", "password_rd") \
     .option("kafka.sasl.kerberos.keytab", "/home/path.keytab") \
     .option("kafka.sasl.kerberos.principal", "path") \
     .load()

df1 = df.selectExpr("CAST(value AS STRING)")

# Creating writeStream query :

query = df1.writeStream \
   .option("path", "target_directory") \
   .format("csv") \
   .option("checkpointLocation", "chkpint_directory") \
   .outputMode("append") \
   .start()

query.awaitTermination()
gdrx4gfi2#

You need to spell out kafka.sasl.jaas.config to supply the username and password for Confluent Kafka's SASL_SSL auth method. Its parameter value looks a bit odd, but it works.

df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "pkc-43n10.us-central1.gcp.confluent.cloud:9092") \
      .option("subscribe", "wallet_txn_log") \
      .option("startingOffsets", "earliest") \
      .option("kafka.security.protocol", "SASL_SSL") \
      .option("kafka.sasl.mechanism", "PLAIN") \
      .option("kafka.sasl.jaas.config", """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";""") \
      .load()
display(df)
