我正在使用Flink Table API。我有一个表定义,我想选择所有字段,并将它们转换为新字段中的JSON字符串。
我的表有三个字段; a: String, b: Int, c: Timestamp.
如果我做了
INSERT INTO kinesis
SELECT a, b, c from my_table
kinesis流具有json记录;
{
"a" : value,
"b": value,
"c": value
}
不过,我想要的是类似星火的功能;
INSERT INTO kinesis
SELECT "constant_value" as my source, to_json(struct(*)) as playload from my_table
所以,预期的结果是;
{
"my_source": "constant_value",
"payload": "json string from the first example that has a,b,c"
}
我在Flink中看不到任何to_json
或struct()
函数。是否可以实现?
2条答案
按热度按时间eh57zj3b1#
您可能必须实现自己用户定义聚合函数。
这就是我所做的,这里我假设UDF的输入如下所示
to_json(“列1,”列1,“列2,”列2)
如果您希望udf可以用于group by,则必须从AggregateFunction扩展udf类
jckbn6z72#
从Flink 1.15开始,JSON_OBJECT SQL函数可以帮助您从单个列创建JSON字段:flink json函数
SELECT JSON_OBJECT('col1 '值列1,'col2'值列2)FROM表格