java融合jdbc连接器和flink使用者

wvt8vs2t  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(405)

我们正在尝试将sql server jdbc连接器与kafkaavroserializer结合使用,并提供定制的producerinterceptor,以便在将数据发送到kafka之前对其进行加密。
在使用者端,我们希望使用flink连接器来解密,然后使用适当的反序列化程序。
为了达到这个目的,我们有几个问题:
1) 如果我们提供定制的consumerinterceptor来解密数据,那么当我们在flink中创建数据流时,应该通过属性文件传递吗?

Properties properties = new Properties();
        ...
    `properties.setProperty("consumer.interceptor.classes": "OurCusromDecryptConsumerInterceptor")`;
    ...

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("sqlserver-foobar", ???, properties));

上面的配置是否正确,或者我是否需要设置任何其他属性,以便将consumerinterceptor传入flink?
2) 另一个问题是关于flink中的反序列化程序。例如,我在网上查了一下,发现很少有这样的代码片段:

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> { 

    private final String schemaRegistryUrl; 
    private final int identityMapCapacity; 
    private KafkaAvroDecoder kafkaAvroDecoder; 

    public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) { 
        this(schemaRegistyUrl, 1000); 
    }

因此,如果我们使用jdbc连接器向kafka传递数据而不做任何修改(除了加密数据),那么在反序列化过程中我们应该提供什么样的数据类型?我们将在反序列化之前解密数据。

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {

提前谢谢,

wfveoks0

wfveoks01#

只需添加最终结果,就可以帮助任何正在寻找相同结果的人:

public class ConfluentAvroDeserializationSchema
            implements DeserializationSchema<GenericRecord> {

        private final String schemaRegistryUrl;
        private final int identityMapCapacity;
        private transient KafkaAvroDecoder kafkaAvroDecoder;

        public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
            this(schemaRegistyUrl, 1000);
        }

        public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int
                identityMapCapacity) {
            this.schemaRegistryUrl = schemaRegistryUrl;
            this.identityMapCapacity = identityMapCapacity;
        }

        @Override
        public GenericRecord deserialize(byte[] bytes) throws IOException {
            if (kafkaAvroDecoder == null) {
                SchemaRegistryClient schemaRegistry = new
                        CachedSchemaRegistryClient(this.schemaRegistryUrl,
                        this.identityMapCapacity);
                this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
            }
            return (GenericRecord) this.kafkaAvroDecoder.fromBytes(bytes);
        }

        @Override
        public boolean isEndOfStream(GenericRecord string) {
            return false;
        }

        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            return TypeExtractor.getForClass(GenericRecord.class);
        }
    }

相关问题