我正在尝试提出一个通用的实现,使用sparkjdbc来支持从各种jdbc兼容的数据库(如postgresql、mysql、hive等)读/写数据。
我的代码如下所示。
val conf = new SparkConf().setAppName("Spark Hive JDBC").setMaster("local[*]")
val sc = new SparkContext(conf)
val spark = SparkSession
.builder()
.appName("Spark Hive JDBC Example")
.getOrCreate()
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:hive2://host1:10000/default")
.option("dbtable", "student1")
.option("user", "hive")
.option("password", "hive")
.option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
.load()
jdbcDF.printSchema
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:hive2://127.0.0.1:10000/default")
.option("dbtable", "student2")
.option("user", "hive")
.option("password", "hive")
.option("driver", "org.apache.hadoop.hive.jdbc.HiveDriver")
.mode(SaveMode.Overwrite)
输出:
root
|-- name: string (nullable = true)
|-- id: integer (nullable = true)
|-- dept: string (nullable = true)
上面的代码可以无缝地用于postgresql、mysql数据库,但是当我使用与hive相关的jdbc配置时,它就开始引起问题。
首先,我的read无法读取任何数据并返回空结果。在一些搜索之后,我可以通过添加定制的hivedialect使read工作,但是,我仍然面临着将数据写入hive的问题。
case object HiveDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
override def quoteIdentifier(colName: String): String = s"`$colName`"
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Option(JdbcType("STRING", Types.VARCHAR))
case _ => None
}
}
JdbcDialects.registerDialect(HiveDialect)
写入错误:
19/11/13 10:30:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:664)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
如何使用spark jdbc执行从spark到多个远程配置单元服务器的配置单元查询(读/写)?
我不能使用配置单元元存储uri方法,因为在这种情况下,我将使用单个配置单元服务器配置来限制自己。正如我前面提到的,我希望这种方法对所有数据库类型(postgresql、mysql、hive)都是通用的,所以在我的例子中,采用hive metastore uri方法是行不通的。
依赖关系详细信息:
scala版本:2.11
spark版本:2.4.3
配置单元版本:2.1.1。
使用的配置单元jdbc驱动程序:2.0.1
1条答案
按热度按时间0vvn1miw1#
有很多背景,我不确定是否相关。但是,错误很简单:
spark无法在数据类型为“text”的配置单元中创建表。
配置单元中确实没有称为text的数据类型,可能您正在查找以下数据类型之一:
一串
瓦尔查尔
烧焦
我希望这有帮助,否则请考虑减少您的问题到最低限度(同时保持一个可复制的例子),以避免分心。