我有一份工作要读Kafka的主题。但是,在subscribe to主题上,作业不是将数据写入控制台,也不是使用foreach writer将数据转储到数据库。
我有课 DBWriter extends ForeachWriter<Row>
仍然是 open, process, close
从未调用此类的方法。
如果你需要更多的信息,请告诉我。
按照spark kafka集成指南的说明进行操作。它仍然不起作用。
spark版本2.3.1Kafka0.10.0
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
我的代码是:
spark.readStream().format("kafka").option.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")
.option("subscribe", "TOPIC1")
.option("startingOffsets", "latest") // read data from the end of the stream
.load()
以及
Dataset<Row> selectDf = dataframe.select(dataframe.col("key")
.cast("string"),org.apache.spark.sql.functions.from_json(dataframe.col("value")
.cast("string"), schema).alias("data"));
selectDf.writeStream()
.trigger(Trigger.ProcessingTime(1000))
.foreach(new DBWriterSink())
.option("checkpointLocation","/tmp/chp_path/")
输入数据的格式如下:
数据采用json格式:
{"input_source_data":
{ "key1":"value1",
"key2": "value2"
}
}
1条答案
按热度按时间lmyy7pcs1#
实际问题是由于Kafka配置设置不正确。主题订阅不成功,握手失败。正确修正Kafka属性后。为了能够读取数据,它额外设置了这些属性。取出后,它开始工作。能够阅读消息并看到foreachwriter也被调用。
properties.put("security.protocol", "SSL");