sqlexception上的sqlcontext hivedriver错误:不支持方法

li9yvcax  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(432)

我一直在尝试使用 sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver") 把Hive桌放进Spark里却一无所获。我做了研究,阅读了以下内容:
如何从spark连接到远程hive服务器
spark 1.5.1不使用hive jdbc 1.2.0
http://belablotski.blogspot.in/2016/01/access-hive-tables-from-spark-using.html
我使用了最新的hortonworks沙盒2.6,并向那里的社区提出了同样的问题:
https://community.hortonworks.com/questions/156828/pyspark-jdbc-py4jjavaerror-calling-o95load-javasql.html?childtoview=156936#answer-156936
我想做的很简单 pyspark :

df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="sample_07",user="maria_dev", password="maria_dev").load()

给了我这个错误:

17/12/30 19:55:14 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10016/default
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark-client/python/pyspark/sql/readwriter.py", line 139, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a,**kw)
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o119.load.
: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveResultSetMetaData.isSigned(HiveResultSetMetaData.java:143)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:136)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:748)

使用直线,效果很好

beeline> !connect jdbc:hive2://localhost:10016/default maria_dev maria_dev
Connecting to jdbc:hive2://localhost:10016/default
Connected to: Spark SQL (version 2.1.1.2.6.1.0-129)
Driver: Hive JDBC (version 1.2.1000.2.6.1.0-129)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10016/default> select * from sample_07 limit 2;
+----------+-------------------------+------------+---------+--+
|   code   |       description       | total_emp  | salary  |
+----------+-------------------------+------------+---------+--+
| 00-0000  | All Occupations         | 134354250  | 40690   |
| 11-0000  | Management occupations  | 6003930    | 96150   |
+----------+-------------------------+------------+---------+--+

我也可以这样做:

spark = SparkSession.Builder().appName("testapp").enableHiveSupport().‌​getOrCreate()
spark.sql("select * from default.sample_07").collect()

但这会直接读入配置单元元数据。我想使用jdbc来激发thrift服务器的细粒度安全性。
我可以这样做postgresql:

sqlContext.read.format("jdbc").options(driver="org.postgresql.Driver")

我也可以用scala java.sql.{DriverManager, Connection, Statement, ResultSet} 创建jdbc连接作为客户端以获得spark。但这基本上是将所有数据放入内存,然后手动重新创建Dataframe。
所以问题是:有没有一种方法可以用配置单元表数据创建spark dataframe,而无需将数据加载到内存中的jdbc客户机(如scala)中,而不用 SparkSession.Builder() 像上面的例子?我的用例是需要处理细粒度安全性。

kmpatx3s

kmpatx3s1#

实际上我查过了。hotornworks和cloudera正在放弃支持,通过thrift服务器从spark连接到hive。
所以你在做一些不可能的事情。
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_thrift_server.
链接说节俭是被禁用的,但它是专门从SparkHive。我可以从spark连接到除hive以外的所有类型的数据库。
所以你必须使用不同的授权方式。
当Spark物体直接连接到Hive时,他们正在移除节俭的支撑。
从上一个问题来看,它能够读取数据,但读取的数据是错误的。spark 2.2查询配置单元表时dataframe numberformatexception上的thrift服务器错误
代码

>>> df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test4",user="hive", password="hive").option("fetchsize", "10").load()
>>> df.select("*").show()
+---+----+
| id|desc|
+---+----+
| id|desc|
| id|desc|
+---+----+

问题出在Hive里
默认方言中引用标识符的默认方式是使用双引号。像select“dw\u date”from table…这样的sql查询将被配置单元解析为选择一个字符串文本,而不是一个名为“dw\u date”的列。通过用反勾号替换引号,问题似乎得到了解决。但是,在我的测试中,从配置单元获取的列名都以表名作为前缀,比如table.dw\u date。但你不能像这样直接把它包起来 table.dw_date . 或者,我们需要单独 Package 每个部分
代码

import org.apache.spark.sql.jdbc.JdbcDialect
    private case object HiveDialect extends JdbcDialect {
      override def canHandle(url : String): Boolean = url.startsWith("jdbc:hive2")
      override def quoteIdentifier(colName: String): String = {
        colName.split(‘.’).map(part => s”`$part`”).mkString(“.”)
      }
    }

请按照下面的帖子执行解决方案。
https://medium.com/@viirya/custom-jdbc-方言-for-hive-5dbb694cc2bd
https://medium.com/@huaxing/customize-spark-jdbc-data-source-to-work-with-your-dedicated-database-dialect-beec6519af27
注册方言

JdbcDialects.registerDialect(HiveDialect)

那么hivejdbc就可以工作了。

b4wnujal

b4wnujal2#

我不确定我是否正确地理解了您的问题,但是根据我的理解,您需要将配置单元表放入Dataframe中,因为您不需要jdbc连接,在您的示例链接中,它们试图连接到不同的数据库(rdbms),而不是配置单元。
请参阅下面的方法,使用配置单元上下文可以将表放入Dataframe中。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setAppName("APPName")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val sqlContext = new SQLContext(sc)

val hive_df = hiveContext.sql("select * from schema.table").first()

//other way
// val hive_df= hiveContext.table ("SchemaName.TableName")

//Below will print the first line
df.first()
//count on dataframe
df.count()

}

如果您真的想使用jdbc连接,我有下面的例子,我用于oracle数据库,这可能会对您有所帮助。

val oracle_data = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:username/password//hostname:2134/databaseName", "dbtable" -> "Your query tmp", "driver" -> "oracle.jdbc.driver.OracleDriver"));

相关问题