ksql流-在json上迭代

0lvr5msh  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(348)

如何在ksql流中迭代json:
我的json看起来像:

{
   "Obj" {
       "ID" : "1"
    },
    "KeyValues": {
        "Key1": "value1",
        "Key2": "value2",
        "Key3": "value3",
        "Key4": "value4",
        "Key5": "value5",
        "Key6": "value6",
        "Key7": "value7",
        "Key8": "value8",
        "Key9": "value9",
        "Key10": "value10",
            |
            |
            |
            |

        "KeyN": "valueN"
  }
}

如何在ksql中低于o/p。需要在json对象上迭代n个元素,如下所示。

ID    KEY              VALUE
----------------------------------    
   1     Key1            value1
   1     Key2            value2
   1     Key3            value3
   1     Key4            value4
   1     Key5            value5
   1     Key6            value6
   1     Key7            value7
   1     Key8            value8
   1     Key9            value9
   1     Key10           value10
   1            |
   1            |
   1            |
   1            |
   1 
   1      KeyN          valueN

提前谢谢。

zxlwwiss

zxlwwiss1#

似乎udtf是适合你的解决方案。您可以将explode看作udtf的一个示例,它接收一个数组,然后输出n行,每个元素一行。
udtf的签名类似:

@Udtf(schema = "STRUCT<key VARCHAR, value VARCHAR>")
public <T> List<Struct> expandMapEntries(final Map<String, String> input) {
  // output a list of key value pairs as a struct from 'input'
}

然后您可以使用这个udtf并从中选择字段(如下所示):

CREATE STREAM expanded AS SELECT EXPAND_MAP_ENTRIES(KeyValues) AS keyVals FROM source;
CREATE STREAM flattened AS keyVals->key as `KEY`, keyVals->value AS VALUE FROM expanded;

让我知道这是否对您有效,并随时联系社区slack(@almog)-我对这个用例非常感兴趣。

相关问题