spark structured streaming foreach sink自定义编写器无法从kafka主题读取数据

monwx1rj  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(364)

我有一份工作要读Kafka的主题。但是,在subscribe to主题上,作业不是将数据写入控制台,也不是使用foreach writer将数据转储到数据库。
我有课 DBWriter extends ForeachWriter<Row> 仍然是 open, process, close 从未调用此类的方法。
如果你需要更多的信息,请告诉我。
按照spark kafka集成指南的说明进行操作。它仍然不起作用。
spark版本2.3.1Kafka0.10.0

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.3.1</version>
</dependency>

我的代码是:

spark.readStream().format("kafka").option.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")   
  .option("subscribe", "TOPIC1")    
  .option("startingOffsets", "latest") // read data from the end of the stream
  .load()

以及

Dataset<Row> selectDf = dataframe.select(dataframe.col("key")
  .cast("string"),org.apache.spark.sql.functions.from_json(dataframe.col("value")
  .cast("string"), schema).alias("data"));

selectDf.writeStream()
  .trigger(Trigger.ProcessingTime(1000))
  .foreach(new DBWriterSink())
  .option("checkpointLocation","/tmp/chp_path/")

输入数据的格式如下:
数据采用json格式:

{"input_source_data": 
    { "key1":"value1", 
    "key2": "value2"
     } 
    }
lmyy7pcs

lmyy7pcs1#

实际问题是由于Kafka配置设置不正确。主题订阅不成功,握手失败。正确修正Kafka属性后。为了能够读取数据,它额外设置了这些属性。取出后,它开始工作。能够阅读消息并看到foreachwriter也被调用。 properties.put("security.protocol", "SSL");

相关问题