带kafka+合流模式注册表客户端的spring云流坏了吗?

lnlaulya  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(198)

我很好奇是否有人能像我现在这样工作。
我创建了简单的源和接收器应用程序来发送和接收基于avro模式的消息。消息的模式保存在合流模式注册表中。这两个应用程序都配置为使用confluentschemaregistryclient类,但我认为这里的某个地方可能有一个bug。以下是我看到的让我疑惑的东西。
如果我与confluent registry的rest api交互,我可以看到只有一个有问题的模式版本(稍加编辑以隐藏我正在处理的内容):

$ curl -i "http://schemaregistry:8081/subjects/somesubject/versions"
HTTP/1.1 200 OK
Date: Fri, 05 May 2017 16:13:37 GMT
Content-Type: application/vnd.schemaregistry.v1+json
Content-Length: 3
Server: Jetty(9.2.12.v20150709)

[1]

当源应用程序通过kafka发送消息时,我注意到标题中的版本看起来有点怪异:

contentType"application/octet-stream"originalContentType/"application/vnd.somesubject.v845+avro"

我不是100%清楚为什么 application/vnd.somesubject.v845+avro 内容类型包含在 application/octet-stream 但忽略这一点,请注意它说的是版本845而不是版本1。
在看 ConfluentSchemaRegistryClient 我看到了吗 POST s到 /subjects/(string: subject)/versions 并返回模式的id,而不是版本。然后把它放进 SchemaReferenceversion 字段:https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/confluentschemaregistryclient.java#l81
当接收器应用程序尝试基于标头获取消息的架构时,它失败,因为它尝试获取从标头中提取的版本845:https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/confluentschemaregistryclient.java#l87
有人对此有想法吗?提前谢谢。

更新

好吧,我确信这是个错误。拿走了 ConfluentSchemaRegistryClient 并修改了 register 方法 POST/subjects/(string: subject) (即,丢弃尾随 /versions )哪个restapi文档根据合流返回一个有效负载 version 在里面。很有魅力:

public SchemaRegistrationResponse register(String subject, String format, String schema) {
    Assert.isTrue("avro".equals(format), "Only Avro is supported");
    String path = String.format("/subjects/%s", subject);
    HttpHeaders headers = new HttpHeaders();
    headers.put("Accept",
            Arrays.asList("application/vnd.schemaregistry.v1+json", "application/vnd.schemaregistry+json",
                    "application/json"));
    headers.add("Content-Type", "application/json");
    Integer version = null;
    try {
        String payload = this.mapper.writeValueAsString(Collections.singletonMap("schema", schema));
        HttpEntity<String> request = new HttpEntity<>(payload, headers);
        ResponseEntity<Map> response = this.template.exchange(this.endpoint + path, HttpMethod.POST, request,
                Map.class);
        version = (Integer) response.getBody().get("version");
    }
    catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    SchemaRegistrationResponse schemaRegistrationResponse = new SchemaRegistrationResponse();
    schemaRegistrationResponse.setId(version);
    schemaRegistrationResponse.setSchemaReference(new SchemaReference(subject, version, "avro"));
    return schemaRegistrationResponse;
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题