Spring Cloud流partitionkeyexpression计算错误

alen0pnh  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(353)

我有一个使用kafka的基于spring云流的微服务。
我创建了一个带有4个分区的Kafka主题。
我在yml中配置了以下内容:

spring:
  cloud:
    stream:
      bindings:
        SYNC_TABLE:
          content-type: application/json
          partitionKeyExpression: payload.partitionKey
          partitionCount: 4
          destination: ${envTopicPrefix}.LEGACY_TABLE

在我的代码中,我的消息类包含(在其超类中)partitionkey变量:

@Data
@EqualsAndHashCode(callSuper=true)
@ToString(callSuper=true)
public class TransactionResponse extends GeneralOutputMessage{

}
@Data
@ToString
public class GeneralOutputMessage {

    private String operationType;
    private List<String> affectedFields;
    private Object data;
    private String eventId;
    private String eventName;
    private String partitionKey; 
}

我正在将transactionsresponse对象作为消息发送:

final TransactionResponse transactionResponse = handler.handleEvent(event);
if (transactionResponse != null) {
    outputChannels.tableSync().send(MessageBuilder.withPayload(transactionResponse).build());
    log.info("Message Sent: {}", transactionResponse);
}

我的期望是spring云流将获取key payload.partitionkey,计算其hashcode()%4,并将事件发送到该分区。
然而,逻辑是完全随机的。以下是几个例子:
math.abs(“111615631”.hashcode()%4)=1。但是,消息被发送到3号分区。
math.abs(“110019882”.hashcode()%4)=2。但是,消息被发送到0号分区。
math.abs(“943152574”.hashcode()%4)=0。此消息确实会被发送到0号分区。
math.abs(“943198862”.hashcode()%4)=0。但是,这个消息被发送到分区2。
我用的是dalston.sr1释放序列。
我错过了什么?
谢谢。
更新:
只是尝试用相同的分区键发送相同的事件(但消息体略有不同)。即使分区键相同,消息也会发送到两个不同的分区。看起来spring cloud stream完全忽略了partitionkeyexpression。

2w2cym1i

2w2cym1i1#

这是我的错误,我忘了在yml中添加producer:部分:

spring:
  cloud:
    stream:
      bindings:
        SYNC_TABLE:
          content-type: application/json
          producer:
            partitionKeyExpression: payload.partitionKey
            partitionCount: 4
          destination: ${envTopicPrefix}.LEGACY_TABLE

相关问题