Flink动态生成TypeInformation

qyswt5oh  于 2023-01-19  发布在  Apache
关注(0)|答案(1)|浏览(232)

我正在尝试通过RichMapFunction〈Row,Row〉解析数据流中一行的嵌套字段,这个的输入和输出都是Row类型,一行中的这个嵌套列可以有任意数量的字段。

DataStream<Row> outStream =  stream.map(new ParsePayload(functionMap, inputTypeInformation))
        .returns(<output TypeInformation>)
        .uid("ParseNestedColumn");

private static class ParsePayload extends RichMapFunction<Row, Row> implements Serializable
{
@Override
public Row map(Row row) throws Exception {
    <business logic>
    …….
    return resultRow;
}
}

问题是,我只想在计算map函数或创建输出行之后返回行的类型信息,因为行中的字段不是固定的。
我已经尝试了Types.ROW_NAMED()和ResultTypeQueryable接口,但是这两个接口都在计算map函数之前检查类型信息,这样我就无法向流提供类型信息。
我不想为我的作业启用泛型类型。

a64a0gku

a64a0gku1#

我觉得这不可能。
例如,如果您考虑使用TypeInformation生成源代码所需的序列化程序,那么您将在此处创建一个循环依赖(源代码需要TypeInformation将数据传递给map,但它不能,因为map需要首先提供TypeInformation)。
有不同的方法可以实现这一点,但所有这些方法都需要使用Generic*类,或者以byte[]String的形式获取数据并手动解析为预期类型,或者尝试实现可以表示您需要处理的所有可能性的自定义类型。

相关问题