流-1:
[KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}
[KSTREAM-SOURCE-0000000000]: null, {"id": 2, "name": "jane", "age": 24, "updated_at": 1525774480784}
[KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}
[KSTREAM-SOURCE-0000000000]: null, {"id": 4, "name": "jamie", "age": 22, "updated_at": 1525774480875}
[KSTREAM-SOURCE-0000000000]: null, {"id": 5, "name": "jenny", "age": 27, "updated_at": 1525774482927}
[KSTREAM-SOURCE-0000000000]: null, {"id": 6, "name": "kishore", "age": 27, "updated_at": 1525775063908}
[KSTREAM-SOURCE-0000000000]: null, {"id": 7, "name": "purna", "age": 27, "updated_at": 1525775072006}
[KSTREAM-SOURCE-0000000000]: null, {"id": 8, "name": "xxx", "age": 10, "updated_at": 1525783464123}
[KSTREAM-SOURCE-0000000000]: null, {"id": 9, "name": "yyy", "age": 10, "updated_at": 1525783667644}
[KSTREAM-SOURCE-0000000000]: null, {"id": 10, "name": "zzz", "age": 10, "updated_at": 1525783741814}
流-2:
[KSTREAM-SOURCE-0000000002]: null, {"id": 1, "name": "d", "age": 67}
[KSTREAM-SOURCE-0000000002]: null, {"id": 2, "name": "e", "age": 78}
[KSTREAM-SOURCE-0000000002]: null, {"id": 12, "name": "d", "age": 67}
[KSTREAM-SOURCE-0000000002]: null, {"id": 21, "name": "e", "age": 78}
现在我想对这两个流执行join操作,并且只想检索stream-1中不在stream-2中的行。我的输入流数据是avro格式的
预期产量:
[KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}
[KSTREAM-SOURCE-0000000000]: null, {"id": 4, "name": "jamie", "age": 22, "updated_at": 1525774480875}
[KSTREAM-SOURCE-0000000000]: null, {"id": 5, "name": "jenny", "age": 27, "updated_at": 1525774482927}
[KSTREAM-SOURCE-0000000000]: null, {"id": 6, "name": "kishore", "age": 27, "updated_at": 1525775063908}
[KSTREAM-SOURCE-0000000000]: null, {"id": 7, "name": "purna", "age": 27, "updated_at": 1525775072006}
[KSTREAM-SOURCE-0000000000]: null, {"id": 8, "name": "xxx", "age": 10, "updated_at": 1525783464123}
[KSTREAM-SOURCE-0000000000]: null, {"id": 9, "name": "yyy", "age": 10, "updated_at": 1525783667644}
[KSTREAM-SOURCE-0000000000]: null, {"id": 10, "name": "zzz", "age": 10, "updated_at": 1525783741814}
那么我应该执行哪个连接操作,以及如何实现预期的输出呢?有人能帮我实现这个目标吗
1条答案
按热度按时间l3zydbqr1#
如果您看一下这里的文档:kafka streams join semantics,您可能会使用左连接,在stream2中的值被设置时,只在值joiner中返回null。
一些伪代码:
免责声明:我没有测试这个。