spark连接两个数据库表以生成第三个数据

xmq68pz9  于 2023-02-16  发布在  Apache
关注(0)|答案(2)|浏览(152)
    • bounty已结束**。此问题的答案可获得+50的信誉奖励。奖励宽限期将在5小时后结束。santhosh正在寻找来自信誉良好来源的答案
    • 从左侧数据库加载的 Dataframe =使用DataFrameReader从第一个数据库(例如左侧数据库**)加载的数据。

我需要

  • 迭代通过该 Dataframe 中每一行,
  • 连接到第二个数据库如RightDB
  • 从RightDB中找到某个匹配记录,
  • 做一些商业逻辑

这是一个迭代操作,因此它不是简单地使用LeftDBRightDB之间的JOIN来查找一些新字段、创建新 Dataframe targetDF并使用DataframeWriter写入第三个数据库(例如ThirdDB)即可实现的
我知道我可以用

val targetDF = DataFrameLoadedFromLeftDatabase.mapPartitions(
  partition => {
    val rightDBconnection = new DbConnection // establish a connection to RightDB 
    val result = partition.map(record => {
    readMatchingFromRightDBandDoBusinessLogicTransformationAndReturnAList(record, rightDBconnection)
  }).toList
    rightDBconnection.close()
    result.iterator
  }
).toDF()
targetDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "table3")
  .option("user", "username")
  .option("password", "password")
  .save()

1.我想知道apache spark是否适合这些类型的聊天数据处理应用程序
1.我想知道,在这种方法中,通过RightDB中的每条记录进行交互是否过于繁琐
1.我期待着一些建议来改进这个设计,以利用Spark的能力。我也想确保处理不会导致太多的 Shuffle 操作的性能原因
参考:Related SO Post

ltskdhd1

ltskdhd11#

可以想到很多改进,但总的来说,所有这些改进都将取决于在HDFS、HBase、Hive数据库、MongoDB等中预先分发数据。
我的意思是:您正在考虑“关系数据与分布式处理思维模式”......我认为我们已经超越了XD

jhdbpxl9

jhdbpxl92#

在这种情况下,我们总是倾向于spark.sql。基本上,定义两个不同的DF,并基于查询将它们连接起来,然后您可以应用您的业务逻辑。
例如:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Add your columns here
case class MyResult(ID: String, NAME: String)

// Create a SparkSession
val spark = SparkSession.builder()
  .appName("Join Tables and Add Prefix to ID Column")
  .config("spark.master", "local[*]")
  .getOrCreate()

// Read the first table from DB1
val firstTable: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB1")
  .option("dbtable", "FIRST_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .load()

firstTable.createOrReplaceTempView("firstTable")

// Read the second table from DB2
val secondTable: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB2")
  .option("dbtable", "SECOND_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .load()

secondTable.createOrReplaceTempView("secondTable")

// Apply you filtering here
val result: DataFrame = spark.sql("SELECT f.*, s.* FROM firstTable as f left join secondTable as s on f.ID = s.ID")

val finalData = result.as[MyResult]
  .map{record=>
    // Do your business logic
    businessLogic(record)
  }

// Write the result to the third table in DB3
finalData.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB3")
  .option("dbtable", "THIRD_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .save()

如果您的表很大,您可以执行查询并直接读取其结果。如果这样做,您可以通过按日期等进行筛选来减少输入大小:

val myQuery = """
  (select * from table
    where // do your filetering here
  ) foo
"""

val df = sqlContext.format("jdbc").
  option("url", "jdbc:postgresql://localhost/DB").
  .option("user", "your_username")
  .option("password", "your_password")
  .option("dbtable", myQuery)
  .load()

除此之外,很难通过spark直接记录特定的操作。你必须像自定义逻辑一样维护你的客户端连接等。spark设计用于读/写大量的数据。它为此创建了管道。简单的操作将是它的开销。总是做你的API调用(或单个DB调用)。如果您在其中使用缓存层,它可以在性能方面挽救生命。始终尝试在您的自定义数据库调用中使用connection pool,否则spark将尝试使用不同的连接执行所有的Map操作,这可能会对您的数据库造成压力并导致连接失败。

相关问题