我一直试图从Kafka的一个主题中阅读流,但没有成功。我的Spark代码是相当香草从原始文件。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("kafka.ssl.truststore.location", "~/.ssl/test.truststore.jks")
.option("kafka.ssl.truststore.password", KAFKA_SSL_TRUSTSTORE_PASSWORD)
.option("kafka.ssl.keystore.location", "~/.ssl/test.keystore.jks")
.option("kafka.ssl.keystore.password", KAFKA_SSL_KEYSTORE_PASSWORD)
.option("subscribe", KAFKA_INPUT_TOPIC)
.option("failOnDataLoss", "true")
.option("startingOffsets","earliest")
.option("maxOffsetsPerTrigger", 1000)
val query = df
.writeStream
.format("console")
.outputMode("append")
.start()
日志并不能说明什么,不管我是否使用truststore和keystore jks文件,它仍然挂起/冻结在同一个地方。
INFO Streaming: query.status: {
"message" : "Initializing sources",
"isDataAvailable" : false,
"isTriggerActive" : false
}
INFO AppInfoParser: Kafka version : 0.10.0.1
INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
INFO MicroBatchExecution: Starting new streaming query.
代理部署在kubernetes pod中,启用了ssl/tls,但是不安全并且没有客户端身份验证。我还尝试使用我自己的信任库和密钥库使用ca根证书,但没有成功。
使用kafka命令行工具(kafkactl)或使用c实现(librdkafka)的其他kafka客户机流式处理相同的主题没有问题。
有什么帮助吗?
暂无答案!
目前还没有任何答案,快来回答吧!