如何在ksqldb中构建输出主题上的嵌套json消息

vsaztqbk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(360)

我从一个源系统接收到以下事件负载
为下面的json负载创建了stream1
事件json 1

{
 "event": {
  "header": {
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{"customerIdentifiers":[
   {"customerIdentifier":"1234","customerIdType":"cc"},
   {"customerIdentifier":"234","customerIdType":"id"}
  ],
  "accountIdentifiers":[
   {"accountIdentifier":"123","accountIdType":"no"},
   {"accountIdentifier":"Primary","accountIdType":"da"}
  ],
  "eventDetails":{
   "offeramount":"40000",
   "apr":"2.6%",
   "minpayment":"400",
   "status":"Approved"
  }
 }
}

事件json 2

{
 "event": {
  "header": {
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{"customerIdentifiers":[
   {"customerIdentifier":"1234","customerIdType":"cc"},
   {"customerIdentifier":"234","customerIdType":"id"}
  ],
  "accountIdentifiers":[
   {"accountIdentifier":"123","accountIdType":"no"},
   {"accountIdentifier":"Primary","accountIdType":"da"}
  ],
  "eventDetails":{
   "offeramount":"70000",
   "apr":"3.6%",
   "minpayment":"600",
   "status":"Rejected"
  }
 }
}

我已经在上面的stream1上创建了聚合表

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    avg(minpayment) as Avg_MinPayment, 
    avg(apr) AS Avg_APr, 
    avg(offeramount) AS Avgofferamount , 
    status 
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;
Status | Avg_MinPayment | Avg_APr | Avgofferamount 
-----------------------------------------
Approved | 400 | 2.6% | 40000

Rejected | 600 | 3.6% | 70000

我从ktable得到上面的结果,ktable主题json如下
聚合json1
打印“事件表”;

{
  "Status" : "Approved", 
  "Avg_Minpayment" : "400", 
  "Avg_APr" : "2.6%", 
  "offeramount" : "40000"
}

骨料json2

{
  "Status" : "Rejected", 
  "Avg_Minpayment" : "600", 
  "Avg_APr" : "3.6%", 
  "offeramount" : "70000"
}

但我必须在输出主题上构建并发布最终的目标json,如下面的json格式。我必须将头和主体添加到聚合json1和聚合json2中。

{
 "event":{
  "header":{
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{
   "Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
  ]
 }
}
91zkwejq

91zkwejq1#

考虑到示例sql不会产生示例输出,而给定了示例输入,所以还不太清楚您要实现什么。事实上,您的示例sql将失败,并出现未知的列错误。
类似于以下内容的内容将生成示例输出:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    status,
    avg(eventDetails->minpayment) as Avg_MinPayment, 
    avg(eventDetails->apr) AS Avg_APr, 
    avg(eventDetails->offeramount) AS Avgofferamount
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;

接下来,您的示例输出。。。

Status | Avg_MinPayment | Avg_APr | Avgofferamount 
-----------------------------------------
Approved | 400 | 2.6% | 40000

Rejected | 600 | 3.6% | 70000

…正在为每个状态输出一行。然而,你所说的你想要达到的结果。。。

{
 "event":{
  "header":{
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{
   "Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
  ]
 }
}

…包含两种状态,即将两个示例输入消息合并到一个输出中。
如果我对您的理解正确,并且您确实希望输出上述json,那么:
您首先需要包括 event 信息。但哪些事件信息?如果你知道它们总是一样的,那么你可以使用:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    status,
    latest_by_offset(event) as event,
    avg(eventDetails->minpayment) as Avg_MinPayment, 
    avg(eventDetails->apr) AS Avg_APr, 
    avg(eventDetails->offeramount) AS Avgofferamount
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;

这个 latest_by_offset 聚合函数将捕获 event 它看到的最后一条信息。虽然我不相信这是你想要的。你能不能别想了 rejected 以及 accepted 具有不同 event 信息?如果它是 event 标识哪些消息应分组在一起的信息,然后类似的内容可能会提供接近您所需的信息:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    event,
    collect_list(eventDetails) as body
  FROM STREAM1 
  GROUP BY event 
  EMIT CHANGES;

如果这很接近,那么您可能需要使用 STRUCT 建造师和 AS_VALUE 函数来重新构造输出。例如:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    event as key,
    AS_VALUE(event) as event,
    STRUCT(
      keys := collect_list(eventDetails)
    ) as body
  FROM STREAM1 
  GROUP BY event 
  EMIT CHANGES;

相关问题