spark jdbc

b1payxdu  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(779)

我对java编程是个新手。我有一种从oracle数据库读取数据的方法。现在我需要帮助为下面的代码编写使用junit框架的测试用例。
数据集df=spark.read().format(“jdbc”).jdbc(jdbcurl,dbtable1,connectionproperties);

zf2sa74q

zf2sa74q1#

您可以使用下面的方法,您需要使用h2库来创建一个带有数据库的内存临时服务器,您可以在其中创建带有测试数据的所需表。本例使用的是mssqlserver(mode=mssqlserver),您可以对其进行更改。

<!-- https://mvnrepository.com/artifact/com.h2database/h2 -->
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <version>1.4.200</version>
  <scope>test</scope>
</dependency>

实际方法

def GetJDBCDataframe(spark: SparkSession, jdbcUrl: String, connectionProperties :Properties): DataFrame = {
    val employeesQuery = "(SELECT * FROM Employee) ref_alias"
    val df = spark.read.jdbc(url = jdbcUrl, table = employeesQuery, connectionProperties)
    df
}

测试用例方法:

@Test
def GetJDBCDataframe(): Unit = {

  val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[2]")
      .appName("SampleSparkScalaTest")
      .getOrCreate()
  }
  //H2 library connection for creating a inmemory MSSQL databse
  val url = "jdbc:h2:mem:Mydatabasename;MODE=MSSQLServer"

  val connectionProperties = new Properties()
  connectionProperties.setProperty("user", "sa")
  connectionProperties.setProperty("password", "")

  Class.forName("org.h2.Driver")
  val conn = DriverManager.getConnection(url, connectionProperties)

  //create mock jdbc Employee table (immemory)
  conn.prepareStatement("create table Employee(EmpId int, FirstName varchar, LastName varchar)").executeUpdate()
  conn.prepareStatement("insert into Employee values (1, 'Brain', 'Lara')").executeUpdate()
  conn.prepareStatement("insert into Employee values (2, 'Virat', 'Kholi')").executeUpdate()
  conn.prepareStatement("insert into Employee values (3, 'MS', 'Dhoni')").executeUpdate()
  conn.commit()

  val rowCount = TransactionLogicTransformation.GetJDBCDataframe(spark,url,connectionProperties).count()

  assertEquals(3, rowCount)
}

相关问题