scala从oracle表中获取详细信息并在配置单元表中启动查询

j8ag8udp  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(356)

我正在尝试连接到oracle数据库,为此我必须使用纯scala连接,而不是spark连接。所以我为oracle数据库编写了连接代码。
现在最头疼的是我在oracle表中有一个列,它为每一行编写了一个select查询(表中有元数据)。我需要获取写在每一列中的查询并激发它,它将位于hive表中,并将查询结果存储在dataframe中。我不知道用什么方法来解决上述问题。
oracle表数据

我可以使用纯scala连接连接到oracle表,我需要获取查询列数据并启动它。我需要将查询结果存储在dataframe中以便进一步处理。
连接code:-

object ScalaJdbcConnectSelect {

  def main(args: Array[String]) {
    // connect to the database named "mysql" on the localhost
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost/mysql"
    val username = "root"
    val password = "root"

    var connection:Connection = null

    try {
      // make the connection
      Class.`enter code here`forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      // create the statement, and run the select query
      val statement = connection.createStatement()
      val resultSet = statement.executeQuery(query)`
n1bvdmb6

n1bvdmb61#

您可以从spark连接到oracle,您应该这样做,因为否则您必须在使用spark读取数据之前,按顺序将所有数据带到某个中间存储中,这与多个数据库连接并行检索数据相比是相当浪费的。

// Set the variables server, port, service
val url = s"jdbc:oracle:thin:@$server:$port:$service"

// Add odbc6.jar via --driver-class-path and --jars during spark-shell/submit 
val reader = spark.read.format("jdbc")
  .option("url", url)
  .option("user", user)
  .option("password", password)
  .option("driver", "oracle.jdbc.driver.OracleDriver")

// Note the use of partitionColumn is necessary to create multiple connections 
// from the workers
val df = reader.option("dbtable", "db.table")
               .option("partitionColumn", "col1")
               .load
val dfWithQuery = reader.option("dbtable", "(SELECT a, b, c FROM t1) AS tbl1")
                  .option("partitionColumn", "a")
                  .load

你得到你的结果了吗 df 首先,然后可以收集查询列,循环遍历并将收集的查询添加到 reader 创建新的 DataFrame .
如果你坚持使用你的方法,那么你已经有了一个 ResultSet 2, . 只需在遍历行时获取列,然后将查询用于 Connection .

// ^ Your code above, Connection already exists
while (resultSet.next()){
  try {
    val queryValue = resultSet.getString("query")
    val queryResultSet = statement.executeQuery(queryValue)
    while (queryResultSet){
      // Do stuff with your newly queried ResultSet
    }
  }
}

你可以看到 try catch 不是很漂亮。
如果需要,可以创建 Dataset 在外面 ResultSet ,但请注意,这需要加载整个 RestulSet 如果数据太大,你可能会用完。

def resultSetToSpark[T](rs: ResultSet, f: ResultSet => T, 
                        spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
  val data: Seq[T] = Iterator.continually(rs.next, rs)
    .takeWhile(_._1).map{
      case (_,rs) => f(rs)
    }.toList

  spark.createDataset(data)(encoder)
}

注意,您必须提供一个从 ResultSet 并示例化 class 以及类的编码器。下面是一个例子。

case class Potato(a: String, b: String, c: String)

def parseResultSet(rs: ResultSet): Potato = Potato(
 rs.getString("a"), rs.getString("b"), rs.getString("c")
)

import org.apache.spark.sql.{Encoder, Encoders}
val encoder: Encoder[Potato] =  Encoders.product[Potato]

// Use like this
val dfFromRS = resultSetToSpark(resultSet, parseResultSet, spark, encoder)

所以最好的方法就是使用spark的 DataFrameReader 并让spark workers与数据库建立多个连接,但这实际上只在有问题的数据可能导致驱动程序内存不足时才起作用。
您总是可以得到这样的所有查询的列表。

case class Query(statement: String)
implicit val queryEncoder: Encoder[Query] = Encoders.product[Query]
val queryDs = resultSetToSpark(rs)(rs => Message(rs.getString("query")))(spark, queryEncoder)
val queryList = queryDs.collect.toList
val df1 = spark.sql(queryList.head) // get a DataFrame from the first query in the list

相关问题