我试图在两个表上运行左外联接,并将结果转换为数据流。
我在使用flink之前所做的所有连接都是内部连接,并且我总是用一个 .toRetractStream[MyCaseClass](someQueryConfig)
. 然而,由于左连接引入了空值,我从flink文档中了解到,我不能再使用case类了,因为它们在将表转换为数据流时不支持空值。
所以,我试着用pojo来完成这个。这是我的密码:
class EnrichedTaskUpdateJoin(val enrichedTaskId: String, val enrichedTaskJobId: String, val enrichedTaskJobDate: String, val enrichedTaskJobMetadata: Json, val enrichedTaskStartedAt: String, val enrichedTaskTaskMetadata: Json, val taskUpdateMetadata: Json = Json.Null) {}
val qConfig = tableEnv.queryConfig
qConfig.withIdleStateRetentionTime(IDLE_STATE_RETENTION_TIME)
val updatedTasksUpsertTable = enrichedTasksUpsertTable
.leftOuterJoin(taskUpdatesUpsertTable, 'enrichedTaskId === 'taskUpdateId)
.select(
'enrichedTaskId,
'enrichedTaskJobId,
'enrichedTaskJobDate,
'enrichedTaskJobMetadata,
'enrichedTaskStartedAt,
'enrichedTaskTaskMetadata,
'taskUpdateMetadata
)
val updatedEnrichedTasksStream: KeyedStream[String, String] = updatedTasksUpsertTable
.toAppendStream[EnrichedTaskUpdateJoin](qConfig)
.map(toEnrichedTask(_))
.map(encodeTask(_))
.keyBy(x => parse(x).getOrElse(Json.Null).hcursor.get[String]("id").getOrElse(""))
这个编译得很好,但是当我试着运行它时 org.apache.flink.table.api.TableException: Unsupported join type 'LEFT'. Currently only non-window inner joins with at least one equality predicate are supported
. 但是,根据这些文档,似乎我应该能够运行左连接。同样值得注意的是,错误是从 .toAppendStream[EnrichedTaskUpdateJoin](qConfig)
. 我想也许 non-window
部分错误意味着空闲状态保留时间有问题,因此我删除了查询配置,但得到了相同的错误。
希望这有足够的背景,但如果我需要补充什么,请让我知道。另外,我正在运行flink1.5-snapshot和circe进行json解析。我对scala也很陌生,所以这很可能只是一些愚蠢的语法错误。
1条答案
按热度按时间nkcskrwz1#
flink 1.5-snapshot不支持非窗口外部联接。正如您在发布的链接中看到的,在“outer joins”旁边没有“streaming”标记。1.5中支持时间窗口连接(对时间属性起作用)。
flink 1.6将提供
LEFT
,RIGHT
,和FULL
外部连接(另见flink-5878)。顺便说一句,确保
EnrichedTaskUpdateJoin
实际上是一个pojo,因为pojo需要一个默认构造函数var
而不是val
.