我正在尝试读取Kafka的数据,并使用spark将其上传到greenplum数据库。我使用的是greenplum spark Connector,但获取的数据源io.pivotal.greenplum.spark.greenplumrelationprovider不支持流式写入。是因为greenplum源不支持流数据吗?我可以在网站上看到“连续etl管道(流)”。
我已经尝试将datasource作为“greenplum”和“io.pivotal.greenplum.spark.greenplumrelationprovider”转换为.format(“datasource”)
val EventStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", args(0))
.option("subscribe", args(1))
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load
val gscWriteOptionMap = Map(
"url" -> "link for greenplum",
"user" -> "****",
"password" -> "****",
"dbschema" -> "dbname"
)
val stateEventDS = EventStream
.selectExpr("CAST(key AS String)", "*****(value)")
.as[(String,******)]
.map(_._2)
val EventOutputStream = stateEventDS.writeStream
.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
.options(gscWriteOptionMap)
.start()
assetEventOutputStream.awaitTermination()
2条答案
按热度按时间x9ybnkn61#
greenplum spark结构化流媒体
演示如何使用jdbc将writeStreamAPI与gpdb一起使用
下面的代码块使用速率流源读取数据,并使用基于jdbc的接收器将数据分批流到gpdb
基于批处理的流媒体
基于记录的流媒体
这要用到作者
soat7uwm2#
您使用的是什么版本的gpdb/spark?你可以绕过Spark而选择绿色李子Kafka连接器。
https://gpdb.docs.pivotal.io/5170/greenplum-kafka/overview.html
在早期版本中,greenplum spark连接器公开了一个名为io.pivotal.greenplum.spark.greenplumrelationprovider的spark数据源,以将greenplum数据库中的数据读取到sparkDataframe中。
在以后的版本中,连接器公开了一个名为greenplum的spark数据源,用于在spark和greenplum数据库之间传输数据。
应该是这样的--
val eventoutputstream=stateeventds.write.format(“greenplum”).options(gscwriteoptionmap).save()
请参见:https://greenplum-spark.docs.pivotal.io/160/write_to_gpdb.html