Kafka Connect JsonConverter with validation

xu3bshqb  posted on 2021-06-04  in Kafka

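I am trying to create a Kafka Connect value converter that wraps invalid JSON records in a valid JSON object.
I read the values from Kinesis (using KinesisSourceConnector), so the input is base64-encoded.
My implementation tries to process the input with ByteArrayConverter, which decodes the data, and then delegates the output to JsonConverter, as shown below (decode is initialized to true in the configure method):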

private final Converter delegate = new JsonConverter();
    private final Converter decoder = new ByteArrayConverter();
    private boolean decode = false;

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        try {
            String decoded = new String(decoder.fromConnectData(topic, schema, value));
            LOG.info("decoded string\n" + decoded);

            if(decode) {
                byte[] bytes = decoder.fromConnectData(topic, schema, value);
                return delegate.fromConnectData(topic, schema, bytes);
            }
            return delegate.fromConnectData(topic, schema, value);
        } catch (Exception e) {
            LOG.error("something went wrong", e);
            return delegate.fromConnectData(topic, schema, wrapInvalidJson(new String(decoder.fromConnectData(topic, schema, value))));
        }
    }

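When I print the decoded string, it looks fine (a decoded JSON string).
But when I consume the output topic, the value looks like base64 again. I am not sure what I am missing.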
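
One likely explanation, offered as an assumption rather than something confirmed in this thread: when decode is true, the code above hands a byte[] to JsonConverter, and JSON has no binary type, so a JSON serializer has to write raw bytes as a base64 string. The sketch below uses only java.util.Base64 to show the effect; the class and method names are hypothetical and nothing here calls Kafka Connect itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class Base64RoundTripDemo {
    // What a JSON serializer typically does with raw bytes:
    // emit them as a quoted base64 string.
    static String asJsonBinary(byte[] raw) {
        return "\"" + Base64.getEncoder().encodeToString(raw) + "\"";
    }

    // Interpreting the same bytes as UTF-8 text keeps the JSON readable.
    static String asText(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(asJsonBinary(raw)); // base64 form, as seen on the topic
        System.out.println(asText(raw));       // text form, as seen in the log
    }
}
```

If this is indeed the cause, the logged string looks fine because it is built with new String(...), while the record written to the topic goes through the byte[] path.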

66bbxpm5  #1

Not sure whether it is optimal, but I went with this approach:

    private final Converter delegate = new JsonConverter();
    private final Converter decoder = new ByteArrayConverter();
    private final Converter stringConverter = new StringConverter();
    private final ObjectMapper mapper = new ObjectMapper();
    private boolean decode = false;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Emit plain JSON without the schema/payload envelope.
        delegate.configure(Collections.singletonMap("schemas.enable", false), false);
        // Custom flag: treat incoming values as raw bytes that hold JSON text.
        decode = Boolean.parseBoolean(String.valueOf(configs.get("ni.decode.data")));
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Pass null values (tombstones) through unchanged.
        if (value == null) {
            return null;
        }
        if (decode) {
            // Read the raw bytes as text; do not hand a byte[] to JsonConverter.
            String decoded = new String(decoder.fromConnectData(topic, schema, value), StandardCharsets.UTF_8);
            try {
                // Valid JSON: re-serialize it as-is.
                return mapper.readTree(decoded).toString().getBytes(StandardCharsets.UTF_8);
            } catch (Exception e) {
                // Invalid JSON: wrap the raw text in a valid JSON object.
                return wrapInvalidJson(decoded).getBytes(StandardCharsets.UTF_8);
            }
        } else {
            try {
                return delegate.fromConnectData(topic, schema, value);
            } catch (Exception e) {
                // Fall back to the string form of the value and wrap it.
                byte[] msg = stringConverter.fromConnectData(topic, schema, value);
                return wrapInvalidJson(new String(msg, StandardCharsets.UTF_8)).getBytes(StandardCharsets.UTF_8);
            }
        }
    }
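
The wrapInvalidJson helper is called above but never shown in this thread. A minimal sketch of what it could look like follows, assuming it returns a JSON object with a single string field. The field name invalidJson and the class name are hypothetical, and the escaping is done by hand so the example runs without any library.

```java
final class InvalidJsonWrapper {
    // Hypothetical field name; the thread does not say what the wrapper emits.
    private static final String FIELD = "invalidJson";

    // Wraps arbitrary text in {"invalidJson":"..."} with JSON string escaping.
    static String wrapInvalidJson(String raw) {
        StringBuilder sb = new StringBuilder(raw.length() + 32);
        sb.append("{\"").append(FIELD).append("\":\"");
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be \u-escaped in JSON.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append("\"}").toString();
    }
}
```

Since the converter already holds an ObjectMapper, building the wrapper with mapper.createObjectNode().put("invalidJson", raw).toString() would likely be simpler and leaves the escaping to Jackson.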
