We insert into PostgreSQL through a Kafka consumer rather than writing to the database directly, because the incoming stream is too high-volume for direct inserts. The consumer generally works fine, but it occasionally gets stuck, and I have not been able to pin down a consistent root cause.
The consumer has been running for about six months and has consumed billions of records, so I don't understand why it has started getting stuck only recently, and I'm not even sure where to begin debugging.
Here is the code that processes the records:

```java
private static void readFromTopic(DataSource datasource, ConsumerOptions options) {
    KafkaConsumer<String, String> consumer = KafkaConsumerConfig.createConsumerGroup(options);
    Producer<Long, String> producer = KafkaProducerConfig.createKafkaProducer(options);
    if (options.isReadFromAnOffset()) {
        // if we want to consume from a particular offset;
        // this works only for a single partition per consumer
        List<TopicPartition> tpartition = new ArrayList<TopicPartition>();
        tpartition.add(new TopicPartition(options.getTopicName(), options.getPartition()));
        consumer.assign(tpartition);
        consumer.seek(tpartition.get(0), options.getOffset());
    } else {
        // otherwise let the group coordinator assign partitions & offsets
        consumer.subscribe(Arrays.asList(options.getTopicName()));
        log.debug("subscribed to topic {}", options.getTopicName());
    }
    List<Payload> payloads = new ArrayList<>();
    while (true) {
        // the timeout is how long to wait for messages from the broker
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(50));
        if (records.count() != 0)
            log.debug("poll size is {}", records.count());
        Set<TopicPartition> partitions = records.partitions();
        // reading normally, round robin, from the last committed offset
        for (ConsumerRecord<String, String> r : records) {
            log.debug(" Partition : {} Offset : {}", r.partition(), r.offset());
            try {
                JSONArray arr = new JSONArray(r.value());
                for (Object o : arr) {
                    Payload p = JsonIterator.deserialize(((JSONObject) o).toString(), Payload.class);
                    payloads.add(p);
                }
                List<Payload> steplist = new ArrayList<>();
                steplist.addAll(payloads);
                // run the insert asynchronously on the executor
                CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Connection conn = datasource.getConnection();
                            PgInsert.insertIntoPg(steplist, conn, consumer, r, options.getTopicName(),
                                    options.getErrorTopic(), producer);
                        } catch (Exception e) {
                            log.error("error in processing future {}", e);
                        }
                    }
                }, executorService);
                // collected so that all futures can be awaited together
                allfutures.add(future);
                payloads.clear();
            } catch (Exception e) {
                // push records that failed processing onto the error topic
                log.debug("error in kafka consumer {}", e);
                ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(options.getErrorTopic(),
                        r.offset(), r.value());
                producer.send(record);
            }
        }
        // committing after every poll
        consumer.commitSync();
        if (records.count() != 0) {
            Map<TopicPartition, OffsetAndMetadata> metadata = consumer.committed(partitions);
            // read back the committed offset for each partition after polling
            for (TopicPartition tpartition : partitions) {
                OffsetAndMetadata offsetdata = metadata.get(tpartition);
                if (offsetdata != null && tpartition != null)
                    log.debug("committed offset is " + offsetdata.offset() + " for topic partition "
                            + tpartition.partition());
            }
        }
        // waiting for all async inserts to complete after each poll
        try {
            waitForFuturesToEnd();
            allfutures.clear();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
```
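The class-level members `executorService`, `allfutures`, and `waitForFuturesToEnd()` are not included in the question. Based on how they are used above, they presumably look roughly like the sketch below; the pool size and the `allOf(...).get()` approach are assumptions, not the actual implementation:

```java
// Hypothetical declarations for the class members referenced above; the real
// implementation is not shown in the question.
private static final ExecutorService executorService = Executors.newFixedThreadPool(10); // pool size is an assumption
private static final List<CompletableFuture<Void>> allfutures = new ArrayList<>();

// Presumably blocks until every future collected during the current poll has
// completed, which is why the caller handles InterruptedException and ExecutionException.
private static void waitForFuturesToEnd() throws InterruptedException, ExecutionException {
    CompletableFuture.allOf(allfutures.toArray(new CompletableFuture[0])).get();
}
```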
Earlier I thought it was getting stuck because of the size of the records being consumed, so I reduced MAX_POLL_RECORDS_CONFIG to 10. That should cap a single poll at roughly 200 KB, since each record is at most 20 KB.
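For context, a minimal sketch of the consumer configuration this implies (the actual `KafkaConsumerConfig.createConsumerGroup` is not shown in the question; apart from `MAX_POLL_RECORDS_CONFIG`, every value below is an assumption):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerConfigSketch {
    // Hypothetical stand-in for KafkaConsumerConfig.createConsumerGroup(options).
    public static KafkaConsumer<String, String> createConsumerGroup() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pg-insert-consumer");        // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");           // offsets are committed manually via commitSync()
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);                  // reduced from the default of 500
        return new KafkaConsumer<>(props);
    }
}
```

With each record capped at 20 KB, 10 records per poll gives the ~200 KB bound mentioned above.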
I am considering moving to the Spring framework to solve this, but before doing that I would like to understand why the consumer actually gets stuck. Any insight would be helpful.