Kafka client times out after 60000ms before the position for a partition could be determined

wsxa1bj1  posted on 2021-06-21  in  Flink

I want to connect a Flink consumer to Kafka.
I am using docker-compose to build four containers: zookeeper, kafka, a flink jobmanager, and a flink taskmanager.
For zookeeper and kafka I am using the wurstmeister images; for flink I am using the official image.
docker-compose.yml:

version: '3.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    hostname: zookeeper
    expose:
      - "2181"
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    hostname: kafka
    links:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: 'pipeline:1:1:compact'

  jobmanager:
    build: ./flink_pipeline
    depends_on:
      - kafka
    links:
      - zookeeper
      - kafka
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
      BOOTSTRAP_SERVER: kafka:9092
      ZOOKEEPER: zookeeper:2181

  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    links:
      - jobmanager
      - zookeeper
      - kafka
    depends_on:
      - jobmanager
    command: taskmanager
    # links:
    #   - "jobmanager:jobmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager

When I submit a simple job to the dispatcher, the job fails with the following error:

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition pipeline-0 could be determined

My job code is:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Main {
    public static void main( String[] args ) throws Exception
    {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        Properties properties = new Properties();
        String bootstrapServer = System.getenv("BOOTSTRAP_SERVER");
        String zookeeperServer = System.getenv("ZOOKEEPER");

        // exit early if either address is missing (setProperty throws on a null value)
        if (bootstrapServer == null || zookeeperServer == null) {
            System.exit(1);
        }

        // note: the universal FlinkKafkaConsumer ignores zookeeper settings; only "bootstrap.servers" matters
        properties.setProperty("zookeeper.connect", zookeeperServer);
        properties.setProperty("bootstrap.servers", bootstrapServer);
        properties.setProperty("group.id", "pipeline-analysis");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("pipeline", new SimpleStringSchema(), properties);
        // kafkaConsumer.setStartFromGroupOffsets();
        kafkaConsumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Defining Pipeline here

        // Printing Outputs
        stream.print();

        env.execute("Stream Pipeline");
    }
}

ljsrvy3e1#

I ran into an error that looked the same:

17:34:37.668 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-3, groupId=api.dev] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition aaa-1 could be determined

It turned out my hosts file had been changed, so the broker address was wrong.
Try this logging setting to debug in more detail:

<logger name="org.apache.kafka.clients.consumer.internals.Fetcher" level="info" />
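The hosts-file problem above boils down to the client resolving the advertised broker name to the wrong address. Before digging through Kafka logs, a plain-JDK reachability check can rule that out. This is my own sketch, not part of the original answer; `kafka` and `9092` are the compose service name and port from the question, resolvable only from inside the compose network:

```java
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // Verifies both DNS resolution and basic network reachability.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "kafka" is the docker-compose service name; run this from a container on the same network
        System.out.println(canConnect("kafka", 9092, 3000));
    }
}
```

If this prints `false` from the container running the consumer, the timeout is a connectivity or name-resolution issue, not a Kafka configuration one.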

vnjpjtjt2#

I know I'm late to the party, but I hit the same error. In my case I had not set up the topic partitions correctly. My topic has two partitions, and my producer produced messages just fine, but my consumer, a Spark Streaming application, never really started and gave up after 60 seconds complaining about the same error.
My wrong code -

List<TopicPartition> topicPartitionList = Arrays.asList(new TopicPartition(topicName, Integer.parseInt(numPartitions)));

The correct code -

List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();

for (int i = 0; i < Integer.parseInt(numPartitions); i++) {
    topicPartitionList.add(new TopicPartition(topicName, i));
}
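The fix above is to register one `TopicPartition` per partition index, 0 through numPartitions-1, rather than a single entry built from the partition count. A runnable sketch of that loop, using a minimal stand-in for Kafka's `TopicPartition` class (the real one lives in `org.apache.kafka.common`) so it runs without kafka-clients on the classpath:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionListDemo {
    // Stand-in for org.apache.kafka.common.TopicPartition, kept here only so
    // this sketch compiles without the Kafka client library.
    static class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
        @Override public String toString() { return topic + "-" + partition; }
    }

    // One entry per partition, indices 0..numPartitions-1
    static List<TopicPartition> buildPartitions(String topic, int numPartitions) {
        List<TopicPartition> list = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            list.add(new TopicPartition(topic, i));
        }
        return list;
    }

    public static void main(String[] args) {
        // prints [pipeline-0, pipeline-1]
        System.out.println(buildPartitions("pipeline", 2));
    }
}
```

With only one partition registered, the consumer waits on metadata for a partition assignment that never completes, which surfaces as the same 60000ms position timeout.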
