如何在nifi中更快地将数据从db2转储到cassandra

avwztpqn  于 2021-06-10  发布在  Cassandra
关注(0)|答案(1)|浏览(424)

我需要使用apachenifi将数据从db2加载到cassandra。我的db2表有大约40k条记录,完成到cassandra的数据转储大约需要15分钟。我为这个用例附上了两张当前nifi流的图片。观察到每秒仅读取100多条记录。谁能让我知道-如何调整流/处理器,以便我们可以提高速度(减少时间)的数据转储。
db2到cassandra nifi流-在执行脚本开始之前
执行脚本启动后
我附加了执行脚本,我们正在为cassandra转储准备insert语句。

import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json
import csv
import io
import datetime

class TransformCallback(StreamCallback):
    def _init_(self):
        pass
    def process(self,inputStream,outputStream):
        inputdata = IOUtils.toString(inputStream,StandardCharsets.UTF_8)
        text = csv.reader(io.StringIO(inputdata))
        l = []
        for row in text:
            mon = row[0].strip()
            modified_date = str(datetime.datetime.strptime(str(mon), "%d%b%Y").strftime("%Y-%m-%d"))
            row[0] = modified_date
            row[1] = row[1].strip()
            row[2] = row[2].strip()
            l.append(row)
        values_str = json.dumps(l)
        leng = len(l)
        for i in range(leng):
            obj = json.loads(values_str)[i]  ## obj = dict
            newObj = {
                  "date": obj[0],
                  "max": obj[1],
                  "city": obj[2]
                }
            insert_query = ("INSERT INTO model.test_data JSON '"+json.dumps(newObj , indent=4)+"';").encode('utf-8')
            outputStream.write(bytearray(insert_query))

flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile,TransformCallback())
    flowFile = session.putAttribute(flowFile, "filename",flowFile.getAttribute('filename').split('.')[0]+'_result.json')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
iyr7buue

iyr7buue1#

我不得不说,所需的转换可能与两个标准处理器有关: ConvertRecord 将csv记录转换为json ReplaceText 添加插入到。。。
如果您还想使用脚本,我可以帮助您使用groovy。以下脚本用于 ExecuteGroovyScript 处理器。
每次调用处理一个流文件
它转换流文件中的所有行,因此,不需要在此处理器之前按行拆分文件。

import groovy.json.JsonOutput

def ff=session.get()
if(!ff)return

ff.write{rawIn, rawOut->
    rawOut.withWriter("UTF-8"){w->
        rawIn.withReader("UTF-8"){r->
            //iterate lines from input reader and split each with coma
            r.splitEachLine( ',' ){row->
                //build object (map)
                def obj = [
                    "date": row[0],
                    "max" : row[1],
                    "city": row[2]
                ]
                //convert obj to json string
                def json = JsonOutput.toJson(obj)
                //write data to output
                w << "INSERT INTO model.test_data JSON '" << json << "';" << '\n'
            }
        }
    }
}

REL_SUCCESS << ff

每次调用处理多个流文件
与前面的类似,但具有流文件列表处理 fflist.each{ff-> ...} ```
import groovy.json.JsonOutput

def fflist=session.get(1000)
if(!fflist)return

fflist.each{ff->
ff.write{rawIn, rawOut->
rawOut.withWriter("UTF-8"){w->
rawIn.withReader("UTF-8"){r->
//iterate lines from input reader and split each with coma
r.splitEachLine( ',' ){row->
//build object (map)
def obj = [
"date": row[0],
"max" : row[1],
"city": row[2]
]
//convert obj to json string
def json = JsonOutput.toJson(obj)
//write data to output
w << "INSERT INTO model.test_data JSON '" << json << "';" << '\n'
}
}
}
}

REL_SUCCESS << ff

}

相关问题