我对apache nifi是新手
我们创建nifi流,它使用来自kafka的json数据,结果在丰富后被发送到另一个kafka主题。但是hbase lookup不返回key的值。相反,它返回键、值对,如 MapRecord[{SERIAL_NUM=123456789}]
. 但我只要求值为'123456789'。
我无法解决这个问题。有人能帮我解决这个问题吗?
我正在分享我的流的输出。我们希望看到“hbase\u integid”的值为“123456789”,但查找服务返回 MapRecord[{SERIAL_NUM=123456789}]"
如何仅提取序列号值(“123456789”)的值??
[
{
"table" : SIEBEL.S_ASSET,
"op" : Update,
"hbase_integid" :**MapRecord[{SERIAL_NUM=123456789}]**,
"op_type" : U,
"op_ts" : 2018-04-02 05:48:35.055299,
"current_ts" : 2018-04-02T08:48:40.071000,
"pos" : 00000000020530642196,
"before" : {
"ROW_ID" : 1-G7B7EGF,
"BILLACCT_ID" : ,
"BILL_ACCNT_ID" : 1-G79GNWP,
"BILL_PROFILE_ID" : 1-1FJHFB0,
"INTEGRATION_ID" : 1-G79GU5K,
"NAME" : ,
"OWNER_ACCNT_ID" : 1-G79GEVV,
"OWNER_CON_ID" : 1-G79GEW3,
"PROD_ID" : 1-Q5B470,
"PROMOTION_ID" : ,
"PROM_INTEG_ID" : ,
"PROM_ITEM_ID" : ,
"PR_ACCNT_ID" : ,
"PR_CON_ID" : No Match Row Id,
"ROOT_ASSET_ID" : 1-G7B7EFI,
"SERIAL_NUM" : ,
"X_VF_MSISDN" : ,
"X_VF_PERMISSONS" :
},
"after" : {
"ROW_ID" : 1-G7B7EGF,
"BILLACCT_ID" : ,
"BILL_ACCNT_ID" : ,
"BILL_PROFILE_ID" : ,
"INTEGRATION_ID" : ,
"NAME" : ,
"OWNER_ACCNT_ID" : ,
"OWNER_CON_ID" : ,
"PROD_ID" : ,
"PROMOTION_ID" : ,
"PROM_INTEG_ID" : ,
"PROM_ITEM_ID" : ,
"PR_ACCNT_ID" : ,
"PR_CON_ID" : ,
"ROOT_ASSET_ID" : ,
"SERIAL_NUM" : ,
"X_VF_MSISDN" : ,
"X_VF_PERMISSONS" :
}
}
]
2条答案
按热度按时间hpcdzsge1#
查找记录作为maprecord提供输出;您可能需要随后使用其他处理器来提取序列号的值。
下面给出了一个工作示例。
hbase查找配置
一旦得到查找结果,就可以使用2个updaterecord处理器来提取和更新流文件中serial_num的值。
更新记录1配置
更新记录2配置
cxfofazt2#
如果您使用的是lookuprecord处理器,那么文档可能会有点误导。如屏幕截图所示设置处理器。确保以下两个属性设置为属性:
记录结果内容必须设置为“插入记录字段”
记录更新策略应设置为“使用属性”
最后一个结果recordpath必须设置为
/
而不是hbase_integid
(这似乎是文档中的一个bug)希望有帮助!