我使用postgre作为数据库。我想为每个批捕获一个表数据,并将其转换为parquet文件并存储到s3中。我尝试使用spark和readstream的jdbc选项进行连接,如下所示。。。
val jdbcDF = spark.readStream
.format("jdbc")
.option("url", "jdbc:postgresql://myserver:5432/mydatabase")
.option("dbtable", "database.schema.table")
.option("user", "xxxxx")
.option("password", "xxxxx")
.load()
但它抛出了不受支持的例外
Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at examples.SparkJDBCStreaming$.delayedEndpoint$examples$SparkJDBCStreaming$1(SparkJDBCStreaming.scala:16)
at examples.SparkJDBCStreaming$delayedInit$body.apply(SparkJDBCStreaming.scala:5)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
我走对了吗?真的不支持数据库作为spark流的数据源吗?
另一种方法是编写一个Kafka制作者将数据发布到Kafka主题中,然后使用spark流。。。
注意:我不想使用Kafka连接,因为我需要做一些辅助转换。
这是唯一的办法吗?
正确的方法是什么?这种事有什么例子吗?请协助!
2条答案
按热度按时间mznpcxlj1#
spark structured streaming没有标准的jdbc源代码,但是您可以编写一个自定义的源代码,但是您应该了解您的表必须有一个唯一的键,您可以通过它来跟踪更改。例如,你可以拿我的实现来说,别忘了给依赖项添加必要的jdbc驱动程序
pzfprimi2#
这个库可能有帮助:jdbc2s。
它提供了jdbc流功能,并且构建在sparkjdbc批处理源代码之上。
基本上,您可以像使用任何其他流媒体源一样使用它,唯一的强制配置是正在使用的表中偏移量列的名称。