sparkjdbc从hive读写

fhity93d  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(449)

我正在尝试提出一个通用的实现,使用sparkjdbc来支持从各种jdbc兼容的数据库(如postgresql、mysql、hive等)读/写数据。
我的代码如下所示。

val conf = new SparkConf().setAppName("Spark Hive JDBC").setMaster("local[*]")

val sc = new SparkContext(conf)

val spark = SparkSession
  .builder()
  .appName("Spark Hive JDBC Example")
  .getOrCreate()

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:hive2://host1:10000/default")
  .option("dbtable", "student1")
  .option("user", "hive")
  .option("password", "hive")
  .option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
  .load()

jdbcDF.printSchema

jdbcDF.write
    .format("jdbc")
    .option("url", "jdbc:hive2://127.0.0.1:10000/default")
    .option("dbtable", "student2")
    .option("user", "hive")
    .option("password", "hive")
    .option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
    .mode(SaveMode.Overwrite)

输出:

root
    |-- name: string (nullable = true)
    |-- id: integer (nullable = true)
    |-- dept: string (nullable = true)

上面的代码可以无缝地用于postgresql、mysql数据库,但是当我使用与hive相关的jdbc配置时,它就开始引起问题。
首先,我的read无法读取任何数据并返回空结果。在一些搜索之后,我可以通过添加定制的hivedialect使read工作,但是,我仍然面临着将数据写入hive的问题。

case object HiveDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
  override def quoteIdentifier(colName: String): String = s"`$colName`"
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Option(JdbcType("STRING", Types.VARCHAR))
    case _ => None
  }
}

JdbcDialects.registerDialect(HiveDialect)

写入错误:

19/11/13 10:30:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: Method not supported
    at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:664)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)

如何使用spark jdbc执行从spark到多个远程配置单元服务器的配置单元查询(读/写)?
我不能使用配置单元元存储uri方法,因为在这种情况下,我将使用单个配置单元服务器配置来限制自己。正如我前面提到的,我希望这种方法对所有数据库类型(postgresql、mysql、hive)都是通用的,所以在我的例子中,采用hive metastore uri方法是行不通的。
依赖关系详细信息:
scala版本:2.11
spark版本:2.4.3
配置单元版本:2.1.1。
使用的配置单元jdbc驱动程序:2.0.1

0vvn1miw

0vvn1miw1#

有很多背景,我不确定是否相关。但是,错误很简单:
spark无法在数据类型为“text”的配置单元中创建表。
配置单元中确实没有称为text的数据类型,可能您正在查找以下数据类型之一:
一串
瓦尔查尔
烧焦
我希望这有帮助,否则请考虑减少您的问题到最低限度(同时保持一个可复制的例子),以避免分心。

相关问题