spark streaming jdbc在数据到达时读取流-数据源jdbc不支持流式读取

rsl1atfo  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(614)

我使用postgre作为数据库。我想为每个批捕获一个表数据,并将其转换为parquet文件并存储到s3中。我尝试使用spark和readstream的jdbc选项进行连接,如下所示。。。

val jdbcDF = spark.readStream
    .format("jdbc")
    .option("url", "jdbc:postgresql://myserver:5432/mydatabase")
    .option("dbtable", "database.schema.table")
    .option("user", "xxxxx")
    .option("password", "xxxxx")
    .load()

但它抛出了不受支持的例外

Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
    at examples.SparkJDBCStreaming$.delayedEndpoint$examples$SparkJDBCStreaming$1(SparkJDBCStreaming.scala:16)
    at examples.SparkJDBCStreaming$delayedInit$body.apply(SparkJDBCStreaming.scala:5)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)

我走对了吗?真的不支持数据库作为spark流的数据源吗?
另一种方法是编写一个Kafka制作者将数据发布到Kafka主题中,然后使用spark流。。。
注意:我不想使用Kafka连接,因为我需要做一些辅助转换。
这是唯一的办法吗?
正确的方法是什么?这种事有什么例子吗?请协助!

mznpcxlj

mznpcxlj1#

spark structured streaming没有标准的jdbc源代码,但是您可以编写一个自定义的源代码,但是您应该了解您的表必须有一个唯一的键,您可以通过它来跟踪更改。例如,你可以拿我的实现来说,别忘了给依赖项添加必要的jdbc驱动程序

pzfprimi

pzfprimi2#

这个库可能有帮助:jdbc2s。
它提供了jdbc流功能,并且构建在sparkjdbc批处理源代码之上。
基本上,您可以像使用任何其他流媒体源一样使用它,唯一的强制配置是正在使用的表中偏移量列的名称。

相关问题