apache flink-使用带有泛型类型参数的类

3j86kqsm  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(738)

如何在flink中使用具有泛型类型的类?我遇到了错误:
函数“main(streamingjob)”的返回类型。java:63)'由于类型擦除,无法自动确定。通过对转换调用的结果使用returns(…)方法,或者让函数实现“resulttypequeryable”接口,可以给出类型信息提示。
我使用的类的形式如下:

class MaybeProcessable<T> {
    private final T value;

    public MaybeProcessable(T value) {
        this.value = value;
    }

    public T get() {
        return value;
    }
}

我以Flink的工作为例:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(new PubSubSource(PROJECT_ID, SUBSCRIPTION_NAME))
        .map(MaybeProcessable::new)
        .map(MaybeProcessable::get)
        .writeAsText("/tmp/flink-output", FileSystem.WriteMode.OVERWRITE);

    // execute program
    env.execute("Flink Streaming Java API Skeleton");
}

现在我可以使用 .returns() 功能:

.map(MaybeProcessable::new).returns(new MyCustomTypeInformationClass(String.class))

但这需要我编写自己的序列化程序。难道没有更简单的方法来实现这一点吗?

dwbf0jvd

dwbf0jvd1#

你可以用 .returns(TypeInformation.of(new TypeHint<MaybeProcessing<#CONCRETE_TYPE_HERE>>{}) 对于每次重复使用泛型mapfunction来设置返回类型而不创建任何自定义类。

相关问题