- 从左侧数据库加载的 Dataframe =使用
DataFrameReader
从第一个数据库(例如左侧数据库**)加载的数据。
- 从左侧数据库加载的 Dataframe =使用
我需要
- 迭代通过该 Dataframe 中每一行,
- 连接到第二个数据库如RightDB,
- 从RightDB中找到某个匹配记录,
- 做一些商业逻辑
这是一个迭代操作,因此它不是简单地使用LeftDB和RightDB之间的JOIN来查找一些新字段、创建新 Dataframe targetDF并使用DataframeWriter
写入第三个数据库(例如ThirdDB)即可实现的
我知道我可以用
val targetDF = DataFrameLoadedFromLeftDatabase.mapPartitions(
partition => {
val rightDBconnection = new DbConnection // establish a connection to RightDB
val result = partition.map(record => {
readMatchingFromRightDBandDoBusinessLogicTransformationAndReturnAList(record, rightDBconnection)
}).toList
rightDBconnection.close()
result.iterator
}
).toDF()
targetDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "table3")
.option("user", "username")
.option("password", "password")
.save()
1.我想知道apache spark是否适合这些类型的聊天数据处理应用程序
1.我想知道,在这种方法中,通过RightDB中的每条记录进行交互是否过于繁琐
1.我期待着一些建议来改进这个设计,以利用Spark的能力。我也想确保处理不会导致太多的 Shuffle 操作的性能原因
参考:Related SO Post
2条答案
按热度按时间ltskdhd11#
可以想到很多改进,但总的来说,所有这些改进都将取决于在HDFS、HBase、Hive数据库、MongoDB等中预先分发数据。
我的意思是:您正在考虑“关系数据与分布式处理思维模式”......我认为我们已经超越了XD
jhdbpxl92#
在这种情况下,我们总是倾向于
spark.sql
。基本上,定义两个不同的DF,并基于查询将它们连接起来,然后您可以应用您的业务逻辑。例如:
如果您的表很大,您可以执行查询并直接读取其结果。如果这样做,您可以通过按日期等进行筛选来减少输入大小:
除此之外,很难通过spark直接记录特定的操作。你必须像自定义逻辑一样维护你的客户端连接等。spark设计用于读/写大量的数据。它为此创建了管道。简单的操作将是它的开销。总是做你的API调用(或单个DB调用)。如果您在其中使用缓存层,它可以在性能方面挽救生命。始终尝试在您的自定义数据库调用中使用
connection pool
,否则spark将尝试使用不同的连接执行所有的Map操作,这可能会对您的数据库造成压力并导致连接失败。