scala—使用spark从mysql中提取数据库并将其保存在hdfs上

qhhrdooz  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(377)

我有一个虚拟数据库和一个非常简单的模式。数据库由两个表组成,每个表有两列,两列类型都是int。我制作了一个spark程序,可以在Dataframe中提取数据库:

import org.apache.spark.sql.SparkSession

object mysql_to_hdfs extends App{
  val SPARK_CLUSTER_ADDRESS = "local[*]"
  val APPLICATION_NAME = "mysql_to_hdfs"

  val DB_URL = "jdbc:mysql://localhost:3306/practice_schema?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC"
  val DB_USER = "root"
  val DB_PWD = "root"

  val HDFS_URL = ""

  val sparkSession = SparkSession.builder()
    .master("local[*]")
    .appName(APPLICATION_NAME)
    .getOrCreate()

  val sc = sparkSession.sparkContext
  val sqlC = sparkSession.sqlContext

  val optionsMap:Map[String, String] = Map("url" -> DB_URL, "user" -> DB_USER, "password" -> DB_PWD, "dbtable" -> "table_1")

  //Insert the key/Value tableName in optionsMap of the table you want to load.

  val mysqlDataframe = sqlC.read.format("jdbc").options(optionsMap).load()

  mysqlDataframe.show()

}

现在我必须编写一部分代码,将Dataframe的内容放入hadoop中。我计划将Dataframe保存如下:

mysqlDataframe.write.save("pathToTheFile.csv")

我还不能测试它(我还没有一个虚拟hadoop),但我已经有一些问题了。
在数据被存储之后,如果我在hadoop上面安装hive,是否可以请求这个数据库?
我需要做一些其他的操作来重新划分文件吗?
谢谢您。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题