I am trying to run the Flink catalog example from this URL:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/catalogs.html#how-to-create-and-register-flink-tables-to-catalog
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable
from pyflink.table.descriptors import Kafka

# Set up a batch TableEnvironment with the Blink planner (Flink 1.11)
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
But I get this error:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/sean/code/play_with_data/venv_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 1188, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/sean/code/play_with_data/venv_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 1014, in send_command
response = connection.send_command(command)
File "/home/sean/code/play_with_data/venv_py36/lib/python3.6/site-packages/py4j/java_gateway.py", line 1193, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
<ipython-input-2-bfdc1a237737> in <module>
3
4 # Create a HiveCatalog
----> 5 catalog = HiveCatalog("myhive", None, "/opt/apache/hive/conf")
~/code/play_with_data/venv_py36/lib/python3.6/site-packages/pyflink/table/catalog.py in __init__(self, catalog_name, default_database, hive_conf_dir, j_hive_catalog)
994
995 if j_hive_catalog is None:
--> 996 j_hive_catalog = gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalog(
997 catalog_name, default_database, hive_conf_dir)
998 super(HiveCatalog, self).__init__(j_hive_catalog)
~/code/play_with_data/venv_py36/lib/python3.6/site-packages/py4j/java_gateway.py in __getattr__(self, name)
1625 answer[proto.CLASS_FQN_START:], self._gateway_client)
1626 else:
-> 1627 raise Py4JError("{0} does not exist in the JVM".format(new_fqn))
1628
1629
Py4JError: org.apache.flink.table.catalog.hive.HiveCatalog does not exist in the JVM
${FLINK_HOME}/lib:
lib
├── flink-connector-jdbc_2.11-1.11.2.jar
├── flink-csv-1.11.2.jar
├── flink-dist_2.12-1.11.2.jar
├── flink-hadoop-compatibility_2.12-1.11.2.jar
├── flink-json-1.11.2.jar
├── flink-shaded-zookeeper-3.4.14.jar
├── flink-sql-connector-hive-3.1.2_2.12-1.11.2.jar
├── flink-table_2.12-1.11.2.jar
├── flink-table-blink_2.12-1.11.2.jar
├── hive-common-3.1.2.jar -> /opt/apache/hive/lib/hive-common-3.1.2.jar
├── hive-exec-3.1.2.jar -> /opt/apache/hive/lib/hive-exec-3.1.2.jar
├── libfb303-0.9.3.jar -> /opt/apache/hive/lib/libfb303-0.9.3.jar
├── log4j-1.2-api-2.12.1.jar
├── log4j-api-2.12.1.jar
├── log4j-core-2.12.1.jar
├── log4j-slf4j-impl-2.12.1.jar
└── postgresql-42.2.18.jar
What's wrong? It looks like a library (jar) is missing, but I can't figure out which one.
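In case it helps with diagnosing this, here is a quick sanity check I can run (a minimal sketch, assuming a standard pip install of apache-flink): as far as I understand, the Py4J gateway resolves FLINK_HOME and, if it is not set, falls back to the distribution bundled inside the pyflink package, so the jars in my own ${FLINK_HOME}/lib may never reach the JVM classpath.

import os
import pyflink

# Which FLINK_HOME does the Python process see?
# (None would mean the bundled distribution is used.)
print("FLINK_HOME env var:", os.environ.get("FLINK_HOME"))

# Directory of the pip-installed pyflink package; a pip install ships its own
# flink distribution (bin/, lib/, opt/, ...) next to the Python sources.
pyflink_root = os.path.dirname(os.path.abspath(pyflink.__file__))
bundled_lib = os.path.join(pyflink_root, "lib")
print("Bundled lib dir:", bundled_lib)

# List the jars that would be on the gateway's classpath if the bundled
# distribution is used (hypothetical check; the hive connector jar would have
# to show up here, or FLINK_HOME must point at the installation listed above).
if os.path.isdir(bundled_lib):
    for jar in sorted(os.listdir(bundled_lib)):
        print(jar)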