Is there an efficient way to join several (more than 2) Kafka topics?

Posted by guz6ccqo on 2021-06-08 in Kafka

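I would like to join several (typically 2 to 10) Kafka topics by key, preferably using the Streams API. All topics will have the same key and the same partitioning. One way to do this join is to create a KStream for each topic and chain calls to KStream.outerJoin: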

stream1
    .outerJoin(stream2, ...)
    .outerJoin(stream3, ...)
    .outerJoin(stream4, ...)

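However, the documentation for KStream.outerJoin suggests that each call to outerJoin materializes both of its input streams, so the example above would materialize not only streams 1 to 4 but also stream1.outerJoin(stream2, ...) and stream1.outerJoin(stream2, ...).outerJoin(stream3, ...). Compared with joining the 4 streams directly, that means a lot of unnecessary serialization, deserialization and I/O.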
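To make the count concrete, here is a small plain-Java sketch (no Kafka dependency) that lists what a left-deep chain of four streams would materialize, under the question's assumption that every outerJoin call materializes both of its inputs. The class and method names are hypothetical and the join arguments are omitted. For n streams the chain materializes 2(n - 1) inputs: the n original streams plus n - 2 intermediate join results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: models the question's assumption, no Kafka dependency. */
final class ChainedJoinStores {

    /**
     * Lists the inputs a left-deep chain of pairwise joins would materialize,
     * assuming every outerJoin call materializes both of its inputs.
     */
    static List<String> materializedInputs(List<String> streams) {
        List<String> materialized = new ArrayList<>();
        String left = streams.get(0);
        for (int i = 1; i < streams.size(); i++) {
            String right = streams.get(i);
            materialized.add(left);  // left input: an original stream or an intermediate result
            materialized.add(right); // right input: always an original stream
            left = left + ".outerJoin(" + right + ")"; // result becomes the next left input
        }
        return materialized;
    }

    public static void main(String[] args) {
        List<String> inputs = materializedInputs(
                Arrays.asList("stream1", "stream2", "stream3", "stream4"));
        inputs.forEach(System.out::println);
        System.out.println(inputs.size() + " materialized inputs"); // 6 = 2 * (4 - 1)
    }
}
```

Two of the six entries, stream1.outerJoin(stream2) and stream1.outerJoin(stream2).outerJoin(stream3), are the intermediate results the question wants to avoid.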
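Another problem with this approach is that the JoinWindow is not consistent across all 4 input streams: one JoinWindow is used to join streams 1 and 2, but a separate join window is then used to join that result with stream 3, and so on. For example, suppose I specify a 10-second join window for each join, and an entry with a particular key appears in stream 1 at 0 s, in stream 2 at 6 s, in stream 3 at 12 s and in stream 4 at 18 s. The joined item is output only after 18 s, even though each window is just 10 s long, so the latency is far higher than intended. The result also depends on the order of the joins, which seems unnatural.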
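A simplified timing model shows the drift. The plain-Java sketch below assumes, for illustration only, that a pairwise windowed join accepts two records whose timestamps differ by at most the window size and that the joined record carries the later of the two timestamps; it is not a model of Kafka Streams internals, and the names are hypothetical. With the arrival times from the example, the chain accepts records spread over 18 s, a single 10 s window over all four streams rejects them, and moving the 18 s arrival to stream 2 changes the chained outcome.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simplified, hypothetical timing model; not Kafka Streams internals. */
final class JoinWindowDrift {

    /**
     * Left-deep chain of pairwise windowed joins. Assumption: a pair joins if
     * its timestamps differ by at most windowMs, and the joined record carries
     * the later of the two timestamps.
     */
    static boolean chainedJoinMatches(List<Long> arrivalsMs, long windowMs) {
        long joinedTs = arrivalsMs.get(0);
        for (int i = 1; i < arrivalsMs.size(); i++) {
            long next = arrivalsMs.get(i);
            if (Math.abs(next - joinedTs) > windowMs) {
                return false; // this pairwise join finds no partner
            }
            joinedTs = Math.max(joinedTs, next);
        }
        return true;
    }

    /** One window shared by all streams: the whole spread must fit in windowMs. */
    static boolean singleWindowMatches(List<Long> arrivalsMs, long windowMs) {
        return Collections.max(arrivalsMs) - Collections.min(arrivalsMs) <= windowMs;
    }

    public static void main(String[] args) {
        long window = 10_000L;
        // Arrival times in streams 1..4, from the example in the question.
        List<Long> arrivals = Arrays.asList(0L, 6_000L, 12_000L, 18_000L);
        System.out.println("chained: " + chainedJoinMatches(arrivals, window)); // true
        System.out.println("single: " + singleWindowMatches(arrivals, window)); // false
        // Same timestamps, but the 18 s arrival is now in stream 2.
        List<Long> reordered = Arrays.asList(0L, 18_000L, 6_000L, 12_000L);
        System.out.println("chained, reordered: " + chainedJoinMatches(reordered, window)); // false
    }
}
```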
Is there a better way to implement a multi-way join with Kafka?

Answer 1 (2cmtqfgy)

I don't know of a better way in Kafka Streams at the moment, but one is taking shape:
https://cwiki.apache.org/confluence/display/kafka/kip-150+-+kafka-streams+cogroup

Answer 2 (xiozqbni)

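In the end, I decided to write a custom lightweight joiner that avoids materialization and strictly enforces the expiry time. It should be O(1) on average. It fits the Consumer API better than the Streams API: for each consumer, poll repeatedly and update the joiner with any data received; if the joiner returns a complete set of attributes, forward it. Here is the code: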

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Inner joins multiple streams of data by key into one stream. It is assumed
 * that a key will appear in a stream exactly once. The values associated with
 * each key are collected and if all values are received within a certain
 * maximum wait time, the joiner returns all values corresponding to that key.
 * If not all values are received in time, the joiner never returns any values
 * corresponding to that key.
 * <p>
 * This class is not thread safe: all calls to
 * {@link #update(Object, Object, long)} must be synchronized.
 * @param <K> The type of key.
 * @param <V> The type of value.
 */
class StreamInnerJoiner<K, V> {

    private final Map<K, Vals<V>> idToVals = new LinkedHashMap<>();
    private final int joinCount;
    private final long maxWait;

    /**
     * Creates a stream inner joiner.
     * @param joinCount The number of streams being joined.
     * @param maxWait The maximum amount of time after an item has been seen in
     * one stream to wait for it to be seen in the remaining streams.
     */
    StreamInnerJoiner(final int joinCount, final long maxWait) {
        this.joinCount = joinCount;
        this.maxWait = maxWait;
    }

    private static class Vals<A> {
        final long firstSeen;
        final Collection<A> vals = new ArrayList<>();
        private Vals(final long firstSeen) {
            this.firstSeen = firstSeen;
        }
    }

    /**
     * Updates this joiner with a value corresponding to a key.
     * @param key The key.
     * @param val The value.
     * @param now The current time.
     * @return If all values for the specified key have been received, the
     * complete collection of values for that key; otherwise
     * {@link Optional#empty()}.
     */
    Optional<Collection<V>> update(final K key, final V val, final long now) {
        expireOld(now - maxWait);
        final Vals<V> curVals = getOrCreate(key, now);
        curVals.vals.add(val);
        return expireAndGetIffFull(key, curVals);
    }

    private Vals<V> getOrCreate(final K key, final long now) {
        final Vals<V> existingVals = idToVals.get(key);
        if (existingVals != null)
            return existingVals;
        else {
            /*
            Note: we assume that the item with the specified ID has not already
            been seen and timed out, and therefore that its first seen time is
            now. If the item has in fact already timed out, it is doomed and
            will time out again with no ill effect.
             */
            final Vals<V> curVals = new Vals<>(now);
            idToVals.put(key, curVals);
            return curVals;
        }
    }

    private void expireOld(final long expireBefore) {
        final Iterator<Vals<V>> i = idToVals.values().iterator();
        while (i.hasNext() && i.next().firstSeen < expireBefore)
            i.remove();
    }

    private Optional<Collection<V>> expireAndGetIffFull(final K key, final Vals<V> vals) {
        if (vals.vals.size() == joinCount) {
            // as all expired entries were already removed, this entry is valid
            idToVals.remove(key);
            return Optional.of(vals.vals);
        } else
            return Optional.empty();
    }
}
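One caveat: the joiner counts values rather than tracking which stream each came from, so it depends on its documented assumption that a key appears in each stream exactly once. A key repeated within one stream could complete the join early, and the returned collection is in arrival order rather than stream order. Below is a hypothetical variant (SlottedInnerJoiner and its streamIndex parameter are not part of the answer) that keeps one slot per stream, lets a repeat from the same stream overwrite its slot, and returns values ordered by stream index. Its main method feeds simulated (stream, key, value, timestamp) tuples in place of records polled from real consumers; like the original, it assumes that now never decreases between calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical variant of StreamInnerJoiner: one slot per stream, so a key
 * repeated within a single stream cannot complete the join. Not thread safe.
 */
final class SlottedInnerJoiner<K, V> {

    private static final class Slots<T> {
        final long firstSeen;
        final List<T> vals;
        int filled;

        Slots(long firstSeen, int joinCount) {
            this.firstSeen = firstSeen;
            this.vals = new ArrayList<>(Collections.<T>nCopies(joinCount, null));
        }
    }

    // Insertion order equals firstSeen order as long as `now` never decreases.
    private final Map<K, Slots<V>> pending = new LinkedHashMap<>();
    private final int joinCount;
    private final long maxWait;

    SlottedInnerJoiner(int joinCount, long maxWait) {
        this.joinCount = joinCount;
        this.maxWait = maxWait;
    }

    /** Returns the values ordered by stream index once every stream has contributed one. */
    Optional<List<V>> update(int streamIndex, K key, V val, long now) {
        Objects.requireNonNull(val, "null values would be indistinguishable from empty slots");
        // Expire keys whose first value arrived more than maxWait ago.
        Iterator<Slots<V>> it = pending.values().iterator();
        while (it.hasNext() && it.next().firstSeen < now - maxWait) {
            it.remove();
        }
        Slots<V> slots = pending.computeIfAbsent(key, k -> new Slots<>(now, joinCount));
        if (slots.vals.get(streamIndex) == null) {
            slots.filled++; // first value from this stream for this key
        }
        slots.vals.set(streamIndex, val); // a repeat from the same stream overwrites
        if (slots.filled < joinCount) {
            return Optional.empty();
        }
        pending.remove(key); // complete: the entry is no longer tracked
        return Optional.of(slots.vals);
    }

    public static void main(String[] args) {
        // Simulated (stream, key, value, timestampMs) updates in place of polled records.
        SlottedInnerJoiner<String, String> joiner = new SlottedInnerJoiner<>(3, 10_000);
        System.out.println(joiner.update(0, "k1", "a", 0));      // Optional.empty
        System.out.println(joiner.update(0, "k1", "a2", 1_000)); // Optional.empty (same stream again)
        System.out.println(joiner.update(1, "k1", "b", 2_000));  // Optional.empty
        System.out.println(joiner.update(2, "k1", "c", 3_000));  // Optional[[a2, b, c]]
        System.out.println(joiner.update(0, "k2", "x", 4_000));  // Optional.empty
        System.out.println(joiner.update(1, "k2", "y", 20_000)); // Optional.empty (k2 expired)
    }
}
```

For comparison, the original StreamInnerJoiner with a joinCount of 3 would already emit a, a2 and b for k1 on the third update, because it only counts values.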
