大家好,我从nifi的一个Kafka主题中得到了一条信息流,我正在通过一个消费过程来阅读。消息的格式为json(伪json值,json格式与原始格式相同):
{ "schema": {
"type": "struct",
"name": "emp_table",
"fields": [
{
"field": "emp_id",
"type": "string"
},
{
"field": "emp_name",
"type": "String"
},
{
"field": "city",
"type": "string"
},
{
"field": "emp_sal",
"type": "string"
},
{
"field": "manager_name",
"type": "string"
}
] }, "payload": {
"emp_id": "1",
"emp_name": "abc",
"city": "NYK",
"emp_sal": "100000",
"manager_name": "xyz" } }
正如您在这里看到的,实际的表名在schema下,列值在payload下。我能够在nifi中使用evaluatejsonpath和puthbasejson处理器解析列值并将其放入hbase表中。
我能够实现的是手动放置表名和rowid。但我的问题是我想从json中获取tablename(在上面的示例emp\u table中)和rowid(在上面的示例emp\u id中),并在运行时将这些值与列值一起提供给nifi中的puthbasejson处理器。
1条答案
按热度按时间bvpmtnay1#
您应该能够向evaluatejsonpath添加另一个json路径表达式,如下所示:
然后在puthbasejson中,使表名${table},或在evaluatejsonpath中命名的任何名称。