ksql表聚合为单个记录更新从所有记录发出消息

kokeuurv  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(229)

我一直试图使用kafka/ksqldb编写一个相当简单的用例,但运气不好。  我已经更新了这个例子来讨论一个更通用的用例。我所拥有的是 pagestats 在源数据库(本例中是mongodb)中更新给定页面的统计数据。数据包括:
页面名称
击打
状态
我的要求很简单。我需要的是一个按状态分组的所有页面的总和(命中)的聚合流,作为一个连续更新我的目标(在本例中是内存缓存)的流。
我有一个mongodb kafka连接器,它使用集合上的changestreams将来自mongodb的数据流传输到kafka主题中。
创建连接器:

curl -X PUT http://praveensrao:8083/connectors/pagestats001/config \
     -H "Content-Type: application/json" -d '{
         "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
         "publish.full.document.only":"true",
         "tasks.max":"1",
         "change.stream.full.document":"default",
         "collection":"pagestats001",
         "key.converter.schemas.enable":"true",
         "database":"test",
         "name":"pagestats001",
         "connection.uri":"mongodb://***:***@****:27017/test?replicaSet=****authSource=admin&ssl=true",
         "value.converter.schemas.enable":"false",
         "copy.existing":"false",
         "value.converter":"org.apache.kafka.connect.storage.StringConverter",
         "key.converter":"org.apache.kafka.connect.storage.StringConverter",
         "errors.tolerance": "all",
         "max.message.bytes": "16777216"
    }'

  我将文档插入mongodb集合 pagestats001 ,我们有Kafka主题的信息 test.pagestats001 这种方式: 

ksql> print "test.pagestats001" from beginning;
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/16 03:33:29.780 Z, key: {"_id": {"_data": {"$binary": "gl+x8wgAAAADRjxfaWQAPDZkNmQ3YTFhLWU0NGQtNDI2Yy1iMzMzLTAwYjkzNDY0ZDQ0NAAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "6d6d7a1a-e44d-426c-b333-00b93464d444", "pagename": "page-1", "state": "ON", "hits": 5}
rowtime: 2020/11/16 03:34:43.297 Z, key: {"_id": {"_data": {"$binary": "gl+x81EAAAAIRjxfaWQAPGVhZGNjMjMxLTNlZmQtNGMzNy05ZGEzLWQ4MGY3ZTgzOGUzNwAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "eadcc231-3efd-4c37-9da3-d80f7e838e37", "pagename": "page-2", "state": "ON", "hits": 8}
rowtime: 2020/11/16 03:35:07.565 Z, key: {"_id": {"_data": {"$binary": "gl+x82UAAAAwRjxfaWQAPGJhNTE4YTQ5LWVhODEtNDY1Zi04ZWY3LWI3ZDZkMzk4ZmYyYwAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "ba518a49-ea81-465f-8ef7-b7d6d398ff2c", "pagename": "page-1", "state": "SK", "hits": 2}
rowtime: 2020/11/16 03:35:13.709 Z, key: {"_id": {"_data": {"$binary": "gl+x824AAAABRjxfaWQAPDgwOWZkZDNlLTkxMmQtNDcwNC05NmE0LTIyMWE5OWI0ZTkzZQAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "809fdd3e-912d-4704-96a4-221a99b4e93e", "pagename": "page-2", "state": "SK", "hits": 7}

  由于密钥不可用于ksqldb表,因此基于一些联机文档执行此操作以根据值按id重新分区,而不是使用密钥: 

CREATE STREAM test_pagestats001_schema (
    _id VARCHAR,
    pagename VARCHAR,
    state VARCHAR,
    hits BIGINT
  ) WITH (
     KAFKA_TOPIC='test.pagestats001', 
     VALUE_FORMAT='JSON'
  );
 
 Message        
----------------
 Stream created 
----------------
 
CREATE STREAM test_pagestats001_stream
  WITH (KAFKA_TOPIC='test.pagestats001.byid') AS
  SELECT 
    _id,
    pagename,
    state,
    hits
  FROM test_pagestats001_schema
  PARTITION BY _id
  EMIT CHANGES;
 
 Message                                                
--------------------------------------------------------
 Created query with ID CSAS_TEST_PAGESTATS001_STREAM_77 
--------------------------------------------------------
 
CREATE TABLE test_pagestats001_byid (
     _id VARCHAR PRIMARY KEY, 
    pagename VARCHAR,
    state VARCHAR,
    hits BIGINT
   ) WITH (
     KAFKA_TOPIC='test.pagestats001.byid',
     VALUE_FORMAT='JSON'
  );
 
 Message       
---------------
 Table created 
---------------
 
CREATE TABLE test_pagestats001_bystate AS
SELECT
    state,
    sum(hits) as hits
FROM test_pagestats001_byid
GROUP BY state
emit changes;
 
 Message                                                 
---------------------------------------------------------
 Created query with ID CTAS_TEST_PAGESTATS001_BYSTATE_81 
---------------------------------------------------------

  现在,我运行下面的 

ksql> select * from test_pagestats001_bystate emit changes;

    假设我在数据库中插入了一条记录,记录源主题中的一个州/省“ab” test.pagestats001 -我们有:

rowtime: 2020/11/16 04:07:48.290 Z, key: {"_id": {"_data": {"$binary": "gl+x+xAAAAABRjxfaWQAPDYxNzg0OGVlLTUxNDctNDczZS04MjU3LTE0NzA5ZDg2YzZjOQAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "617848ee-5147-473e-8257-14709d86c6c9", "pagename": "page-1", "state": "AB", "hits": 2}

  在table上 test_pagestats001_bystate ,我们有:

ksql> select * from test_pagestats001_bystate emit changes;
+-------------------------+-----------------------+
|STATE                    |HITS                   |
+-------------------------+-----------------------+
|AB                       |2                      |

    现在,如果我在数据库中为“page-2”插入一条与hits(1)状态相同的记录,将发出以下事件: 

|AB                       |3                      |

  到现在为止还不错。  但现在问题来了。假设我更新了刚才作为hits(8)插入的同一条记录。然后我得到两个事件: 

|AB                       |2                      |
|AB                       |10                     |

  而不是表示(10)的聚合值的单个事件。换句话说,我并不期望事件的聚合值为(2)。  对我来说,这似乎是一个基本的用例。我是不是做错了什么?

ksql> version
Version: 6.0.0

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题