使用goavro处理多种类型的解码数据

zlwx9yxi  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(575)

我想做什么?

对于数据库中的每一个更改,我都试图将debezium事件转换为数据库值的csv,以便加载到redshift中。
对于下面 id 为 110 的这条更改,我想生成这样一行 csv:`110,vck,desc,221.1`

```
mysql> select * from products;
+-----+-------------+---------------------------------------------------------+--------+
| id | name | description | weight |
+-----+-------------+---------------------------------------------------------+--------+
| 110 | vck | desc | 221.1 |
+-----+-------------+---------------------------------------------------------+--------+
```

这是我用goavro做的尝试。

#### 程序

https://play.golang.org/p/a8wd0szpueq

package main

import (
"fmt"
"encoding/json"
)

// main decodes a sample Debezium change event (Avro rendered as JSON) and
// prints the values of the changed row, one per table column, in column order.
//
// NOTE: the values keep their decoded Go types. Numbers are float64, and
// fmt prints a float64 of 1e6 or more in exponent form (1e+06), so convert
// them explicitly before writing a real CSV file.
func main() {
	// The payload is full of double quotes, so it has to be a raw string
	// literal (backticks); without them the program does not compile.
	debeziumEvent := `{"before":null,"after":{"datapipe.inventory.products.Value":{"id":110,"name":"vck","description":{"string":"desc"},"weight":{"double":221.10000610351562}}},"source":{"query":null,"snapshot":{"string":"true"},"server_id":0,"gtid":null,"name":"datapipe","thread":null,"ts_ms":0,"file":"mysql-bin.000049","version":"1.2.1.Final","connector":"mysql","pos":154,"table":{"string":"products"},"row":0,"db":"inventory"},"op":"c","ts_ms":{"long":1597649700266},"transaction":null}`

	var data map[string]interface{}
	if err := json.Unmarshal([]byte(debeziumEvent), &data); err != nil {
		panic(err)
	}

	// A row image can be null (the sample's "before" is), so check the
	// assertion instead of letting it fail with an opaque runtime panic.
	after, ok := data["after"].(map[string]interface{})
	if !ok {
		panic(`event has no "after" row image`)
	}

	// Go randomizes map iteration order, so ranging over the row would emit
	// the fields in a different order on every run. Walk the table's columns
	// explicitly to get a stable CSV layout.
	columns := []string{"id", "name", "description", "weight"}
	csv := make([]interface{}, 0, len(columns))

	// "after" wraps the row in a single entry keyed by the record type name.
	for _, v := range after {
		row, ok := v.(map[string]interface{})
		if !ok {
			panic(fmt.Sprintf("row image has unexpected type %T", v))
		}
		for _, col := range columns {
			switch val := row[col].(type) {
			case map[string]interface{}:
				// Nullable fields arrive wrapped as {"<avro type>": value}.
				for _, inner := range val {
					csv = append(csv, inner)
				}
			case string, float64, bool:
				// encoding/json decodes every JSON number into float64 when
				// the target is interface{}, so no int case is needed.
				csv = append(csv, val)
			default:
				// %T prints the dynamic type; a missing or null column
				// shows up here as <nil>.
				fmt.Printf("type %T not handled\n", val)
				panic("unhandled type")
			}
		}
	}

	fmt.Println(csv)
}

有没有更好的办法?按现在的写法,每遇到一种数据类型,我都得在 switch 语句里再加一个分支……
相关goavro问题:https://github.com/linkedin/goavro/issues/217
5lhxktic

5lhxktic1#

`fmt.Sprintf` 可以把任意接口值转换为字符串,例如 `str := fmt.Sprintf("%v", v)`。这样就可以把 case 分支减少到 2 个:

package main

import (
    "fmt"
    "encoding/json"
)

// main decodes a sample Debezium change event (Avro rendered as JSON) and
// prints the changed row's values as strings, one per table column, in
// column order.
func main() {
	debeziumEvent := `{"before":null,"after":{"datapipe.inventory.products.Value":{"id":110,"name":"vck","description":{"string":"desc"},"weight":{"double":221.10000610351562}}},"source":{"query":null,"snapshot":{"string":"true"},"server_id":0,"gtid":null,"name":"datapipe","thread":null,"ts_ms":0,"file":"mysql-bin.000049","version":"1.2.1.Final","connector":"mysql","pos":154,"table":{"string":"products"},"row":0,"db":"inventory"},"op":"c","ts_ms":{"long":1597649700266},"transaction":null}`

	// Only the "after" image is needed. Fields are kept as json.RawMessage so
	// numbers retain the exact text from the payload: decoding them into
	// interface{} yields float64, and "%v" prints a float64 of 1e6 or more in
	// exponent form (1e+06), which would corrupt the CSV.
	var event struct {
		After map[string]map[string]json.RawMessage `json:"after"`
	}
	if err := json.Unmarshal([]byte(debeziumEvent), &event); err != nil {
		panic(err)
	}

	// Go randomizes map iteration order, so walk the table's columns
	// explicitly to get the same CSV layout on every run.
	columns := []string{"id", "name", "description", "weight"}
	csv := make([]string, 0, len(columns))

	// "after" wraps the row in a single entry keyed by the record type name;
	// a null "after" leaves the map empty and nothing is emitted.
	for _, row := range event.After {
		for _, col := range columns {
			field, err := fieldText(row[col])
			if err != nil {
				panic(fmt.Errorf("column %q: %w", col, err))
			}
			csv = append(csv, field)
		}
	}

	fmt.Println(csv)
}

// fieldText renders one raw JSON field as CSV text.
//
// A missing field or a JSON null becomes the empty string. A JSON string is
// unquoted. A single-entry object such as {"string":"desc"} (the wrapper used
// for nullable fields) is replaced by its inner value. Any other value, such
// as a number or boolean, is returned exactly as written in the payload.
// An error is returned for malformed JSON or for an object that does not
// have exactly one entry.
func fieldText(raw json.RawMessage) (string, error) {
	if len(raw) == 0 || raw[0] == 'n' {
		return "", nil
	}
	switch raw[0] {
	case '"':
		var s string
		if err := json.Unmarshal(raw, &s); err != nil {
			return "", err
		}
		return s, nil
	case '{':
		var union map[string]json.RawMessage
		if err := json.Unmarshal(raw, &union); err != nil {
			return "", err
		}
		if len(union) != 1 {
			return "", fmt.Errorf("expected a single-entry union object, got %d entries", len(union))
		}
		// Exactly one entry: pick it up and render the wrapped value.
		var inner json.RawMessage
		for _, v := range union {
			inner = v
		}
		return fieldText(inner)
	}
	return string(raw), nil
}

相关问题