apache-flink:invalidprogramexception:无法编译表程序

ztigrdn8  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(367)

我有一个带有avro对象数据流的简单程序,我想从中提取一个字符串字段。我转换 DataStream 变成一个 Table 运行一个带有简单投影的查询。

val kinesisConsumer = new FlinkKinesisConsumer(streamName, new UnifiedEventDeserializationSchema, consumerConfig)
val env = StreamExecutionEnvironment.getExecutionEnvironment
implicit val typeInfo = TypeInformation.of(classOf[UnifiedEvent])
val kinesisStream = env.addSource(kinesisConsumer)
val tableEnv = TableEnvironment.getTableEnvironment(env)
tableEnv.registerDataStream("table1", kinesisStream);
val query = "SELECT nd_key FROM table1"
val result = tableEnv.sql(query)
tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print()
env.execute()

执行程序时,出现以下异常:
2017年11月29日16:07:36来源:自定义来源->发件人:(接受队列id,管理员id,提交后,支付金额,匿名id,应用程序id,atom密钥,bd组密钥,业务地理,braavos采购id,类别,队列id,概念密钥,概念排名,上下文,上下文活动,上下文实验,优惠券代码,课程密钥,课程排名,cta目的地,cta\u位置、cta\u消息、cta\u类型、货币、决策组\u id、设备\u浏览器、设备\u os、设备\u os版本、设备\u类型、持续时间、评估\u id、事件\u类型、财务\u地理位置、合作\u、实验室\u id、实验室\u等级、标签、课程\u密钥、课程\u等级、区域设置、最大\u暂停\u持续时间、消息、消息\u id、模块\u密钥、模块\u等级、nd \u密钥、nd \u单元\u id,第二单元排名、新队列id、通知id、完成的概念数量、交互数量、完成的课程数量、旧队列id、部分密钥、部分排名、暂停持续时间、暂停原因、付款计划、付款提供商、赚取的积分、可能的积分、价格、价格表、产品密钥、产品类型、提供商收费id、提供商退款id、测验类型、推荐人,退款金额、申请的群组id、结果、奖学金组密钥、搜索条件、技能级别、订阅id、暂停时间、暂停原因、技术、时间戳、总概念、总课程、总时间秒、类型、取消注册原因、用户id、用户区域设置、用户响应、变体、版本、工作区id、工作区会话,工作区类型)->选择:(nd\u键)->到:utf8->接收器:未命名(5/8)切换到失败的org.apache.flink.api.common.invalidprogramexception:无法编译表程序。这是个虫子。请提交一个问题。在org.apache.flink.table.codegen.compiler$class.compile(compiler。scala:36)在org.apache.flink.table.runtime.crowoutputmaprunner.compile(crowoutputmaprunner。scala:33)在org.apache.flink.table.runtime.crowoutputmaprunner.open(crowoutputmaprunner。scala:48)在org.apache.flink.api.common.functions.util.functionutils.openfunction(functionutils。java:36)在org.apache.flink.streaming.api.operators.abstractudfstreamoperator.open(abstractudfstreamoperator。java:111)在org.apache.flink.streaming.runtime.tasks.streamtask.openalloperators(streamtask。java:376)在org.apache.flink.streaming.runtime.tasks.streamtask.invoke(streamtask。java:253)在org.apache.flink.runtime.taskmanager.task.run(task。java:702)在java.lang.thread.run(线程。java:748)原因:org.codehaus.commons.compiler.compileexception:第790行,第15列:无法在org.codehaus.janino.unitcompiler.compileerror(unitcompiler)处将类型“java.lang.charsequence”转换为类型“org.apache.avro.util.utf8”。java:11672)在org.codehaus.janino.unitcompiler.assignmentconversion(unitcompiler。java:10528)在org.codehaus.janino.unitcompiler.compile2(unitcompiler。java:2534)在org.codehaus.janino.unitcompiler.access$2600(unitcompiler。java:212)在org.codehaus.janino.unitcompiler$6.visitlocalvariabledeclarationstatement(unitcompiler。java:1459)在org.codehaus.janino.unitcompiler$6.visitlocalvariabledeclarationstatement(unitcompiler。java:1443)在java$localvariabledeclarationstatement.accept(java。java:3348)在org.codehaus.janino.unitcompiler.compile(unitcompiler。java:1443)在org.codehaus.janino.unitcompiler.compilestatements(unitcompiler。java:1523)在org.codehaus.janino.unitcompiler.compile(unitcompiler。java:3052)在org.codehaus.janino.unitcompiler.compiledClaredMethods(unitcompiler。java:1313)在org.codehaus.janino.unitcompiler.compiledClaredMethods(unitcompiler。java:1286)在org.codehaus.janino.unitcompiler.compile2(unitcompiler。java:785)在org.codehaus.janino.unitcompiler.compile2(unitcompiler。java:436)在org.codehaus.janino.unitcompiler.access$400(unitcompiler。java:212)在org.codehaus.janino.unitcompiler$2.visitpackagememberclassdeclaration(unitcompiler。java:390)在org.codehaus.janino.unitcompiler$2.visitpackagememberclassdeclaration(unitcompiler。java:385)在org.codehaus.janino.java$packagememberclassdeclaration.accept(java。java:1405)在编译(unitcompiler。java:385)在org.codehaus.janino.unitcompiler.compileunit(unitcompiler。java:357)在org.codehaus.janino.simplecompiler.cook(simplecompiler。java:234)在org.codehaus.janino.simplecompiler.compiletoclassloader(simplecompiler。java:446)在org.codehaus.janino.simplecompiler.cook(simplecompiler。java:213)在org.codehaus.janino.simplecompiler.cook(simplecompiler。java:204)在org.codehaus.commons.compiler.cookable.cook(cookable。java:80)在org.codehaus.commons.compiler.cookable.cook(cookable。java:75)在org.apache.flink.table.codegen.compiler$class.compile(compiler。scala:33) ... 8个以上

ogq8wdun

ogq8wdun1#

田野 nd_key avro对象的 java.lang.CharSequence 并由sql查询处理。
通过呼叫 toAppendStream[org.apache.avro.util.Utf8] ,您请求将查询结果转换为 DataStream[Utf8] . 但是,flink不能自动转换 CharSequenceUtf8 .
试着改变 toAppendStream[org.apache.avro.util.Utf8]toAppendStream[java.lang.CharSequence] .
你用的是哪种flink版本?

相关问题