TypeError: Could not find Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'

pu82cl6c  posted on 2023-08-01 in Apache

I am using PyFlink. I assumed that all the Java dependencies were installed along with pip install apache-flink.
The error above is raised at this line:

# Import as used elsewhere in the script (added for completeness).
from pyflink.datastream.connectors import FlinkKafkaConsumer

kafka_consumer = FlinkKafkaConsumer(
    topics='mytopic',
    deserialization_schema=deserialization_schema,
    properties={
        'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})

Based on the error, do I need to manually provide the Kafka consumer dependency by downloading a jar and adding it to my PyFlink environment?
Any guidance would be greatly appreciated.

Python Version: 3.8.2
Java Version: java 11.0.11


bvjxkvbb1#

Since Flink is a Java/Scala-based project, connector and format implementations are shipped as jars.
FlinkKafkaConsumer in PyFlink relies on the Java FlinkKafkaConsumer implementation.
You need to download the Kafka connector jar into PyFlink's lib directory. The path to the lib directory is typically: /usr/local/lib/python3.8.2/site-packages/pyflink/lib
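As an alternative to copying the jar into pyflink/lib, you can register it per job. A minimal sketch, assuming a PyFlink version that provides StreamExecutionEnvironment.add_jars; the jar path and file name are placeholders for the connector jar matching your Flink release:

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Placeholder path/version: use the Kafka connector jar that matches
# your installed Flink version.
env.add_jars('file:///path/to/flink-sql-connector-kafka_2.12-1.14.4.jar')

For reference, the PyFlink source below shows how the Java class is resolved: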

class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
    """
    The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
    Apache Kafka. The consumer can run in multiple parallel instances, each of which will
    pull data from one or more Kafka partitions.

    The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
    during a failure, and that the computation processes elements 'exactly once'. (These guarantees
    naturally assume that Kafka itself does not lose any data.)

    Please note that Flink snapshots the offsets internally as part of its distributed checkpoints.
    The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in
    sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of
    how far the Flink Kafka consumer has consumed a topic.

    Please refer to Kafka's documentation for the available configuration properties:
    http://kafka.apache.org/documentation.html#newconsumerconfigs
    """

    def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema,
                 properties: Dict):
        """
        Creates a new Kafka streaming source consumer for Kafka 0.10.x.

        This constructor allows passing multiple topics to the consumer.

        :param topics: The Kafka topics to read from.
        :param deserialization_schema: The de-/serializer used to convert between Kafka's byte
                                       messages and Flink's objects.
        :param properties: The properties that are used to configure both the fetcher and the offset
                           handler.
        """

        JFlinkKafkaConsumer = get_gateway().jvm \
            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
        j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
                                                     JFlinkKafkaConsumer)
        super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)
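
Once the connector jar is on the classpath, the JVM lookup in __init__ above succeeds. A minimal end-to-end sketch, assuming SimpleStringSchema as the deserialization schema and a local broker (both are illustrative choices, not from the question):

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# Placeholder jar path; point it at your Kafka connector jar.
env.add_jars('file:///path/to/flink-sql-connector-kafka.jar')

kafka_consumer = FlinkKafkaConsumer(
    topics='mytopic',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092',
                'group.id': 'my-group'})

env.add_source(kafka_consumer).print()
env.execute('kafka-consumer-job')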



fwzugrvs2#

The version-specific Kafka connectors appear to have been discontinued as of Flink 1.12; check the documentation. You can now use the universal Kafka connector instead. Read more at Cloudera.
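
On recent Flink releases, the FlinkKafkaConsumer wrapper has itself been deprecated in favor of the unified KafkaSource builder. A minimal sketch, assuming a PyFlink version that ships pyflink.datastream.connectors.kafka (e.g. 1.16+); the topic, group id, and broker address are placeholders:

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()

# Build a source with the unified connector API (replaces FlinkKafkaConsumer).
source = KafkaSource.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_topics('mytopic') \
    .set_group_id('my-group') \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

env.from_source(source, WatermarkStrategy.no_watermarks(), 'kafka-source').print()
env.execute('kafka-source-job')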
