如何使用Flink SQL对数组执行UNNEST、与另一个表连接,然后将结果聚合为JSON数组

d7v8vwbk  于 2023-08-01  发布在  Apache
关注(0)|答案(1)|浏览(210)

我有一个表1(数据源是Kafka),其事件如下:

{
  "id": "id-1",
  "segs": ["seg-id-1", "seg-id-2"]
}

我有表2(数据源是Kafka),其事件如下:

{
"id": "seg-id-1",
"name": "segs-name-1"
}


我的SQL如下:

-- Flattens each table1 event into one row per element of its `segs` array and
-- enriches that row with the segment name from table2. Because of the LEFT JOIN,
-- segs_name is NULL when no matching table2 row exists.
-- NOTE(review): in streaming mode a regular LEFT JOIN between two Kafka-backed
-- tables produces an updating (retracting) result, so downstream aggregates
-- receive retractions/updates -- confirm the sink can handle that.
CREATE VIEW IF NOT EXISTS FINAL_UNESTED_WITH_SEGMENTS AS (
SELECT
       r.id                  AS id,         -- qualified: table2 also has an `id` column
       unnested_segments_id  AS segs_id,    -- one row per element of r.segs
       sc.name               AS segs_name
FROM table1 r
         CROSS JOIN UNNEST(json_string_array_to_array(r.segs)) AS SegmentContentTable (unnested_segments_id)
         LEFT JOIN table2 sc ON (unnested_segments_id = sc.id));


这里json_string_array_to_array是一个自定义UDF,它将JSON字符串转换为数组。
有了它之后,我就可以在其上创建一个视图,然后运行如下查询:

-- Collapses the per-segment rows of FINAL_UNESTED_WITH_SEGMENTS back into one
-- row per id. Each segment becomes one JSON object built by the json_agg_custom
-- aggregate function from alternating key/value arguments.
-- The group key `id` is projected so each output row carries its id.
INSERT INTO final_table
SELECT id,
       json_agg_custom('seg_id', segs_id, 'seg_name', segs_name) AS segs
FROM FINAL_UNESTED_WITH_SEGMENTS
GROUP BY id;


这里的最终sink是一个Kafka主题。在该Kafka主题中,我看到出现了两条不同的消息:

{
  "id": "id-1",
  "segs": [{"id": "seg-id-1", "name": "segs-name-1"}]
}

{
  "id": "id-1",
  "segs": [{"id": "seg-id-2", "name": "segs-name-2"}]
}


而我想看到的是下面这样的结果,该如何实现?

{
  "id": "id-1",
  "segs": [
    {
      "id": "seg-id-1",
      "name": "segs-name-1"
    },
    {
      "id": "seg-id-2",
      "name": "segs-name-2"
    }
  ]
}

zsohkypk

zsohkypk1#

自定义UDF json_agg_custom的定义如下:

package com.sailpoint.udf;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ObjectArrayAggregationsFunction extends AggregateFunction<List<String>, ObjectArrayAggregationsFunction.Accumulator> {

    private static final JsonFactory JSON_FACTORY = new JsonFactory();
    private static final ObjectMapper MAPPER =
            new ObjectMapper(JSON_FACTORY)
                    .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    public void resetAccumulator(Accumulator acc) {
        acc.listView.clear();
    }

    public void accumulate(Accumulator accumulator, String... strings) throws Exception {
        System.out.println("accu strings : " + Arrays.toString(strings));

        String[] clone = strings.clone();
        accumulator.listView.add(String.join(",", clone));

    }

    @Override
    public List<String> getValue(Accumulator accumulator) {

        List<String> listout = new ArrayList<>();
        try {
            for (final String item : accumulator.listView.get()) {
                String[] split = item.split(",");
                ObjectNode objectNode = MAPPER.createObjectNode();
                for (int i = 0; i < split.length; i += 2) {
                    String key = split[i];
                    String value = split[i + 1];
                    objectNode.put(key, value);
                }
                listout.add(MAPPER.writeValueAsString(objectNode));

            }
            System.out.println("listout " + listout);
            return listout;
        } catch (Exception e) {
            throw new TableException("The accumulator state could not be serialized.", e);
        }

    }

    public void retract(Accumulator acc, String... strings) throws Exception {
        String[] clone = strings.clone();
        acc.listView.remove(String.join(",", clone));
    }

    public void merge(Accumulator acc, Iterable<Accumulator> others) throws Exception {
        for (final Accumulator other : others) {
            acc.listView.addAll(other.listView.getList());
        }
    }

    public static class Accumulator {
        public ListView<String> listView = new ListView<>();

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Accumulator that = (Accumulator) o;
            return Objects.equals(listView, that.listView);
        }

        @Override
        public int hashCode() {
            return Objects.hash(listView);
        }
    }
}
package com.sailpoint.udf;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ObjectArrayAggregationsFunction extends AggregateFunction<List<String>, ObjectArrayAggregationsFunction.Accumulator> {

    private static final JsonFactory JSON_FACTORY = new JsonFactory();
    private static final ObjectMapper MAPPER =
            new ObjectMapper(JSON_FACTORY)
                    .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    public void resetAccumulator(Accumulator acc) {
        acc.listView.clear();
    }

    public void accumulate(Accumulator accumulator, String... strings) throws Exception {
        System.out.println("accu strings : " + Arrays.toString(strings));

        String[] clone = strings.clone();
        accumulator.listView.add(String.join(",", clone));

    }

    @Override
    public List<String> getValue(Accumulator accumulator) {

        List<String> listout = new ArrayList<>();
        try {
            for (final String item : accumulator.listView.get()) {
                String[] split = item.split(",");
                ObjectNode objectNode = MAPPER.createObjectNode();
                for (int i = 0; i < split.length; i += 2) {
                    String key = split[i];
                    String value = split[i + 1];
                    objectNode.put(key, value);
                }
                listout.add(MAPPER.writeValueAsString(objectNode));

            }
            System.out.println("listout " + listout);
            return listout;
        } catch (Exception e) {
            throw new TableException("The accumulator state could not be serialized.", e);
        }

    }

    public void retract(Accumulator acc, String... strings) throws Exception {
        String[] clone = strings.clone();
        acc.listView.remove(String.join(",", clone));
    }

    public void merge(Accumulator acc, Iterable<Accumulator> others) throws Exception {
        for (final Accumulator other : others) {
            acc.listView.addAll(other.listView.getList());
        }
    }

    public static class Accumulator {
        public ListView<String> listView = new ListView<>();

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Accumulator that = (Accumulator) o;
            return Objects.equals(listView, that.listView);
        }

        @Override
        public int hashCode() {
            return Objects.hash(listView);
        }
    }
}
I was able to get it working.

字符串

相关问题