我正在寻找一个使用kafka streams的例子来说明如何做这类事情,即将customers表与addresses表连接起来,并将数据接收到es:-
客户
+------+------------+----------------+-----------------------+
| id | first_name | last_name | email |
+------+------------+----------------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Davidson | ed@walker.com |
| 1004 | Anne | Kim | annek@noanswer.org |
+------+------------+----------------+-----------------------+
地址
+----+-------------+---------------------------+------------+--------------+-------+----------+
| id | customer_id | street | city | state | zip | type |
+----+-------------+---------------------------+------------+--------------+-------+----------+
| 10 | 1001 | 3183 Moore Avenue | Euless | Texas | 76036 | SHIPPING |
| 11 | 1001 | 2389 Hidden Valley Road | Harrisburg | Pennsylvania | 17116 | BILLING |
| 12 | 1002 | 281 Riverside Drive | Augusta | Georgia | 30901 | BILLING |
| 13 | 1003 | 3787 Brownton Road | Columbus | Mississippi | 39701 | SHIPPING |
| 14 | 1003 | 2458 Lost Creek Road | Bethlehem | Pennsylvania | 18018 | SHIPPING |
| 15 | 1003 | 4800 Simpson Square | Hillsdale | Oklahoma | 73743 | BILLING |
| 16 | 1004 | 1289 University Hill Road | Canehill | Arkansas | 72717 | LIVING |
+----+-------------+---------------------------+------------+--------------+-------+----------+
输出elasticsearch索引
"hits": [
{
"_index": "customers_with_addresses",
"_type": "_doc",
"_id": "1",
"_score": 1.3278645,
"_source": {
"first_name": "Sally",
"last_name": "Thomas",
"email": "sally.thomas@acme.com",
"addresses": [{
"street": "3183 Moore Avenue",
"city": "Euless",
"state": "Texas",
"zip": "76036",
"type": "SHIPPING"
}, {
"street": "2389 Hidden Valley Road",
"city": "Harrisburg",
"state": "Pennsylvania",
"zip": "17116",
"type": "BILLING"
}],
}
}, ….
表数据来自debezium主题,我是否正确地认为我需要一些java在中间连接流,将其输出到一个新主题,然后将其放入es?
有没有人有这样的代码?
谢谢。
3条答案
按热度按时间ktecyv1j1#
不久前,我们在debezium博客上构建了一个关于这个用例的演示和博客文章(流聚合到elasticsearch)。
需要记住的一个问题是,这个解决方案(基于kafka流,但我认为对于ksql也是如此)容易暴露中间连接结果。e、 g.假设您在一个事务中插入一个客户和10个地址。流连接方法可能首先生成客户及其前5个地址的聚合,然后不久生成包含所有10个地址的完整聚合。对于您的特定用例,这可能是或者可能不是所需要的。我还记得,处理删除并不是一件小事(例如,如果您删除了10个地址中的一个,那么您必须再次生成聚合,其中剩下的9个地址可能未被触及)。
另一种可以考虑的模式是发件箱模式,在发件箱模式中,基本上生成一个显式事件,其中预计算的事件从应用程序本身聚合而来。i、 它需要应用程序的一点帮助,但是它避免了事后产生连接结果的微妙之处。
x0fgdtte2#
根据您对在一个客户节点中嵌套多个地址的要求有多严格,您可以在ksql(构建在kafka流之上)中实现这一点。
将一些测试数据填充到kafka中(在您的情况下,这已经通过debezium完成了):
启动ksql,首先检查数据:
现在我们宣布
STREAM
(Kafka主题+模式),以便我们可以进一步操作它:我们要模拟
customers
作为一个TABLE
,要做到这一点,kafka消息需要正确设置密钥(从"ROWKEY":"null"
在PRINT
以上输出)。您可以将debezium配置为设置消息密钥,以便在ksql中不必执行此步骤:现在我们宣布
TABLE
(给定键的状态,从kafka主题+模式示例化):现在我们可以加入数据:
这将创建一个新的ksql流,该流反过来填充一个新的kafka主题。
流具有架构:
我们可以查询流:
我们还可以使用kafka connect将其流式传输到elasticsearch:
结果:
lokaqttq3#
是的,您可以通过以下方式在java中使用kafka streams api实现该解决方案。
将主题作为流使用。
使用customer id聚合列表中的地址流,并将其转换为表。
使用地址表连接客户流
以下是示例(考虑到数据是以json格式使用的):
您可以在此处查看有关所有类型联接的详细信息:
https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#joining