我正在从flink表api编写一个查询来检索一条记录。然后检查是否找到记录,如果找到,则获取每个记录列值的字符串值。
即
users:
|id | name | phone |
|---|------|-------|
| 01| sam | 23354 |
| 02| jake | 23352 |
| 03| kim | 23351 |
问题是flink只从查询返回表,所以我无法1:检查是否找到记录,2:获取找到的记录值的各个值
sudo代码:
foundRecord = find record by phone
if foundRecord {
create new instance of Visitor
Visitor.name = foundRecord.name
Visitor.id = foundRecord.id
} else {
throw exception
}
flink docs推荐的下面的代码给了我一个表,但不确定如何实现上面的sudo代码,因为它是作为另一个表返回的,我需要实际的记录值。
Table users = registeredUsers.select("id, name, phone").where("phone === '23354'"));
参考文件:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableapi.html#expression-语法
1条答案
按热度按时间5gfr0r5j1#
为了知道找不到匹配的记录,输入必须是有界的——因此我们将使用
BatchTableEnvironment
,而不是StreamTableEnvironment
. (通过流式输入,匹配的记录可能最终到达并满足查询。只有批量输入才能证明没有匹配。)我用来检测结果集为空的方法感觉有点像黑客,但我想不出更好的方法了。请注意
print()
最后是必要的,尽管没有什么可打印的,因为任何没有最终输入到接收器的计算都会被优化掉,而不会执行。