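I'm new to transforming JSON with Jolt, and I'm stuck at this point. I have an initial JSON coming from Kafka, and I want to transform it with a Jolt spec to get the desired final JSON.
Initial JSON: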
{
  "delta": {
    "base_price": 10094,
    "base_price_with_vat": 12819.38,
    "sale_price": 10094,
    "sale_price_with_vat": 12819.38
  },
  "microtime": 1695024729.857733,
  "offer_id": 83466561,
  "platform_id": 3,
  "vendor_id": 54619,
  "occurred_on": {
    "date": "2023-09-18 10:12:09.857738",
    "timezone_type": 3,
    "timezone": "Europe/Budapest"
  }
}
My Jolt spec:
[
  {
    "operation": "shift",
    "spec": {
      "delta": {
        "*": {
          "@(1,&)": "delta\\.&"
        }
      },
      "microtime": "microtime",
      "offer_id": "offer_id",
      "platform_id": "platform_id",
      "vendor_id": "vendor_id",
      "occurred_on": {
        "*": {
          "@(1,&)": "occurred_on\\.&"
        }
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "delta*": "=toString"
    }
  },
  {
    "operation": "shift",
    "spec": {
      "offer_id": "offer_id",
      "microtime": "microtime",
      "platform_id": "platform_id",
      "vendor_id": "vendor_id",
      "delta.*": "&(0,1)",
      "occurred_on.*": "&(0,1)"
    }
  },
  {
    "operation": "shift",
    "spec": {
      "offer_id": "offer_id",
      "platform_id": "platform_id",
      "microtime": "microtime",
      "date": "occurred_on_date",
      "vendor_id": "vendor_id",
      "timezone_type": "timezone_type",
      "timezone": "timezone",
      "*": {
        "$": "&1.column_name",
        "@": "&1.column_value"
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "offer_id": "offer_id",
      "platform_id": "platform_id",
      "microtime": "microtime",
      "vendor_id": "vendor_id",
      "occurred_on_date": "occurred_on_date",
      "timezone_type": "timezone_type",
      "timezone": "timezone",
      "*": {
        "@": "records.[]"
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "records": {
        "*": {
          "column_name": "column_name",
          "column_value": "column_value"
        }
      },
      "platform_id": "platform_id",
      "vendor_id": "vendor_id",
      "offer_id": "offer_id",
      "occurred_on_date": "occurred_on_date",
      "timezone_type": "timezone_type",
      "timezone": "timezone"
    }
  },
  {
    "operation": "cardinality",
    "spec": {
      "column_name": "ONE",
      "column_value": "ONE"
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "platform_id": "=toString",
      "country_id": "=toString",
      "timezone_type": "=toString",
      "vendor_id": "=toString",
      "offer_id": "=toString"
    }
  },
  {
    "operation": "shift",
    "spec": {
      "column_name": "column_name",
      "column_value": "column_value",
      "platform_id": "platform_id",
      "country_id": "country_id",
      "timezone_type": "timezone_type",
      "timezone": "timezone",
      "occurred_on_date": "occurred_on_date",
      "vendor_id": "vendor_id",
      "offer_id": "offer_id"
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "kafka_offset": "${kafka.offset}",
      "unique_id": "${uuid}",
      "insert_datetime": "${now():format('yyyy-MM-dd HH:mm:ss.SSS')}",
      "insert_date": "${now():format('yyyy-MM-dd')}"
    }
  }
]
Output of the spec above:
{
  "column_name" : "base_price",
  "column_value" : "10094",
  "platform_id" : "3",
  "timezone_type" : "3",
  "timezone" : "Europe/Budapest",
  "occurred_on_date" : "2023-09-18 10:12:09.857738",
  "vendor_id" : "54619",
  "offer_id" : "83466561",
  "kafka_offset" : "${kafka.offset}",
  "unique_id" : "${uuid}",
  "insert_datetime" : "${now():format('yyyy-MM-dd HH:mm:ss.SSS')}",
  "insert_date" : "${now():format('yyyy-MM-dd')}"
}
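However, I need 4 separate JSONs, in which column_name and column_value take each of the key/value pairs under delta in the initial JSON in turn (base_price and 10094, base_price_with_vat and 12819.38, sale_price and 10094, sale_price_with_vat and 12819.38).
I hope I have explained clearly enough what I need.
Best regards, Andrei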
1 Answer
unguejic (#1)
Please check the details below to confirm whether they meet your expectations.
Jolt spec:
output.json
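For reference, here is a minimal sketch of one common way to get one record per delta key. It is not necessarily the spec the answerer posted, and it has not been run against the NiFi processor. It assumes the input shape shown in the question. The idea: iterate over the keys of delta, write the key name with `$` and its value with `@`, copy the shared fields down from the root with `@(2,...)`, and use `[#2]` so that each delta key lands in its own array element. The Expression Language fields (kafka_offset, unique_id, insert_datetime, insert_date) are left out here; they could be added per element with a modify-overwrite-beta step like the one in the question.

```json
[
  {
    "operation": "shift",
    "spec": {
      "delta": {
        "*": {
          "$": "[#2].column_name",
          "@": "[#2].column_value",
          "@(2,platform_id)": "[#2].platform_id",
          "@(2,vendor_id)": "[#2].vendor_id",
          "@(2,offer_id)": "[#2].offer_id",
          "@(2,occurred_on.date)": "[#2].occurred_on_date",
          "@(2,occurred_on.timezone_type)": "[#2].timezone_type",
          "@(2,occurred_on.timezone)": "[#2].timezone"
        }
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "*": {
        "column_value": "=toString",
        "platform_id": "=toString",
        "vendor_id": "=toString",
        "offer_id": "=toString",
        "timezone_type": "=toString"
      }
    }
  }
]
```

Under those assumptions the result should be a single array with four objects, one each for base_price, base_price_with_vat, sale_price and sale_price_with_vat, all sharing the same platform_id, vendor_id, offer_id and occurred_on fields. To end up with 4 separate JSONs in NiFi, the array would still need to be split afterwards, for example with a SplitJson processor.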