leftouterjoin抛出tableexception:不支持的联接类型'left'

w46czmvw  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(374)

我试图在两个表上运行左外联接,并将结果转换为数据流。
我在使用flink之前所做的所有连接都是内部连接,并且我总是用一个 .toRetractStream[MyCaseClass](someQueryConfig) . 然而,由于左连接引入了空值,我从flink文档中了解到,我不能再使用case类了,因为它们在将表转换为数据流时不支持空值。
所以,我试着用pojo来完成这个。这是我的密码:

class EnrichedTaskUpdateJoin(val enrichedTaskId: String, val enrichedTaskJobId: String, val enrichedTaskJobDate: String, val enrichedTaskJobMetadata: Json, val enrichedTaskStartedAt: String, val enrichedTaskTaskMetadata: Json, val taskUpdateMetadata: Json = Json.Null) {}

val qConfig = tableEnv.queryConfig
qConfig.withIdleStateRetentionTime(IDLE_STATE_RETENTION_TIME)

val updatedTasksUpsertTable = enrichedTasksUpsertTable
  .leftOuterJoin(taskUpdatesUpsertTable, 'enrichedTaskId === 'taskUpdateId)
  .select(
    'enrichedTaskId,
    'enrichedTaskJobId,
    'enrichedTaskJobDate,
    'enrichedTaskJobMetadata,
    'enrichedTaskStartedAt,
    'enrichedTaskTaskMetadata,
    'taskUpdateMetadata
  )

val updatedEnrichedTasksStream: KeyedStream[String, String] = updatedTasksUpsertTable
  .toAppendStream[EnrichedTaskUpdateJoin](qConfig)
  .map(toEnrichedTask(_))
  .map(encodeTask(_))
  .keyBy(x => parse(x).getOrElse(Json.Null).hcursor.get[String]("id").getOrElse(""))

这个编译得很好,但是当我试着运行它时 org.apache.flink.table.api.TableException: Unsupported join type 'LEFT'. Currently only non-window inner joins with at least one equality predicate are supported . 但是,根据这些文档,似乎我应该能够运行左连接。同样值得注意的是,错误是从 .toAppendStream[EnrichedTaskUpdateJoin](qConfig) . 我想也许 non-window 部分错误意味着空闲状态保留时间有问题,因此我删除了查询配置,但得到了相同的错误。
希望这有足够的背景,但如果我需要补充什么,请让我知道。另外,我正在运行flink1.5-snapshot和circe进行json解析。我对scala也很陌生,所以这很可能只是一些愚蠢的语法错误。

nkcskrwz

nkcskrwz1#

flink 1.5-snapshot不支持非窗口外部联接。正如您在发布的链接中看到的,在“outer joins”旁边没有“streaming”标记。1.5中支持时间窗口连接(对时间属性起作用)。
flink 1.6将提供 LEFT , RIGHT ,和 FULL 外部连接(另见flink-5878)。
顺便说一句,确保 EnrichedTaskUpdateJoin 实际上是一个pojo,因为pojo需要一个默认构造函数 var 而不是 val .

相关问题