scala 我怎样才能连接到jdbc作为数据库中的流源

yzuktlbb  于 2023-03-02  发布在  Scala
关注(0)|答案(1)|浏览(167)

使用https://github.com/sutugin/spark-streaming-jdbc-source中的示例,我尝试连接到Postgres数据库作为AWS数据库中的流源。
我的群集正在运行:11.3 LTS(包括Apache Spark 3.3.0和Scala 2.12)
此库安装在我的群集上:apache. spark网站:Spark流_2.12:3.3.2

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder
.appName("StructuredJDBC")
.getOrCreate()

import spark.implicits._

val jdbcOptions = Map(
"user" -> "myusername",
"password" -> "mypassword",
"database" -> "testDB",
"driver" -> "org.postgresql.Driver",
"url" -> "jdbc:postgresql://dbhostname:5432:mem:myDb;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false"
)

// Create DataFrame representing the stream of input lines from jdbc
val stream = spark.readStream
.format("jdbc-streaming")
.options(jdbcOptions + ("dbtable" -> "dimensions_test_table") + ("offsetColumn" -> "loaded_timestamp"))
.load

// Start running the query that prints 'select result' to the console
val query = stream.writeStream
.outputMode("append")
.format("console")
.start()

query.awaitTermination()

但我被一个错误所困扰:未找到类定义错误:org/apache/spark/sql/源代码/v2/流写入支持原因:未找到类异常:org.apache.spark.sql.sources.v2.StreamWriteSupport
我能找到的有关此错误的唯一信息似乎不适用于我的情况。我错过了什么?
我也找过其他库,但这似乎是Scala 2.12上唯一支持jdbc作为源代码的库。

72qzrwbm

72qzrwbm1#

这里有几个问题:

  • 您不需要在Databricks集群上安装org.apache.spark:spark-streaming_2.12:3.3.2库。Databricks运行时包括所有必需的Spark库,并且通过安装开源版本,您很可能会破坏特定于Databricks的修改。
  • 要使用这个库,你需要自己编译它并安装到集群上,但正如我所看到的,它有4年没有更新了,默认情况下,它是为Spark 3.0编译的(与DBR7.3匹配)。

如果要从数据库获取更改,可以查看更改数据捕获功能,例如CDC for RDS MySQL。然后,可以将数据放置到S3,并使用Delta Live Tables implementing CDC pattern进行拾取。

相关问题