Why do Flink Kafka consumers in the same group consume the same message?

Asked by w8rqjzmb on 2021-06-04 in Kafka

After deploying the Flink Kafka client to two instances, a message is consumed by both consumers with the same group.id, while it was expected to be consumed exactly once. All topics have only one partition.
I found a mailing list thread (mail list link) explaining that group.id in the Flink Kafka client is not equivalent to group.id in the Kafka client: the Flink connector assigns partitions by itself rather than joining a Kafka consumer group, and uses group.id only for committing offsets. However, the thread offered no solution.
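For contrast, here is a minimal sketch (not from the question; the bootstrap address, group, and topic names are placeholders) of a plain Kafka client that relies on broker-side group coordination. With subscribe(), the group coordinator gives each partition to exactly one member of the group; this is the coordination that, per the mailing list thread, Flink's connector does not take part in:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PlainGroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() joins the consumer group; the broker-side coordinator
            // assigns each partition to exactly one member, so a second process
            // with the same group.id would not receive the same records.
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}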
The code is built with the Spring Boot framework; the main logic is in two classes:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DataConfig {

    @Value("${data.bootstrap}")
    private String kafkaBootStrap;
    @Value("${data.group}")
    private String group;
    @Value("${data.topic}")
    private String topics;

    @Bean
    public FlinkKafkaConsumer011<String> createBatchConsumer() {

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaBootStrap);
        props.setProperty("group.id", group);
        // Note: Flink deserializes records with the DeserializationSchema passed
        // to the consumer below, so these two client-side deserializer settings
        // are effectively unused here.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Fall back to the latest offset when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        List<String> splitTopics = Arrays.stream(topics.split(",")).map(String::trim)
                .collect(Collectors.toList());

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>(splitTopics, new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);
        // Additional consumer setup omitted.
        return consumer;
    }
}
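Nothing in this configuration is wrong on the Kafka side; the dual consumption follows from how the connector distributes partitions. FlinkKafkaConsumer011 does not join a Kafka consumer group: it maps the discovered partitions onto its parallel subtasks deterministically, so each separately deployed job reads every partition. Below is a simplified, illustrative sketch of that idea (the class and method names are made up, not the connector's API; the connector's real assigner also shifts the index by a hash of the topic name):

// Simplified sketch of static partition-to-subtask assignment, as used by
// Flink's Kafka connector.
public final class StaticAssignmentSketch {

    /** A partition belongs to the subtask whose index matches this result. */
    static int subtaskFor(int partition, int numParallelSubtasks) {
        return partition % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        for (int partition = 0; partition < 4; partition++) {
            System.out.printf("partition %d -> subtask %d%n",
                    partition, subtaskFor(partition, parallelism));
        }
        // Each *job* runs this assignment independently: deploying the same
        // job twice yields two full readers of every partition, so the same
        // message is consumed once per deployment.
    }
}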

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import javax.annotation.PostConstruct;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class DataCollectServiceImpl implements DataCollectService {

    @Autowired
    FlinkKafkaConsumer011<String> consumer;

    @Autowired
    private TidbSink tidbSink;

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @PostConstruct
    public void init() {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // TopicFilter logs each message; DataMapper and tidbSink are not relevant here.
        environment.addSource(consumer).filter(new TopicFilter()).map(new DataMapper())
                .addSink(tidbSink);
        try {
            Future<Integer> future = threadPoolTaskExecutor.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    environment.execute();
                    return 3;
                }
            });
        } catch (Exception e) {
            log.error("exception {}", e);
        }
    }

    @Override
    public void start() {
    }

}
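A side note on init(): the try/catch there guards only the submit() call. environment.execute() runs inside the Callable, so a job failure surfaces through the returned Future, which is never read, and the error is silently lost. A self-contained sketch of one way to surface such failures via Future.get() (the throwing Callable is a stand-in for a failing environment.execute()):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class JobFailureSurfacing {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Stand-in for environment.execute(): the failure is captured by the
        // Future rather than thrown to the submitting thread.
        Callable<Integer> job = () -> {
            throw new IllegalStateException("job failed");
        };
        Future<Integer> future = executor.submit(job);

        // Watch the future from another task: get() rethrows the job's failure
        // (wrapped in an ExecutionException) so it can be logged instead of lost.
        executor.execute(() -> {
            try {
                future.get();
            } catch (Exception e) {
                System.err.println("Flink job terminated: " + e.getCause());
            }
        });
        executor.shutdown();
    }
}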

import org.apache.flink.api.common.functions.FilterFunction;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TopicFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String record) throws Exception {
        // Check whether the topic in the record's header is on the list.
        // (HEADER is a project constant whose value is not shown in the post;
        // as written, any record whose header parses is passed through.)
        try {
            JsonParser parser = new JsonParser();
            JsonObject header = parser.parse(record).getAsJsonObject().getAsJsonObject(HEADER);
            log.info(record);
            return true;
        } catch (Exception e) {
            log.error("Processing data filter failure", e);
        }
        return false;
    }
}
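For reference, this is roughly what the Gson calls in TopicFilter do on one record. The JSON shape and the value of the HEADER constant are assumptions, since the post does not show them:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class TopicFilterDemo {
    // Assumed value; the original HEADER constant is not shown in the post.
    private static final String HEADER = "header";

    public static void main(String[] args) {
        String record = "{\"header\":{\"topic\":\"demo\"},\"body\":{\"k\":\"v\"}}";
        JsonObject header = new JsonParser().parse(record)
                .getAsJsonObject().getAsJsonObject(HEADER);
        // A record whose header parses like this is passed through (filter returns true).
        System.out.println("parsed header: " + header);
    }
}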

No answers yet.
