我正在尝试创建kafka connect值转换器,它用一个有效的json对象 Package 无效的json记录。
我从kinesis读取值(使用kinesissourceconnector),所以输入是base64编码。
我的实现尝试通过bytearrayconverter处理输入,bytearrayconverter对数据进行解码,并将输出委托给jsonconverter,如下所示(在configure方法中将decode初始化为true):
private final Converter delegate = new JsonConverter();
private final Converter decoder = new ByteArrayConverter();
private boolean decode = false;
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
try {
String decoded = new String(decoder.fromConnectData(topic, schema, value));
LOG.info("decoded string\n" + decoded);
if(decode) {
byte[] bytes = decoder.fromConnectData(topic, schema, value);
return delegate.fromConnectData(topic, schema, bytes);
}
return delegate.fromConnectData(topic, schema, value);
} catch (Exception e) {
LOG.error("something went wrong", e);
return delegate.fromConnectData(topic, schema, wrapInvalidJson(new String(decoder.fromConnectData(topic, schema, value))));
}
}
当我打印解码的字符串时,它看起来很正常(解码的json字符串)
但是当我使用输出主题时,它看起来又像base64,我不确定我遗漏了什么
1条答案
按热度按时间66bbxpm51#
不确定它是否是最佳的,但选择了这种方法