How to dynamically get the Kafka topic name being processed in a Flink Kafka consumer?

vh0rcniy  asked on 2021-06-04  in Kafka

I currently have a Flink cluster that consumes Kafka topics by pattern, so that we don't need to maintain a hard-coded list of Kafka topic names.

import java.util.regex.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
...
private static final Pattern topicPattern = Pattern.compile("DC_TEST_([A-Z0-9_]+)");
...
FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);

I just want to know: with this approach, how can I find out the actual Kafka topic name during processing? Thanks.
-- Update -- The reason I need the topic information is that we need the topic name as a parameter in the upcoming Flink sink part of the job.

xwbd5t1u (answer 1)

You can implement your own custom KafkaDeserializationSchema, like this:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Pair the source topic name with the message payload.
        return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}

With this custom KafkaDeserializationSchema, you can create a DataStream whose elements carry the topic information. In my example the element type is Tuple2<String, String>, so you can access the topic name via Tuple2#f0.

FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, new CustomKafkaDeserializationSchema(), kafkaConsumerProps);
DataStream<Tuple2<String, String>> input = env.addSource(kafkaConsumer);

input.process(new ProcessFunction<Tuple2<String,String>, String>() {
            @Override
            public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                String topicName = value.f0;
                // your processing logic here.
                out.collect(value.f1);
            }
        });

ipakzgxi (answer 2)

There are two ways to do this.
Option 1:
You can use the Kafka client library to access the Kafka metadata and fetch the list of topics. Add the Maven dependency below (or its equivalent).

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>

You can fetch the topics from the Kafka cluster and filter them with your regex, as shown below:

import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicListing;
...
private static final Pattern topicPattern = Pattern.compile("DC_TEST_([A-Z0-9_]+)");
...
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("client.id", "java-admin-client");
try (AdminClient client = AdminClient.create(properties)) {
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(false);
    // List all non-internal topics, then keep only those matching the pattern.
    Collection<TopicListing> listings = client.listTopics(options).listings().get();
    List<String> allTopicsList = listings.stream()
            .map(TopicListing::name)
            .collect(Collectors.toList());
    List<String> matchedTopics = allTopicsList.stream()
            .filter(topicPattern.asPredicate())
            .collect(Collectors.toList());
} catch (Exception e) {
    e.printStackTrace();
}

Once you have the list of matched topics, you can pass it to the FlinkKafkaConsumer, as sketched below.
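For completeness, a minimal sketch of that hand-off, assuming the matchedTopics list from the snippet above, the env and kafkaConsumerProps from the question, and SimpleStringSchema standing in for whatever deserializer you actually use:

FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
        matchedTopics,              // List<String> built by the AdminClient query above
        new SimpleStringSchema(),   // stand-in deserializer
        kafkaConsumerProps);
DataStream<String> stream = env.addSource(consumer);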
Option 2: FlinkKafkaConsumer011 in Flink release 1.8 supports dynamic topic and partition discovery based on a pattern. Here is an example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Pattern topicPattern = Pattern.compile("DC_TEST_([A-Z0-9_]+)");

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// Enable dynamic topic/partition discovery by setting a non-negative refresh interval (ms).
properties.setProperty("flink.partition-discovery.interval-millis", "10000");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    topicPattern,
    new SimpleStringSchema(),
    properties);

Link: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
In your case, option 2 fits best.
Since you want to access the topic metadata as part of KafkaMessage, you need to implement the KafkaDeserializationSchema interface, as shown below:

import java.io.IOException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<KafkaMessage> {

    /**
     * Deserializes the Kafka record.
     *
     * @param record the ConsumerRecord; key, value, topic, partition and offset are all available on it.
     * @return the deserialized message as a KafkaMessage (null if the message cannot be deserialized).
     */
    @Override
    public KafkaMessage deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
        // You can access record.key(), record.value(), record.topic(), record.partition()
        // and record.offset() to get the topic information.
        KafkaMessage kafkaMessage = new KafkaMessage();
        kafkaMessage.setTopic(record.topic());
        // Build the rest of your KafkaMessage here and assign the values like above.
        return kafkaMessage;
    }

    @Override
    public boolean isEndOfStream(KafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<KafkaMessage> getProducedType() {
        return TypeInformation.of(KafkaMessage.class);
    }
}

Then call:

FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, new CustomKafkaDeserializationSchema(), kafkaConsumerProps);
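Since the update in the question says the topic name is needed as a parameter in the sink stage, here is one possible, minimal sketch of how the field populated above could drive a Kafka sink: the legacy KeyedSerializationSchema exposes a getTargetTopic hook, so each record can choose its output topic from the source topic it carries. The getTopic()/getPayload() accessors on KafkaMessage, the "_OUT" suffix, and kafkaProducerProps are illustrative assumptions, not part of the original answer.

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Hypothetical sink-side schema: routes each record to an output topic derived from its source topic.
public class TopicAwareSerializationSchema implements KeyedSerializationSchema<KafkaMessage> {
    @Override
    public byte[] serializeKey(KafkaMessage element) {
        return null; // no message key
    }

    @Override
    public byte[] serializeValue(KafkaMessage element) {
        return element.getPayload(); // assumed accessor on your KafkaMessage POJO
    }

    @Override
    public String getTargetTopic(KafkaMessage element) {
        // The source topic captured in deserialize() above becomes a parameter of the sink.
        return element.getTopic() + "_OUT";
    }
}

DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);
input.addSink(new FlinkKafkaProducer010<>(
        "DEFAULT_TOPIC",                       // fallback topic, used only if getTargetTopic() returns null
        new TopicAwareSerializationSchema(),
        kafkaProducerProps));                  // assumed producer config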
