围绕spring云流、avro、kafka和沿途数据丢失的设计问题

3lxsmp7m  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(335)

我们已经实现了一个由几个spring-boot微服务组成的系统,这些服务通过发布到kafka主题的消息进行通信。我们使用springcloudstream来处理通过kafka发送和接收消息的大量繁重工作。我们使用apacheavro作为传输协议,与模式服务器集成(springcloudstream默认实现用于本地开发,confluent用于生产)。


我们在公共库中对消息类进行建模,每个微服务都将其作为依赖项包含。当微服务充当生产者并发送消息时,在avro序列化发生之前,我们使用“动态模式生成”从消息类的形状推断avro模式。消费微服务可以根据模式版本从注册表中查找模式,并反序列化到消息类中,消息类也是它的一个依赖项。


它工作得很好,但是有一个很大的缺点,我想知道是否有人经历过,可以提供任何建议。例如,如果我们希望将一个新字段添加到一个模型类中,我们可以在公共模型类库中进行添加,并在微服务中更新该依赖项的名称。但这意味着我们需要更新链上每个微服务的依赖关系版本,即使中间的微服务不需要这个新字段。否则,该新字段的数据值将沿途丢失,因为微服务使用者沿途将反序列化为对象(可能是类的过期版本)。


举个例子,假设我们的模型公共库中有一个名为paymentrequest的模型类(@data注解是lombok,juts从字段中生成getter和setter):

@Data

class PaymentRequest {
  
  String paymentId;
  
  String customerId;
}



我们有一个叫做payservice的微服务,它向kafka主题发送paymentrequest消息:



@Output("payment-broker”)
MessageChannel paymentBrokerTopic();
...

PaymentRequest paymentRequest = getPaymentRequest();

Message<PaymentRequest> message = MessageBuilder.withPayload(paymentRequest).build();
paymentBrokerTopic().(message);


我们在spring boot应用程序的application.yaml中有这样的配置:



spring:
   cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8071
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        Payment-broker:
          destination: paymentBroker
          contentType: application/*+avro


springcloudstream的avro-messageconverter从paymentrequest对象推断出模式,如果那里还没有匹配的模式,则向schema注册表添加一个模式,并以avro格式在kafka上发送消息。

然后我们在另一个微服务brokerservice中有一个消费者,它有这个消费者:



@Output("payment-processor”)
MessageChannel paymentProcessorTopic();



@Input(“payment-request”)
SubscribableChannel paymentRequestTopic();


@StreamListener("payment-request")
public void processNewPayment(Message<PaymentRequest> request) {
   // do some processing and then send on…
   
  paymentProcessorTopic().(message);

}



它能够将来自kafka的avro消息反序列化到paymentrequest pojo中,对其进行一些额外的处理,然后将消息发送到另一个主题,称为paymentprocessor,然后由另一个微服务paymentprocessor接收,该微服务有另一个streamlistener使用者:




@Input(“payment-processor”)
SubscribableChannel paymentProcessorTopic();



@StreamListener("payment-processor”)
public void processNewPayment(Message<PaymentRequest> request) {
   // do some processing and action request…

}



如果我们希望更新模型公共库中的paymentrequest类,以便它有一个新字段:



@Data
class PaymentRequest {
  
  String paymentId;
  
  String customerId;
  
  String processorCommand;

}



如果我们更新每个微服务中的依赖项版本,那么每次读取消息时,新字段的值都会反序列化到字段中,每次发送到下一个主题时,都会重新序列化到消息中。


但是,如果我们不更新模型公共库的版本,在第二个服务链中。例如,brokerservice会将消息反序列化为没有新字段的类的版本,因此当消息重新序列化为发送到支付处理器主题的消息时,avro消息将没有该字段的数据。
第三个微服务paymentprocessor可能具有包含新字段的model common lib的版本,但是当消息反序列化到pojo中时,该字段的值将为null。


我知道avro具有模式演化的特性,可以为新字段指定默认值,以允许向后和向前兼容,但这对我们来说还不够,我们需要真正的值。理想情况下,我们不希望出现必须在每个微服务中更新模型库的依赖版本的情况,因为这会引入大量工作和服务之间的耦合。通常,链中间的服务不需要一个新字段,例如,可能只与第一个服务和最后一个服务相关。


那么,有没有其他人面对这个问题,想到了一个好办法?我们渴望不失去avro的强大和springcloudstream的便利,但不存在这样的依赖性问题。关于自定义序列化程序/反序列化程序,我们可以试试吗?还是使用genericrecords?或者完全不同的方法?


谢谢你的帮助!


暂无答案!

目前还没有任何答案,快来回答吧!

相关问题