如何修复导致糟糕性能的缓存模式注册表查找

i86rm4rw  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(334)

编辑:几年前我发现了另一个问题(如何在cachedschemaregistryclient中填充缓存而不调用注册新模式?)。它提到cachedschemaregistryclient需要将schema注册到实际的注册表中以使其缓存,目前还没有解决这个问题的方法。所以把我的问题留在这里,但我也希望大家都知道。
我正在编写一个程序,它从kafka中提取一个字节数组,对它进行解密(这样在kafka上是安全的),将字节转换为字符串,将json字符串转换为json对象,从schema注册表中查找schema(利用cachedschemaregistryclient),使用从注册表元数据检索到的模式中的模式将json字节转换为通用记录,然后将该通用记录序列化为avro字节。
在运行了一些测试之后,cachedschemaregistyclient似乎是主要的性能消耗。但据我所知,这是获取模式元数据的最佳方法。我是否实现了一些糟糕的东西,或者是否有其他方法可以在我的用例中使用?
以下是解密后处理一切的代码:

package org.apache.flink;

import avro.fullNested.FinalMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import serializers.AvroFinishedMessageSerializer;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class JsonToAvroBytesParser implements FlatMapFunction<String, byte[]> {

    private transient CachedSchemaRegistryClient schemaRegistryClient;
    private transient AvroFinishedMessageSerializer avroFinishedMessageSerializer;
    private String schemaUrl;
    private Integer identityMaxCount;

    public JsonToAvroBytesParser(String passedSchemaUrl, int passedImc){
        schemaUrl = passedSchemaUrl;
        identityMaxCount = passedImc;
    }

    private void ensureInitialized() {
        if (schemaUrl.equals("")) {
            schemaUrl = "https://myschemaurl.com/";
        }
        if(identityMaxCount == null){
            identityMaxCount = 5;
        }
        if(schemaRegistryClient == null){
            schemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, identityMaxCount);
        }
        if(avroFinalMessageSerializer == null){
            avroFinalMessageSerializer = new AvroFinalMessageSerializer(FinalMessage.class);
        }
    }

    @Override
    public void flatMap(String s, Collector<byte[]> collector) throws Exception {

        ensureInitialized();

        Object obj = new JSONParser().parse(s);
        JSONObject jsonObject = (JSONObject) obj;

        try {
            String headers = jsonObject.get("headers").toString();
            JSONObject body = (JSONObject) jsonObject.get("requestBody");
            if(headers != null && body != null){
                String kafkaTopicFromHeaders = "hard_coded_name-value";
                //NOTE: this schema lookup has serious performance issues.
                SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(kafkaTopicFromHeaders);
                //TODO: need to implement recovery method if schema cannot be reached.

                JsonAvroConverter converter = new JsonAvroConverter();
                GenericRecord specificRecord = converter.convertToGenericDataRecord(body.toJSONString().getBytes(), new Schema.Parser().parse(schemaMetadata.getSchema()));
                byte[] bytesToReturn = avroFinishedMessageSerializer.serializeWithSchemaId(schemaMetadata, specificRecord);

                collector.collect(bytesToReturn);
            }
            else {
                System.out.println("json is incorrect.");
            }
        } catch (Exception e){
            System.out.println("json conversion exception caught");
        }
    }
}

谢谢你的帮助!

inkz8wg9

inkz8wg91#

getlatestschemametadata方法似乎不使用缓存。如果您希望您的调用使用缓存来提高性能,那么您可以重新组织您的程序以使用其他使用缓存的方法之一,例如按id查找schema或按名称用定义字符串注册schema。
我很难找到java(或python,或c++)文档来确认schemaregistry是如何工作的(在这里尝试过)。但是.net文档说至少在客户端api中getlatest方法没有被缓存。

相关问题