我尝试将数据从一个程序发送到一个安全的kafka集群(ibmcloud-cloud-foundry服务上的eventstreams),然后在我的消费者应用程序(spark streaming)中,我尝试从相同的kafka源读取数据。
这是你的名字 Properties
我在制作人内部设置:
def getProperties: Properties = {
val configs = new Properties()
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-java-console-sample-producer")
configs.put(ProducerConfig.ACKS_CONFIG, "1")
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<url:port for 5 brokers>")
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL")
configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN")
configs.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some apikey here>" + "\";")
configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2")
configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2")
configs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
configs
}
下面是我用来将数据发送到kafka集群的代码:
val producer = new KafkaProducer[String , String](getProperties)
/**Reading the file line by line */
for (line <- file.getLines) {
/**Sending the lines to the $topic inside kafka cluster initialized inside $producer */
val data = new ProducerRecord[String , String](topic , "key" , line)
producer.send(data)
}
我能够确认这确实将数据发送到kafka集群,因为我能够使用ibmcloud提供的grafana度量查看进入集群的数据。
现在在我的spark流代码中,下面是我如何从Kafka源代码中读取的:
val df = spark.readStream
.format("kafka")
.option("subscribe", "raw_weather")
.option("kafka.bootstrap.servers", "<url:port for the same 5 brokers>")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism" , "PLAIN")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<that same password given above>" + "\";")
.option("kafka.ssl.protocol", "TLSv1.2")
.option("kafka.ssl.enabled.protocols", "TLSv1.2")
.option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
.load()
.selectExpr("CAST(value as STRING)")
.as[String]
然后:
val query= df.writeStream
.outputMode(OutputMode.Append())
.foreachBatch((df: DataFrame , id: Long) => {
println(df.count() + " " + id)
})
.trigger(Trigger.ProcessingTime(1))
.start()
query.awaitTermination()
我不知道为什么,但我的spark streaming根本无法从源代码读取数据。当我启动spark流程序时,它在输出中显示:
19/05/19 04:22:28 DEBUG SparkEnv: Using serializer: class org.apache.spark.serializer.JavaSerializer
19/05/19 04:22:28 INFO SparkEnv: Registering MapOutputTracker
19/05/19 04:22:28 INFO SparkEnv: Registering BlockManagerMaster
19/05/19 04:22:28 INFO SparkEnv: Registering OutputCommitCoordinator
0 0
有一次,当我再次运行我的制作人,Spark流仍然在那里 0 0
. 我不确定我在这里写错了什么。
编辑:保持消费者运行超过7小时,仍然没有变化
暂无答案!
目前还没有任何答案,快来回答吧!