Aggregate by multiple fields and map to one result

mznpcxlj · asked 2021-06-06 in Kafka

For data streaming out of a ticketing system, we are trying to achieve the following:
get the number of open tickets grouped by status and customer. The simplified schema looks like this:

Field               | Type                      
-------------------------------------------------
 ROWTIME             | BIGINT           (system) 
 ROWKEY              | VARCHAR(STRING)  (system) 
 ID                  | BIGINT                    
 TICKET_ID           | BIGINT                    
 STATUS              | VARCHAR(STRING)           
 TICKETCATEGORY_ID   | BIGINT                    
 SUBJECT             | VARCHAR(STRING)           
 PRIORITY            | VARCHAR(STRING)           
 STARTTIME           | BIGINT                    
 ENDTIME             | BIGINT                    
 CHANGETIME          | BIGINT                    
 REMINDTIME          | BIGINT                    
 DEADLINE            | INTEGER                   
 CONTACT_ID          | BIGINT

We want to use this data to get, per customer, the number of tickets in each particular status (open, waiting, processing, etc.). This data must end up as a single message in another topic; the schema might look like this:

Field               | Type                      
-------------------------------------------------
 ROWTIME             | BIGINT           (system) 
 ROWKEY              | VARCHAR(STRING)  (system) 
 CONTACT_ID          | BIGINT                    
 COUNT_OPEN          | BIGINT                    
 COUNT_WAITING       | BIGINT                    
 COUNT_CLOSED        | BIGINT

We plan to use this data, together with other data, to enrich customer information and publish the enriched data set to an external system such as Elasticsearch.
Getting the first part is easy: group by customer and status.

select contact_id, status, count(*) cnt from tickets group by contact_id, status;

But now we are stuck: we get multiple rows/messages per customer and simply don't know how to turn them into a single message keyed by the contact ID.
We have tried a few things, but none of them worked out.
For example:
Create a table with all tickets in status "waiting", grouped by customer:

create table waiting_tickets_by_cust with (partitions=12,value_format='AVRO')
as select contact_id, count(*) cnt from tickets where status='waiting' group by contact_id;

Rekey that table for the join:

CREATE TABLE T_WAITING_REKEYED WITH (KAFKA_TOPIC='WAITING_TICKETS_BY_CUST',
       VALUE_FORMAT='AVRO',
       KEY='contact_id');

Left (outer) join that table with our customer table to get all customers together with their count of waiting tickets:

select c.id,w.cnt wcnt from T_WAITING_REKEYED w left join CRM_CONTACTS c on w.contact_id=c.id;

But we need all customers, with a null waiting count where nothing is waiting, so we can join again with the tickets in another status. Because we only have the customers that have waiting tickets, we only get customers that have values for both statuses.

ksql> select c.*,t.cnt from T_PROCESSING_REKEYED t left join cust_ticket_tmp1 c on t.contact_id=c.id;
null | null | null | null | 1
1555261086669 | 1472 | 1472 | 0 | 1
1555261086669 | 1472 | 1472 | 0 | 1
null | null | null | null | 1
1555064371937 | 1474 | 1474 | 1 | 1
null | null | null | null | 1
1555064371937 | 1474 | 1474 | 1 | 1
null | null | null | null | 1
null | null | null | null | 1
null | null | null | null | 1
1555064372018 | 3 | 3 | 5 | 6
1555064372018 | 3 | 3 | 5 | 6

So what is the right way to do this?
This is KSQL 5.2.1.
Thank you.
EDIT:
Here is some sample data.
We created a stream that limits the data to a test account:

CREATE STREAM tickets_filtered
  WITH (PARTITIONS=12,
        VALUE_FORMAT='JSON') AS
  SELECT id,
         contact_id,
         subject,
         status,
         TIMESTAMPTOSTRING(changetime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring
  FROM tickets
  WHERE contact_id=1472
  PARTITION BY contact_id;

$ kafkacat-dev -C -o beginning -t TICKETS_FILTERED
{"ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
Changing and adding something in the ticketing-system...
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}

From this data we want to create a topic where the messages look like this:

{"CONTACT_ID":1472,"TICKETS_CLOSED":1,"TICKET_WAITING":1,"TICKET_CLOSEREQUEST":1,"TICKET_PROCESSING":0}
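Stated as a batch computation, the target message is: take the latest status of each ticket, then count tickets per status for the customer. A minimal plain-Python sketch (not KSQL) of that target logic, using the sample events above:

```python
from collections import Counter

# Sample events from the TICKETS_FILTERED topic above (SUBJECT/TIMESTRING omitted)
events = [
    {"ID": 2216, "CONTACT_ID": 1472, "STATUS": "closed"},
    {"ID": 8945, "CONTACT_ID": 1472, "STATUS": "waiting"},
    {"ID": 8945, "CONTACT_ID": 1472, "STATUS": "processing"},
    {"ID": 8945, "CONTACT_ID": 1472, "STATUS": "waiting"},
    {"ID": 8952, "CONTACT_ID": 1472, "STATUS": "new"},
    {"ID": 8952, "CONTACT_ID": 1472, "STATUS": "close-request"},
]

# Keep only the latest event per ticket ID (table semantics: last write wins per key)
latest = {}
for e in events:
    latest[e["ID"]] = e

# Count tickets per (customer, status) over the current states only
counts = Counter((e["CONTACT_ID"], e["STATUS"]) for e in latest.values())

result = {
    "CONTACT_ID": 1472,
    "TICKETS_CLOSED": counts[(1472, "closed")],
    "TICKET_WAITING": counts[(1472, "waiting")],
    "TICKET_CLOSEREQUEST": counts[(1472, "close-request")],
    "TICKET_PROCESSING": counts[(1472, "processing")],
}
print(result)
# -> {'CONTACT_ID': 1472, 'TICKETS_CLOSED': 1, 'TICKET_WAITING': 1,
#     'TICKET_CLOSEREQUEST': 1, 'TICKET_PROCESSING': 0}
```

Note that PROCESSING comes out as 0 even though a processing event exists, because ticket 8945 later moved back to waiting; counting raw events would get this wrong.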
mepcadol answered:

(Also written up here.)
You can do this by building a table (holding the current state) and then building an aggregate on that table.
Set up the test data:

kafkacat -b localhost -t tickets -P <<EOF
{"ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
{"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
{"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}
EOF

Preview the topic data:

ksql> PRINT 'tickets' FROM BEGINNING;
Format:JSON
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
{"ROWTIME":1555511270573,"ROWKEY":"null","ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}

Register the stream:

CREATE STREAM TICKETS (ID INT, 
                      CONTACT_ID VARCHAR, 
                      SUBJECT VARCHAR, 
                      STATUS VARCHAR, 
                      TIMESTRING VARCHAR) 
        WITH (KAFKA_TOPIC='tickets', 
        VALUE_FORMAT='JSON');

Query the data:

ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT * FROM TICKETS;
1555502643806 | null | 2216 | 1472 | Test Bodenbach | closed | 2012-11-08 10:34:30.000
1555502643806 | null | 8945 | 1472 | sync-test | waiting | 2019-04-16 23:07:01.000
1555502643806 | null | 8945 | 1472 | sync-test | processing | 2019-04-16 23:52:08.000
1555502643806 | null | 8945 | 1472 | sync-test | waiting | 2019-04-17 00:10:38.000
1555502643806 | null | 8952 | 1472 | another sync ticket | new | 2019-04-17 00:11:23.000
1555502643806 | null | 8952 | 1472 | another sync ticket | close-request | 2019-04-17 00:12:04.000

At this point we can use CASE to pivot the aggregate:

SELECT CONTACT_ID, 
      SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW, 
      SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING, 
      SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING, 
      SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST ,
      SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
  FROM TICKETS 
  GROUP BY CONTACT_ID;

  1472 | 1 | 1 | 2 | 1 | 1

However, you'll notice that the answer is not as expected. That's because we are counting all six input events.
Look at one ticket, ID 8945: it went through three status changes (waiting -> processing -> waiting), and every one of them is included in the aggregate. We can verify this with a simple predicate:

SELECT CONTACT_ID, 
      SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW, 
      SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING, 
      SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING, 
      SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST ,
      SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
  FROM TICKETS 
  WHERE ID=8945
  GROUP BY CONTACT_ID;

1472 | 0 | 1 | 2 | 0 | 0

What we really want is the current state of each ticket. So repartition the data on the ticket ID:

CREATE STREAM TICKETS_BY_ID AS SELECT * FROM TICKETS PARTITION BY ID;

CREATE TABLE TICKETS_TABLE (ID INT, 
                      CONTACT_ID INT, 
                      SUBJECT VARCHAR, 
                      STATUS VARCHAR, 
                      TIMESTRING VARCHAR) 
        WITH (KAFKA_TOPIC='TICKETS_BY_ID', 
        VALUE_FORMAT='JSON',
        KEY='ID');

Compare the event stream with the current state.
The event stream (KSQL stream):

ksql> SELECT ID, TIMESTRING, STATUS FROM TICKETS;
2216 | 2012-11-08 10:34:30.000 | closed
8945 | 2019-04-16 23:07:01.000 | waiting
8945 | 2019-04-16 23:52:08.000 | processing
8945 | 2019-04-17 00:10:38.000 | waiting
8952 | 2019-04-17 00:11:23.000 | new
8952 | 2019-04-17 00:12:04.000 | close-request

The current state (KSQL table):

ksql> SELECT ID, TIMESTRING, STATUS FROM TICKETS_TABLE;
2216 | 2012-11-08 10:34:30.000 | closed
8945 | 2019-04-17 00:10:38.000 | waiting
8952 | 2019-04-17 00:12:04.000 | close-request

Now we run the same SUM(CASE…)…GROUP BY aggregation trick as above, but against the table, so that it is based on the current state of each ticket rather than on every event:

SELECT CONTACT_ID, 
      SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW, 
      SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING, 
      SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING, 
      SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST ,
      SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
  FROM TICKETS_TABLE 
  GROUP BY CONTACT_ID;

This gives us what we want:

1472 | 0 | 0 | 1 | 1 | 1

Let's feed events for another ticket into the topic and watch the table's state change. When the state changes, the row in the table is re-emitted; you can also cancel the SELECT and re-run it to see only the current state.
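The re-emission behaviour can be illustrated with a small plain-Python sketch (not KSQL): each incoming event overwrites that ticket's current status in the table, and the aggregate over current states is recomputed and re-emitted. Replaying the status changes of the hypothetical ticket 8946 from the sample data below:

```python
from collections import Counter

state = {}  # ticket ID -> current status (the "table": last write wins per key)

def on_event(ticket_id, status):
    state[ticket_id] = status
    return Counter(state.values())  # the re-emitted aggregate row

# Status changes of ticket 8946, in order, as in the sample data below
results = [on_event(8946, s)
           for s in ["new", "processing", "waiting", "processing",
                     "waiting", "closed", "close-request"]]
print(results[-1])
# -> Counter({'close-request': 1})
```

Each call returns a fresh snapshot of the counts, which is why you see a new row for every state change when you leave the SELECT running.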

Sample data to try it yourself:

{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"new","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"processing","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"processing","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"closed","TIMESTRING":"2019-04-16 23:07:01.000"}
{"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"close-request","TIMESTRING":"2019-04-16 23:07:01.000"}

If you want to experiment further, you can use Mockaroo to generate an additional stream of dummy data, piped through awk to slow it down so you can watch the effect on the resulting aggregate as each message arrives:

while [ 1 -eq 1 ]
  do curl -s "https://api.mockaroo.com/api/f2d6c8a0?count=1000&key=ff7856d0" | \
      awk '{print $0;system("sleep 2");}' | \
      kafkacat -b localhost -t tickets -P
  done
