如何使用带有spark的jdbc驱动程序读取druid数据?

l3zydbqr  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(1169)

如何使用spark和avatica jdbc驱动程序从druid读取数据?这是avatica jdbc文档
使用python和jaydebeapi模块从druid读取数据,我成功地完成了以下代码。

$ python
import jaydebeapi

conn = jaydebeapi.connect("org.apache.calcite.avatica.remote.Driver",
                          "jdbc:avatica:remote:url=http://0.0.0.0:8082/druid/v2/sql/avatica/",
                          {"user": "druid", "password":"druid"},
                          "/root/avatica-1.17.0.jar",
       )
cur = conn.cursor()
cur.execute("SELECT * FROM INFORMATION_SCHEMA.TABLES")
cur.fetchall()

输出为:

[('druid', 'druid', 'wikipedia', 'TABLE'),
('druid', 'INFORMATION_SCHEMA', 'COLUMNS', 'SYSTEM_TABLE'),
('druid', 'INFORMATION_SCHEMA', 'SCHEMATA', 'SYSTEM_TABLE'),
('druid', 'INFORMATION_SCHEMA', 'TABLES', 'SYSTEM_TABLE'),
('druid', 'sys', 'segments', 'SYSTEM_TABLE'),
('druid', 'sys', 'server_segments', 'SYSTEM_TABLE'),
('druid', 'sys', 'servers', 'SYSTEM_TABLE'),
('druid', 'sys', 'supervisors', 'SYSTEM_TABLE'),
('druid', 'sys', 'tasks', 'SYSTEM_TABLE')]  -> default tables

但是我想用spark和jdbc来阅读。
我试过了,但有一个问题,使用Spark像下面的代码。

$ pyspark --jars /root/avatica-1.17.0.jar

df = spark.read.format('jdbc') \
    .option('url', 'jdbc:avatica:remote:url=http://0.0.0.0:8082/druid/v2/sql/avatica/') \
    .option("dbtable", 'INFORMATION_SCHEMA.TABLES') \
    .option('user', 'druid') \
    .option('password', 'druid') \
    .option('driver', 'org.apache.calcite.avatica.remote.Driver') \
    .load()

输出为:

Traceback (most recent call last):
  File "<stdin>", line 8, in <module>
  File "/root/spark-2.4.4-bin-hadoop2.7/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/root/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/root/spark-2.4.4-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a,**kw)
  File "/root/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2999.load.
: java.sql.SQLException: While closing connection
...
Caused by: java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "rpcMetadata" (class org.apache.calcite.avatica.remote.Service$CloseConnectionResponse), not marked as ignorable (0 known properties: ])
 at [Source: {"response":"closeConnection","rpcMetadata":{"response":"rpcMetadata","serverAddress":"172.18.0.7:8082"}}
; line: 1, column: 46]
...
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "rpcMetadata" (class org.apache.calcite.avatica.remote.Service$CloseConnectionResponse), not marked as ignorable (0 known properties: ])
 at [Source: {"response":"closeConnection","rpcMetadata":{"response":"rpcMetadata","serverAddress":"172.18.0.7:8082"}}
; line: 1, column: 46] 
...

注:
我下载了avatica jar文件( avatica-1.17.0.jar )来自maven存储库
我使用docker compose和默认设置值安装了druid服务器。

kkih6yb8

kkih6yb81#

我找到了另一个解决这个问题的方法。我用电Spark连接Druid和电Spark。
但我修改了一些类似的代码,以便在我的环境中使用这些代码。
这是我的环境:
Spark:2.4.4
斯卡拉:2.11.12
python:python 3.6.8版
Druid:
Zookeeper:3.5
Druid:0.17.0
但是,它有一个问题。
如果至少使用一次spark druid连接器,所有sql查询 spark.sql("select * from tmep_view") 从下面使用的将被输入到这个计划。
但是,如果使用dataframe的api df.distinct().count() ,那么就没有问题了。我还没解决。

相关问题