How to get an org.apache.kafka.connect.data.Decimal value from a Kafka JSON message

ukxgm1gy  posted on 2021-06-06  in Kafka

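This question already has an answer here:

How to deserialize a BigDecimal value received from a Kafka broker via the Debezium CDC mechanism (1 answer)
Closed last year.
I use Debezium to stream PostgreSQL data to Kafka, and I subscribe to the Kafka topic from Java.
I receive the Kafka message as a JSON string, but one numeric value in it is unreadable.
The JSON is: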

{
    "schema":
    {
        "type": "struct",
        "fields": [
        {
            "type": "struct",
            "fields": [
            {
                "type": "string",
                "optional": true,
                "field": "creator"
            },
            {
                "type": "int64",
                "optional": true,
                "name": "io.debezium.time.MicroTimestamp",
                "version": 1,
                "field": "createtime"
            },
            {
                "type": "bytes",
                "optional": true,
                "name": "org.apache.kafka.connect.data.Decimal",
                "version": 1,
                "parameters":
                {
                    "scale": "5",
                    "connect.decimal.precision": "32"
                },
                "field": "familyprice"
            }],
            "optional": true,
            "name": "pssdev.public.order.Value",
            "field": "before"
        },
        {
            "type": "struct",
            "fields": [
            {
                "type": "string",
                "optional": true,
                "field": "creator"
            },
            {
                "type": "int64",
                "optional": true,
                "name": "io.debezium.time.MicroTimestamp",
                "version": 1,
                "field": "createtime"
            },
            {
                "type": "bytes",
                "optional": true,
                "name": "org.apache.kafka.connect.data.Decimal",
                "version": 1,
                "parameters":
                {
                    "scale": "5",
                    "connect.decimal.precision": "32"
                },
                "field": "familyprice"
            }],
            "optional": true,
            "name": "pssdev.public.order.Value",
            "field": "after"
        },
        {
            "type": "struct",
            "fields": [
            {
                "type": "string",
                "optional": true,
                "field": "version"
            },
            {
                "type": "string",
                "optional": true,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_usec"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "txId"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "lsn"
            },
            {
                "type": "string",
                "optional": true,
                "field": "schema"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "boolean",
                "optional": true,
                "field": "last_snapshot_record"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "xmin"
            }],
            "optional": false,
            "name": "io.debezium.connector.postgresql.Source",
            "field": "source"
        },
        {
            "type": "string",
            "optional": false,
            "field": "op"
        },
        {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
        }],
        "optional": false,
        "name": "pssdev.public.order.Envelope"
    },
    "payload":
    {
        "before":
        {
            "creator": null,
            "createtime": null,
            "familyprice": null
        },
        "after":
        {
            "creator": "USER1E",
            "createtime": 1554292597815000,
            "familyprice": "W42A"
        },
        "source":
        {
            "version": "0.9.5.Final",
            "connector": "postgresql",
            "name": "pssdev",
            "db": "pf",
            "ts_usec": 1561459811737920,
            "txId": 771604,
            "lsn": 88282458880,
            "schema": "public",
            "table": "order",
            "snapshot": false,
            "last_snapshot_record": null,
            "xmin": null
        },
        "op": "u",
        "ts_ms": 1561459811747
    }
}

The familyprice value is W42A, and I don't know how to convert it.
The actual familyprice value in the database is 60.00000, and the column type is numeric(32,5).

rkttyhzu1#

For a Java client, see https://debezium.io/docs/faq/#how_to_retrieve_decimal_field_from_binary_representation
You can also set decimal.handling.mode to a different value so that decimals arrive as strings or doubles, if that is more convenient for you.
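As a rough illustration of the conversion for the message in the question: with the JSON converter, a field of type org.apache.kafka.connect.data.Decimal is emitted as the Base64 encoding of the unscaled integer's big-endian two's-complement bytes, and the scale comes from the "scale" parameter in the schema ("5" here). The class and method names below (DecimalDecoder, decode) are made up for this sketch and are not part of Debezium or Kafka Connect; it assumes you have already extracted the field string and the scale from the JSON.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Hypothetical helper, not part of any library.
final class DecimalDecoder {

    // encoded: the Base64 string from the payload, e.g. "W42A"
    // scale:   the "scale" parameter from the field's schema, e.g. 5
    static BigDecimal decode(String encoded, int scale) {
        // Base64 -> raw bytes of the unscaled value (big-endian, two's complement)
        byte[] unscaledBytes = Base64.getDecoder().decode(encoded);
        // BigInteger(byte[]) interprets the bytes as big-endian two's complement
        BigInteger unscaled = new BigInteger(unscaledBytes);
        // Apply the scale from the schema to place the decimal point
        return new BigDecimal(unscaled, scale);
    }

    public static void main(String[] args) {
        // "W42A" decodes to bytes 0x5B 0x8D 0x80, i.e. 6000000; with scale 5 that is 60.00000
        System.out.println(decode("W42A", 5)); // prints 60.00000
    }
}
```

If the connector is instead configured with decimal.handling.mode=string, the field should arrive as a plain decimal string, which can be passed directly to new BigDecimal(String) with no Base64 step.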
