hbase计数器与apachenifi

92dk7w1h  于 2021-06-10  发布在  Hbase
关注(0)|答案(1)|浏览(452)

我的任务很简单-我想使用apachenifi在hbase(counter)中增加列值。
我有一个accountid作为行键,我想根据流值增加/减少余额列。用nifi最好的方法是什么。
例如,账户a的余额起始值=100。我得到(a,-20)作为一个事件。什么是最好的开箱即用处理器来完成这项工作(平衡=80)。似乎所有这些都将取代价值。我也愿意改变我的模式。。。
我尝试编写groovy脚本,但在nifi中出现了这个错误。我只是想知道我的基本结构是不是错了。
2017-03-10 06:38:54067错误[timer-driven process thread-6]o.a.nifi.processors.script.executescript executescript[id=b5a0e7b7-015a-1000-ab9c-0696c8297e8d]executescript[id=b5a0e7b7-015a-1000-ab9c-0696c8297e8d]由于java.lang无法处理。noclassdeffounderror:org/apache/hadoop/conf/configuration;正在回滚会话:

import org.apache.nifi.controller.ControllerService
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.ResultScanner
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Table
import org.apache.hadoop.hbase.util.Bytes

def lookup = context.controllerServiceLookup
def HbaseServiceName =HBaseClient.value
def HBaseServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { 
    cs -> lookup.getControllerServiceName(cs) == HBaseServiceName
}
def conn = lookup.getControllerService(HBaseServiceId)?.getConnection()
try {
    flowFile = session.create()
    def table = conn.getTable(TableName.valueOf("crap"))
    myfile = flowFile.getAttribute("filename")
    def p = new Put(Bytes.toBytes("crap")); 
    p.add(Bytes.toBytes("crap"), Bytes.toBytes("cf1"),Bytes.toBytes("SomeValue"))
    table.put(p);
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Scripting error', e)
    session.transfer(flowFile, REL_FAILURE)
}
conn?.close()
frebpwbc

frebpwbc1#

你说得对 PutHBaseCell 以及 PutHBaseJSON 将流文件内容放入各自的hbase目标中。你可能想做的是 GetHBase 要检索初始值,请使用counters保持一个正在运行的计数器(参见此处的教程),然后用正确的值更新hbase单元。您也可以使用 DistributedMapCache 在共享内存空间中获取/计算/存储值的系统。

相关问题