选择所有字段作为Json字符串作为Flink SQL中的新字段

k3bvogb1  于 2022-12-09  发布在  Apache
关注(0)|答案(2)|浏览(514)

我正在使用Flink Table API。我有一个表定义,我想选择所有字段,并将它们转换为新字段中的JSON字符串。
我的表有三个字段; a: String, b: Int, c: Timestamp.
如果我做了

INSERT INTO kinesis
SELECT a, b, c from my_table

kinesis流具有json记录;

{
  "a" : value,
  "b": value,
  "c": value
}

不过,我想要的是类似星火的功能;

INSERT INTO kinesis
SELECT "constant_value" as my source, to_json(struct(*)) as playload from my_table

所以,预期的结果是;

{
  "my_source": "constant_value",
  "payload": "json string from the first example that has a,b,c"
}

我在Flink中看不到任何to_jsonstruct()函数。是否可以实现?

eh57zj3b

eh57zj3b1#

您可能必须实现自己用户定义聚合函数。
这就是我所做的,这里我假设UDF的输入如下所示
to_json(“列1,”列1,“列2,”列2)

public class RowToJson extends ScalarFunction {
    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... row) throws Exception {
        if(row.length % 2 != 0) {
            throw new Exception("Wrong key/value pairs!");
        }

        String json = IntStream.range(0, row.length).filter(index -> index % 2 == 0).mapToObj(index -> {
            String name = row[index].toString();
            Object value = row[index+1];
            ... ...
        }).collect(Collectors.joining(",", "{", "}"));
        return json;
    }
}

如果您希望udf可以用于group by,则必须从AggregateFunction扩展udf类

public class RowsToJson extends AggregateFunction<String, List<String>>{
    @Override
    public String getValue(List<String> accumulator) {
        return accumulator.stream().collect(Collectors.joining(",", "[", "]"));
    }

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<String>();
    }

    public void accumulate(List<String> acc, @DataTypeHint(inputGroup = InputGroup.ANY) Object... row) throws Exception {
        if(row.length % 2 != 0) {
            throw new Exception("Wrong key/value pairs!");
        }
        String json = IntStream.range(0, row.length).filter(index -> index % 2 == 0).mapToObj(index -> {
            String name = row[index].toString();
            Object value = row[index+1];
            ... ...
        }).collect(Collectors.joining(",", "{", "}"));
        acc.add(json);
    }

}
jckbn6z7

jckbn6z72#

从Flink 1.15开始,JSON_OBJECT SQL函数可以帮助您从单个列创建JSON字段:flink json函数
SELECT JSON_OBJECT('col1 '值列1,'col2'值列2)FROM表格

相关问题