目前,我有一个flink集群,它想用一个模式来消费kafka主题,通过这种方式,我们不需要维护一个硬代码kafka主题列表。
import java.util.regex.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
...
private static final Pattern topicPattern = Pattern.compile("(DC_TEST_([A-Z0-9_]+)");
...
FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
topicPattern, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);
我只想知道,通过上述方式,如何在处理过程中了解Kafka的真实主题名?谢谢。
--update——我需要知道主题信息的原因是我们需要这个主题名作为参数,以便在即将到来的flink sink部分中使用。
2条答案
按热度按时间xwbd5t1u1#
您可以实现自己的自定义kafkadeserializationschema,如下所示:
使用自定义kafkadeserializationschema,可以创建元素包含主题信息的数据流。在我的示例中,元素类型是
Tuple2<String, String>
,因此您可以通过Tuple2#f0
.ipakzgxi2#
有两种方法。
方案1:
您可以使用kafka客户端库访问kafka元数据,获取主题列表。添加maven依赖项或等效项。
您可以从kafka集群获取主题,并使用regex进行过滤,如下所示
一旦你有了匹配的topics列表,你就可以把它传递给flinkkafkaconsumer。
方案2:
FlinkKafkaConsumer011
在flink版本1.8中,支持基于模式的动态主题和分区发现。以下是示例:链接:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-使用者主题和分区发现
在你的情况下,选择2最合适。
由于您希望作为kafkamessage的一部分访问主题元数据,因此需要实现kafkadeserializationschema接口,如下所示:
然后打电话: