我正在使用PyFlink。我以为所有的Java依赖项都与pip install apache-flink
沿着安装
上面的错误发生在此行:
kafka_consumer = FlinkKafkaConsumer(
topics='mytopic',
deserialization_schema=deserialization_schema,
properties={
'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})
字符串
根据错误,我是否需要在pyflink环境中通过下载并添加jar来手动指定Kafka消费者依赖项?
任何指导都将不胜感激。
Python Version: 3.8.2
Java Version: java 11.0.11
型
2条答案
按热度按时间bvjxkvbb1#
由于Flink是一个基于Java/Scala的项目,对于连接器和格式,实现都可以作为jar使用
pyflink中的FlinkKafkaConsumer依赖于Java的FlinkKafkaConsumer实现
您需要将Kafka连接器jar包下载到pyflink的lib目录中。lib目录的路径通常是:/usr/local/lib/python3.8.2/site-packages/pyflink/lib
字符串
fwzugrvs2#
Kafka连接器似乎从Flink 1.12开始停产。查看文档。现在你可以使用通用的Kafka连接器了。阅读更多关于cloudera