在kafkastreams拓扑中重试消息

uplii1fm  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(327)

我有一个kafkastreams拓扑,其中有一个处理器api。在处理器内部,有一个调用外部api的逻辑。
如果api返回503,则需要重试尝试的消息。
现在,我尝试将此消息推送到另一个kafka主题&使用“标点”方法每分钟从失败的主题中提取一批消息,然后重试。
有没有更好的方法解决这个问题?。

3hvapo4f

3hvapo4f1#

另一种不同但健壮的方法是使用状态存储。它们被Kafka作为压缩的变更日志主题支持。
您可以将失败的消息存储在状态存储中,并通过调用schedule(标点符号)来处理它们,然后删除所有成功处理的消息。
例如:

public class MyProcessor {

    private final long schedulerIntervalMs = 60000;
    private final String entityStoreName = "failed-message-store";
    private KeyValueStore<String, Object> entityStore;

    @Override
    public void init(ProcessorContext context) {
        this.entityStore = (KeyValueStore) context().getStateStore(entityStoreName);
        context().schedule(Duration.ofMillis(this.schedulerIntervalMs), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> processFailedMessagesStore());
    }

    @Override
    public void process(String key, Object value) {
        boolean apiCallSuccessful = // call API

        if (!apiCallSuccesfull) {
            entityStore.put(key, value);
        }
    }

    private void processFailedMessagesStore() {
        try (KeyValueIterator<String, Object> allItems = entityStore.all()) {
            allItems.forEachRemaining(item -> {
                boolean successfullyProcessed = // re-process

                if (successfullyProcessed) {
                    entityStore.delete(item.key);
                }
            });
        }
    }
}

相关问题