我们已经实现了一个由几个spring-boot微服务组成的系统,这些服务通过发布到kafka主题的消息进行通信。我们使用springcloudstream来处理通过kafka发送和接收消息的大量繁重工作。我们使用apacheavro作为传输协议,与模式服务器集成(springcloudstream默认实现用于本地开发,confluent用于生产)。
我们在公共库中对消息类进行建模,每个微服务都将其作为依赖项包含。当微服务充当生产者并发送消息时,在avro序列化发生之前,我们使用“动态模式生成”从消息类的形状推断avro模式。消费微服务可以根据模式版本从注册表中查找模式,并反序列化到消息类中,消息类也是它的一个依赖项。
它工作得很好,但是有一个很大的缺点,我想知道是否有人经历过,可以提供任何建议。例如,如果我们希望将一个新字段添加到一个模型类中,我们可以在公共模型类库中进行添加,并在微服务中更新该依赖项的名称。但这意味着我们需要更新链上每个微服务的依赖关系版本,即使中间的微服务不需要这个新字段。否则,该新字段的数据值将沿途丢失,因为微服务使用者沿途将反序列化为对象(可能是类的过期版本)。
举个例子,假设我们的模型公共库中有一个名为paymentrequest的模型类(@data注解是lombok,juts从字段中生成getter和setter):
@Data
class PaymentRequest {
String paymentId;
String customerId;
}
我们有一个叫做payservice的微服务,它向kafka主题发送paymentrequest消息:
@Output("payment-broker”)
MessageChannel paymentBrokerTopic();
...
PaymentRequest paymentRequest = getPaymentRequest();
Message<PaymentRequest> message = MessageBuilder.withPayload(paymentRequest).build();
paymentBrokerTopic().(message);
我们在spring boot应用程序的application.yaml中有这样的配置:
spring:
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8071
schema:
avro:
dynamicSchemaGenerationEnabled: true
bindings:
Payment-broker:
destination: paymentBroker
contentType: application/*+avro
springcloudstream的avro-messageconverter从paymentrequest对象推断出模式,如果那里还没有匹配的模式,则向schema注册表添加一个模式,并以avro格式在kafka上发送消息。 然后我们在另一个微服务brokerservice中有一个消费者,它有这个消费者:
@Output("payment-processor”)
MessageChannel paymentProcessorTopic();
@Input(“payment-request”)
SubscribableChannel paymentRequestTopic();
@StreamListener("payment-request")
public void processNewPayment(Message<PaymentRequest> request) {
// do some processing and then send on…
paymentProcessorTopic().(message);
}
它能够将来自kafka的avro消息反序列化到paymentrequest pojo中,对其进行一些额外的处理,然后将消息发送到另一个主题,称为paymentprocessor,然后由另一个微服务paymentprocessor接收,该微服务有另一个streamlistener使用者:
@Input(“payment-processor”)
SubscribableChannel paymentProcessorTopic();
@StreamListener("payment-processor”)
public void processNewPayment(Message<PaymentRequest> request) {
// do some processing and action request…
}
如果我们希望更新模型公共库中的paymentrequest类,以便它有一个新字段:
@Data
class PaymentRequest {
String paymentId;
String customerId;
String processorCommand;
}
如果我们更新每个微服务中的依赖项版本,那么每次读取消息时,新字段的值都会反序列化到字段中,每次发送到下一个主题时,都会重新序列化到消息中。
但是,如果我们不更新模型公共库的版本,在第二个服务链中。例如,brokerservice会将消息反序列化为没有新字段的类的版本,因此当消息重新序列化为发送到支付处理器主题的消息时,avro消息将没有该字段的数据。
第三个微服务paymentprocessor可能具有包含新字段的model common lib的版本,但是当消息反序列化到pojo中时,该字段的值将为null。
我知道avro具有模式演化的特性,可以为新字段指定默认值,以允许向后和向前兼容,但这对我们来说还不够,我们需要真正的值。理想情况下,我们不希望出现必须在每个微服务中更新模型库的依赖版本的情况,因为这会引入大量工作和服务之间的耦合。通常,链中间的服务不需要一个新字段,例如,可能只与第一个服务和最后一个服务相关。
那么,有没有其他人面对这个问题,想到了一个好办法?我们渴望不失去avro的强大和springcloudstream的便利,但不存在这样的依赖性问题。关于自定义序列化程序/反序列化程序,我们可以试试吗?还是使用genericrecords?或者完全不同的方法?
谢谢你的帮助!
暂无答案!
目前还没有任何答案,快来回答吧!