kafka streams处理器api:根据密钥将记录转发到特定的流任务

b1zrtrql  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(248)

我正在使用以下处理器api拓扑:

Source: STopics (topics: [A, B, C])
  --> P1
Processor: P1 (stores: [P1_Store])
  --> CSink
  <-- STopics
Sink: CSink (topic: Result)
  <-- P1

主题有整数键,比如id。每个主题a、b和c有相同数量的分区,比如n。我有一个场景,其中具有相同键的记录可以出现在任何源主题中。假设我们创建了n个p1处理器示例(或n个流任务),每个示例都有一个state store p1\u store的本地副本。
有没有一种方法可以确定p1在运行时的示例数,这样我就可以将具有相同密钥的记录发送到处理器的相同示例,就像使用id%n来利用本地状态存储中id密钥的前一个值一样?
更新。我正在重新键入所有在主题a有空键的记录,并且新的键id可以出现在任何主题中,这里是a、b或c。
后续问题:如果我有一个处理器p1的两个父处理器p00和p01。p00在示例x上运行,将键更改为123并向前,而p01在示例y上运行,并向前键123。如何保证p00-x和p01-y中的键123总是到达p1的同一个示例(比如p1-z以便123的本地状态存储总是在z处可用)?我不希望p00和p01先写入中间主题,然后p1从中间主题读取。实现单一拓扑设计的任何替代方案?

uqdfh47h

uqdfh47h1#

如果您的输入主题已经按键进行了分区(如果id设置为message key,那么缺省值是多少),则无需执行任何操作。kafka streams会将分区分配给任务,以便保留分区。
特别是,如果每个主题有n个分区,那么将有n个任务,任务0将分配分区a-0、b-0和c-0,依此类推(即,来自不同主题的具有相同编号的分区将自动合并)。此外,处理器的示例数与任务数相同。处理器窗体task x将处理来自分区a-x、b-x和c-x的所有记录。
如果id不是输入主题中的键,则需要在通过其他主题将id设置为键后重新划分数据:

// using the DSL
stream.selectKey(...)
      .groupByKey()
      .aggregate(...)

// using Processor API
topology.addSource(...); // read input topics
topology.addProcessor(...); // set ID as key
topology.addSink(...); write to new topic for repartitioning
topology.addSource(...); // read from repartition topic
topology.addProcessor(...); // your processor updating the state

相关问题