kafka流:混合匹配papi和dsl ktable而不是共分区

ncgqoxb0  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(353)

我有一个混合匹配的scala拓扑,其中主要工作是一个papi处理器,其他部分通过dsl连接。

EventsProcessor: 
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)

整个主题的数据(包括原始数据) eventsTopic )通过一个,我们称之为 DoubleKey 它有两个字段。访客被送到 visitorsTopic 通过Flume:

.addSink(VISITOR_SINK_NAME, visitorTopicName,
    DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)

在dsl中,我在这个主题上创建了一个表:

val visitorTable = builder.table(
  visitorTopicName,
  Consumed.`with`(DoubleKey.getKafkaSerde(),
  Visitor.getKafkaSerde()),
  Materialized.as(visitorStoreName))

我后来连接到 EventProcessor :

topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)

所有内容都是共同分区的(通过双键)。 visitorSinkPartitioner 执行典型的模运算:

Math.abs(partitionKey.hashCode % numPartitions)

在papi处理器eventsprocessor中,我查询这个表以查看是否已经存在访问者。
但是,在我的测试中(使用embeddedkafka,但这不会有什么区别),如果我用一个分区运行它们,一切都很好(eventsprocessor在同一个分区上的两个事件上检查ktable) DoubleKey ,在第二个事件中,经过一些延迟,它可以看到存在的 Visitor 但是如果我用一个更高的数字运行它,eventprocessor将永远看不到存储中的值。
但是如果我通过api检查存储(迭代 store.all() ),记录就在那里。所以我知道它一定是去了不同的分区。
由于ktable应该处理其分区上的数据,并且所有内容都发送到同一分区(使用显式分区器调用同一代码),因此ktable应该在同一分区上获取该数据。
我的假设正确吗?会发生什么?
kafkastreams 1.0.0,scala 2.12.4。
附言:当然,这样做是可行的 put 在papi上通过papi而不是 StreamsBuilder.table() ,因为这肯定会使用运行代码的同一分区,但这是不可能的。

ippsafx7

ippsafx71#

是的,假设是正确的。
以防对任何人有帮助:
我在将分区器传递给scala嵌入的kafka库时遇到了一个问题。在其中一个测试套件中,它没有做对。现在,遵循重构的健康实践,我在这个拓扑的所有套件中都使用了这个方法。

def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) : 
    EmbeddedKafkaConfig = {
    val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
        classOf[DoubleKeyPartitioner].getCanonicalName)
    EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, 
        customProducerProperties = producerProperties)
}

相关问题