我想把Flink和Kafka的消费者联系起来
我正在使用docker compose构建4个容器zookeeper、kafka、flink jobmanager和flink taskmanager。
对于zookeeper和kafka,我使用的是wurstmeister图像,对于flink,我使用的是官方图像。
docker-compose.yml公司
version: '3.1'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
hostname: zookeeper
expose:
- "2181"
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.11-2.0.0
depends_on:
- zookeeper
ports:
- "9092:9092"
hostname: kafka
links:
- zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: 'pipeline:1:1:compact'
jobmanager:
build: ./flink_pipeline
depends_on:
- kafka
links:
- zookeeper
- kafka
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
JOB_MANAGER_RPC_ADDRESS: jobmanager
BOOTSTRAP_SERVER: kafka:9092
ZOOKEEPER: zookeeper:2181
taskmanager:
image: flink
expose:
- "6121"
- "6122"
links:
- jobmanager
- zookeeper
- kafka
depends_on:
- jobmanager
command: taskmanager
# links:
# - "jobmanager:jobmanager"
environment:
JOB_MANAGER_RPC_ADDRESS: jobmanager
当我向dispatcher提交一个简单的作业时,该作业失败,并出现以下错误:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition pipeline-0 could be determined
我的工作代码是:
public class Main {
public static void main( String[] args ) throws Exception
{
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
Properties properties = new Properties();
String bootstrapServer = System.getenv("BOOTSTRAP_SERVER");
String zookeeperServer = System.getenv("ZOOKEEPER");
if (bootstrapServer == null) {
System.exit(1);
}
properties.setProperty("zookeeper", zookeeperServer);
properties.setProperty("bootstrap.servers", bootstrapServer);
properties.setProperty("group.id", "pipeline-analysis");
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<String>("pipeline", new SimpleStringSchema(), properties);
// kafkaConsumer.setStartFromGroupOffsets();
kafkaConsumer.setStartFromLatest();
DataStream<String> stream = env.addSource(kafkaConsumer);
// Defining Pipeline here
// Printing Outputs
stream.print();
env.execute("Stream Pipeline");
}
}
2条答案
按热度按时间ljsrvy3e1#
我犯了一个看起来一样的错误。
原来是我的主机文件被更改了,所以代理地址是错误的。
尝试此日志设置以调试更多详细信息。
vnjpjtjt2#
我知道我参加聚会迟到了,但我也犯了同样的错误。在我的例子中,我没有正确设置主题分区。我的主题有两个分区,我的制作者可以很好地生成消息,但作为我的消费者,spark流媒体应用程序并没有真正启动并在60秒后放弃抱怨同样的错误。
我的密码错了-
正确的代码-