spark 2.3结构化流媒体,Kafka卡卡在微博客上

7kjnsjlb  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(264)

我一直试图从Kafka的一个主题中阅读流,但没有成功。我的Spark代码是相当香草从原始文件。

val df = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", brokers)
     .option("kafka.security.protocol", "SSL")
     .option("kafka.ssl.endpoint.identification.algorithm", "")
     .option("kafka.ssl.truststore.location", "~/.ssl/test.truststore.jks")
     .option("kafka.ssl.truststore.password", KAFKA_SSL_TRUSTSTORE_PASSWORD)
     .option("kafka.ssl.keystore.location", "~/.ssl/test.keystore.jks")
     .option("kafka.ssl.keystore.password", KAFKA_SSL_KEYSTORE_PASSWORD)
     .option("subscribe", KAFKA_INPUT_TOPIC)
     .option("failOnDataLoss", "true")
     .option("startingOffsets","earliest")
     .option("maxOffsetsPerTrigger", 1000)

   val query = df
     .writeStream
     .format("console")
     .outputMode("append")
     .start()

日志并不能说明什么,不管我是否使用truststore和keystore jks文件,它仍然挂起/冻结在同一个地方。

INFO Streaming: query.status: {
  "message" : "Initializing sources",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
INFO AppInfoParser: Kafka version : 0.10.0.1
INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
INFO MicroBatchExecution: Starting new streaming query.

代理部署在kubernetes pod中,启用了ssl/tls,但是不安全并且没有客户端身份验证。我还尝试使用我自己的信任库和密钥库使用ca根证书,但没有成功。
使用kafka命令行工具(kafkactl)或使用c实现(librdkafka)的其他kafka客户机流式处理相同的主题没有问题。
有什么帮助吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题