invalidtypesexception

uemypmqf  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(272)

我对Apache·Flink有意见。我想要一个使用流的抽象类。然而,应用于这个流的模式应该是可互换的。

public abstract class AbstractVisitConsumer<TEventType>

teventtype标记从模式生成的事件的类型。每个模式都必须实现一个称为ieventpattern的接口

public interface IEventPattern<TStreamInput, TMatchedEvent> extends Serializable {

TMatchedEvent create(Map<String, List<TStreamInput>> pattern);

Pattern<TStreamInput, ?> getEventPattern();

抽象类有一个名为applypatternselecttostream()的方法

DataStream<TEventType> applyPatternSelectToStream(DataStream<VisitEvent> stream, IEventPattern<VisitEvent, TEventType> pattern) {
    DataStream<TEventType> patternStream = CEP.pattern(stream, pattern.getEventPattern()).select(new PatternSelectFunction<VisitEvent, TEventType>() {
        @Override
        public TEventType select(Map<String, List<VisitEvent>> map) throws Exception {
            return pattern.create(map);
        }
    }).returns(this.typeInformation);
    return patternStream;
}

flink编译器总是给我错误

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'TEventType' in 'class com.felix.AbstractVisitConsumer' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s)

我的类workplaceconsumer扩展了前面提到的抽象类,以指定从流生成的所需事件。

public class WorkPlaceConsumer extends AbstractVisitConsumer<WorkPlaceEvent> {

public WorkPlaceConsumer(TypeInformation typeInfo) {
    super(TypeInformation.of(WorkPlaceEvent.class));
}

public static void main(String[] args) {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    WorkPlaceConsumer consumer = new WorkPlaceConsumer();
    DataStream<VisitEvent> visitStream = consumer.getVisitStream(env);
    DataStream<WorkPlaceEvent> workPlaceStream = consumer.applyPatternSelectToStream(visitStream, new WorkPlacePattern());

    visitStream.print();
    workPlaceStream
            .keyBy((KeySelector<WorkPlaceEvent, Integer>) event -> event.getUserId())
            .filter(new NewWorkPlaceEventFilter())
            .print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

我已经尝试实现resulttypequeryable接口,并尝试通过在运行时传递子类的类型信息,将类型信息存储在抽象类中。我还使用了.returns作为方法来提供手动类型信息。也许我只是做错了。有人对流上的泛型转换有类似的问题吗?
提前谢谢。

rkkpypqq

rkkpypqq1#

好吧,我又看了你的问题,你是对的,它和lambda表达式无关。问题是类型擦除应用于 PatternSelectFunction .
您可以通过实现 ResultTypeQueryable 接口。你可以这样做,例如:

public interface InnerPatternSelectFunction<T> extends PatternSelectFunction<String, T>, ResultTypeQueryable<T>{};

public DataStream<T> applyPatternSelectToStream(DataStream<String> stream, IEventPattern<String, T> pattern) {
    TypeInformation<T> producedType = this.typeInformation;
    return CEP.pattern(stream, pattern.getEventPattern()).select(new InnerPatternSelectFunction<T>() {
        @Override
        public TypeInformation<T> getProducedType() {
            return producedType;
        }

        @Override
        public T select(Map<String, List<String>> map) throws Exception {
            return pattern.create(map);
        }
    });
}

当然,这只是一个建议,我认为您可以改进代码;)。但是你得到了实施 ResultTypeQueryable 接口。

相关问题