我从一个源系统接收到以下事件负载
为下面的json负载创建了stream1
事件json 1
{
"event": {
"header": {
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},
{"customerIdentifier":"234","customerIdType":"id"}
],
"accountIdentifiers":[
{"accountIdentifier":"123","accountIdType":"no"},
{"accountIdentifier":"Primary","accountIdType":"da"}
],
"eventDetails":{
"offeramount":"40000",
"apr":"2.6%",
"minpayment":"400",
"status":"Approved"
}
}
}
事件json 2
{
"event": {
"header": {
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},
{"customerIdentifier":"234","customerIdType":"id"}
],
"accountIdentifiers":[
{"accountIdentifier":"123","accountIdType":"no"},
{"accountIdentifier":"Primary","accountIdType":"da"}
],
"eventDetails":{
"offeramount":"70000",
"apr":"3.6%",
"minpayment":"600",
"status":"Rejected"
}
}
}
我已经在上面的stream1上创建了聚合表
CREATE TABLE EVENT_TABLE AS
SELECT
avg(minpayment) as Avg_MinPayment,
avg(apr) AS Avg_APr,
avg(offeramount) AS Avgofferamount ,
status
FROM STREAM1
GROUP BY status
EMIT CHANGES;
Status | Avg_MinPayment | Avg_APr | Avgofferamount
-----------------------------------------
Approved | 400 | 2.6% | 40000
Rejected | 600 | 3.6% | 70000
我从ktable得到上面的结果,ktable主题json如下
聚合json1
打印“事件表”;
{
"Status" : "Approved",
"Avg_Minpayment" : "400",
"Avg_APr" : "2.6%",
"offeramount" : "40000"
}
骨料json2
{
"Status" : "Rejected",
"Avg_Minpayment" : "600",
"Avg_APr" : "3.6%",
"offeramount" : "70000"
}
但我必须在输出主题上构建并发布最终的目标json,如下面的json格式。我必须将头和主体添加到聚合json1和聚合json2中。
{
"event":{
"header":{
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{
"Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
]
}
}
1条答案
按热度按时间91zkwejq1#
考虑到示例sql不会产生示例输出,而给定了示例输入,所以还不太清楚您要实现什么。事实上,您的示例sql将失败,并出现未知的列错误。
类似于以下内容的内容将生成示例输出:
接下来,您的示例输出。。。
…正在为每个状态输出一行。然而,你所说的你想要达到的结果。。。
…包含两种状态,即将两个示例输入消息合并到一个输出中。
如果我对您的理解正确,并且您确实希望输出上述json,那么:
您首先需要包括
event
信息。但哪些事件信息?如果你知道它们总是一样的,那么你可以使用:这个
latest_by_offset
聚合函数将捕获event
它看到的最后一条信息。虽然我不相信这是你想要的。你能不能别想了rejected
以及accepted
具有不同event
信息?如果它是event
标识哪些消息应分组在一起的信息,然后类似的内容可能会提供接近您所需的信息:如果这很接近,那么您可能需要使用
STRUCT
建造师和AS_VALUE
函数来重新构造输出。例如: