请参阅下面的更新以显示潜在的解决方法
我们的应用程序将2个主题作为ktable使用,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有一个分区时,这种方法可以正常工作。当我们增加分区的数量时,我们注意到生成到输出主题的消息的数量会减少。
在启动应用程序之前,我们用多个分区配置测试了这个理论。对于1个分区,我们可以看到100%的消息。使用2,我们可以看到一些消息(小于50%)。有10个,我们几乎看不到(少于10%)。
因为我们没有加入,所以主题1中消耗的每一条消息都应该写入我们的输出主题,但我们发现这并没有发生。似乎消息被困在从ktables的外键连接创建的“中间”主题中,但是没有错误消息。
任何帮助都将不胜感激!
服务.java
@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
构建.gradle
plugins {
id 'org.springframework.boot' version '2.3.1.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}
...
ext {
set('springCloudVersion', "Hoxton.SR6")
}
...
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
注意:我们排除了org.apache.kafka依赖项,因为springcloudstream中包含的版本中有一个bug
应用程序.yml
spring:
application:
name: app-name
stream:
bindings:
process-in-0:
destination: topic1
group: ${spring.application.name}
process-in-1:
destination: topic2
group: ${spring.application.name}
process-out-0:
destination: outputTopic
kafka:
streams:
binder:
applicationId: ${spring.application.name}
brokers: ${KAFKA_BROKERS}
configuration:
commit.interval.ms: 1000
producer:
acks: all
retries: 20
default:
key:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
value:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
min-partition-count: 2
测试场景:
为了提供一个具体的示例,如果我将以下3条消息发布到主题1:
{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
输出主题将只接收2条消息。
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
另外两个怎么了?似乎某些键/值对无法写入输出主题。重试这些“丢失”的消息也不起作用。
更新:
通过将主题1作为kstream(而不是ktable)使用并调用 toTable()
在继续执行ktable连接之前。我仍然不确定为什么我原来的解决方案不起作用,但希望这个解决方法能够对实际问题有所帮助。
@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.map(...)
.toTable()
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
3条答案
按热度按时间gblwokeq1#
这是一个奇怪的问题,我从未听说过有许多输出主题分区控制数据写入频率。但是我知道
toStream()
仅当缓存已满时才将数据写入下游,因此请尝试设置cache.max.bytes.buffering = 0
. 此外,ktable只保留每个键的最新记录,因此如果对同一个键有多个值,则只有最新的值会保留并写入下游。rjjhvcjd2#
根据对问题的描述,(左)ktable输入主题中的数据似乎没有按其键正确分区。对于单个分区的主题,只有一个分区,所有的数据都到这个分区,连接结果就完成了。
但是,对于多分区的输入主题,您需要确保按键对数据进行分区,否则,具有相同键的两个记录可能会在不同的分区中结束,从而导致连接失败(因为连接是按分区进行的)。
请注意,即使外键联接不要求两个输入主题都是共分区的,也仍然要求每个输入主题本身按其键进行分区!
如果你使用
map().toTable()
您基本上会触发数据的内部重新分区,以确保数据按键进行分区,这就解决了问题。wpcxdonn3#
选择连接主题的键可能会有所帮助。主题的分区配置应该相同。