在将flinkkafka客户机部署到两个示例之后,一条消息被同一组\u id中的两个使用者使用,而预期只被一次使用。所有主题只有一个分区。
我找到了一个链接mail list link,并解释了flinkkafka客户端中的group.id与kafka客户端中的group.id不等价。然而,没有解决办法。
代码随spring boot framework提供,主要有两个类:
@Configuration
public class DataConfig {
@Value("${data.bootstrap}")
private String kafkaBootStrap;
@Value("${data.group}")
private String group;
@Value("${data.topic}")
private String topics;
@Bean
public FlinkKafkaConsumer011<String> createBatchConsumer() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaBootStrap);
props.setProperty("group.id", group);
// Require more careful.
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// earliest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
List<String> splitTopics = Arrays.stream(topics.split(",")).map(String::trim)
.collect(Collectors.toList());
FlinkKafkaConsumer011<String> consumer =
new FlinkKafkaConsumer011<>(splitTopics, new SimpleStringSchema(), props);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
/**
* Continue to do initiation.
*/
return consumer;
}
}
@Service
@Slf4j
public class DataCollectServiceImpl implements DataCollectService {
@Autowired
FlinkKafkaConsumer011<String> consumer;
@Autowired
private TidbSink tidbSink;
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@PostConstruct
public void init() {
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
// Dump out the message inside of TopicFilter and pls ignore DtaaMpaper and tidbSink
environment.addSource(consumer).filter(new TopicFilter()).map(new DataMapper())
.addSink(tidbSink);
try {
Future<Integer> future = threadPoolTaskExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
environment.execute();
return 3;
}
});
} catch (Exception e) {
log.error("exception {}", e);
}
}
@Override
public void start() {
}
}
@Slf4j
public class TopicFilter implements FilterFunction<String> {
@Override
public boolean filter(String record) throws Exception {
// Check whether the topics in the value are on the list.
try {
JsonParser parser = new JsonParser();
JsonObject header = parser.parse(record).getAsJsonObject().getAsJsonObject(HEADER);
log.info(record);
return true;
} catch (Exception e) {
log.error("Processing data filter failure", e);
}
return false;
}
}
暂无答案!
目前还没有任何答案,快来回答吧!