如何将存储在avro文件中的org.apache.kafka.connect.data.decimal转换为python类型?

nom7f22z  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(372)

我试图用python解释debezium在kafka中存储的avro记录

{
              "name": "id",
              "type": {
                "type": "bytes",
                "scale": 0,
                "precision": 64,
                "connect.version": 1,
                "connect.parameters": {
                  "scale": "0"
                },
                "connect.name": "org.apache.kafka.connect.data.Decimal",
                "logicalType": "decimal"
              }
            }

我不确定它对应于哪个python3基元类型。如何反序列化此值?
提前谢谢!

n3ipq98p

n3ipq98p1#

如果你看
https://insight.io/github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/decimal.java

public static byte[] fromLogical(Schema schema, BigDecimal value) {
    if (value.scale() != scale(schema))
        throw new DataException("BigDecimal has mismatching scale value for given Decimal schema");
    return value.unscaledValue().toByteArray();
}

如您所见,它使用bigdecimal,这相当于 Decimal 在python中
什么是python for java的bigdecimal?
所以在这种情况下你应该寻找小数点。
第2部分-反序列化
关于反序列化,我需要反馈来更新答案。到目前为止,你在其他领域是怎么做的?

kiz8lqtg

kiz8lqtg2#

org.apache.kafka.connect.data.Decimal 是未标度整数的base64编码字节表示形式。为了将此值转换为 Decimal ,您需要将base64字符串解码为字节,获得整数,然后按 parameters.scale 价值观。
此架构:

{
  "type": "bytes",
  "name": "org.apache.kafka.connect.data.Decimal",
  "version": 1,
  "parameters": {
    "scale": "9",
    "connect.decimal.precision": "38"
  },
  "field": "amount"
}

可以使用以下代码段进行转换(在pyfiddle上尝试):

ctx = decimal.Context()
ctx.prec = 38  # connect.decimal.precision = 38
result = ctx.create_decimal(
    int.from_bytes(base64.b64decode("GZ6ZFQvYpA=="), byteorder='big')
) / 10**9  # scale = 9

相关问题