kstream连接的重复数据消除中间结果

qgelzfjb  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(311)

我有以下场景:
表a和表b使用fk连接。
事务性插入/更新到a和b中。
debezium发出一个事件 a 对于表a和一个事件 b 对于表b。
kafka streams为表a和b创建kstream。
Kafka流应用程序 leftJoin K流a和b(假设两者都是 a 以及 b 记录有相同的键并落在连接窗口中)。
输出记录将 [a, null], [a, b] .
你如何丢弃这些垃圾 [a, null] ?
一个选择是执行 innerJoin 但这仍然是一个问题,以防 update 查询。
我们尝试使用事件时间戳进行过滤(即使用最新时间戳保留事件),但时间戳的唯一性无法保证。
最终目标是能够识别最新的聚合,以便我们可以在查询时过滤出中间结果(在athena/presto或某些rdbms中)。

db2dz4w8

db2dz4w81#

目前,我发现的最好的工作方法是利用输出记录中的kafka偏移量。
该方法可概括为:
执行您想要执行的所有逻辑,并且不必担心同一个键的多个记录。
把结果写进一个中间主题,保留时间非常短(1小时等)
使用处理器阅读中间主题,并在处理器内使用kafka偏移量丰富信息 context.offset() .
将消息写到输出主题。
现在,输出主题包含同一个键的多个消息,但每个消息的偏移量不同。
在查询期间,现在可以使用子查询为每个键选择最大偏移量。
变压器供应商示例如下所示

/**
 * @param <K> key type
 * @param <V> value type
 */
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
  @Override
  public Transformer<K, V, KeyValue<String, String>> get() {
    return new OutputTransformer<>();
  }

  private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
      this.context = context;
    }

    /**
     * @param key   the key for the record
     * @param value the value for the record
     */
    @Override
    public KeyValue<String, String> transform(K key, V value) {
      if (value != null) {
        value.setKafkaOffset(context.offset());
      }
      return new KeyValue<>(key, value);
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
      return null;
    }

    @Override
    public void close() {
      // nothing to close
    }
  }
}

相关问题