How to execute a Groovy script file for each flowfile in NiFi, selected by the data file's filename

kq0g1dla, posted on 2023-08-02 in Other

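Using NiFi, I generate data flowfiles, and I retrieve the Groovy script files by passing in a parameter (the filename of each data flowfile). How can I execute the matching Groovy script (file) for each data flowfile, one by one?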
Thanks for your help.

r9f1avp5


@daggett Here is sample data from the 3 flowfiles, one per table:

Table_CCL

+0002150736GG+0801448361KL      0001-01-01SSS+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000-000000000002860+000000000000000+000000000000000     1+0000000000000002023-05-23+0601148361
+0002152097NN+0901363467ML      0001-01-01RRR+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000011845+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000     1+0000000000000002023-05-23+0501463467
+0002155389PP+0001999097LL      0001-01-01XXX+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000-000000000002405+000000000000000+000000000000000     1+0000000000000002023-05-23+0401899097

Table_LCP

PX37115291656+0002004736XXL30518240      915292479     +0700156745+0700156745+0700156745D+00351     082023-05-072023-04-252023-05-012023-05-07          +000000000000000+000000000000000+00001302EUR+000000000005800+000000000005800+000000000005800+000.0000512023-05-23                    21             N          C50ED50 I014959 2023-05-231S+0000000000+0600076990+0000001000                    +000000000000000+000000000000000               0   0FJUY_AV                 5FX7SI+0800156745
PX37083105408+0002059840XXL30425660      914119751     +0700757773+0700757773+0700757773D+00351     082023-04-072023-03-242023-04-012023-04-07          +000000000000000+000000000000000+00001302EUR+000000000005298+000000000005298+000000000005298+000.0000842023-05-23                    21             N          C50ED50 I021948 2023-05-231S+0000000000+0600361406+0000001000                    +000000000000000+000000000000000               0   0FVFR_AV                 5FX7SI+0900757773
PX37093149567+0002169371XXL30505273      22022657S     +0702188717+0702188717+0702188717D+00550     082023-04-092023-04-032023-04-032023-04-09          +000000000000000+000000000000000+00001302EUR+000000000372598+000000000310498+000000000000000+000.0000552023-05-23                    20             N          C50ED50 I014959 2023-05-231S+0000000000+0600960024+0000001000                    +000000000000000+00000000000000022022657S      0   0FATR_AV   3             5FX7SI+0602188717

Table_LDC

PX37143489198+0000+0002175736LLM30003604      23200228S  D+00804  2023-05-312023-05-31          +000000000043200+0000000000000003XXX+000000000043200+000000000043200+000000000000000+000.00000                C50ED50         2023-05-23          +00000S          0   0FACT_AV   082023-05-310              +005                                 0
PX37143489199+0000+0002003027LLL30633874      23210097S  D+00350  2023-05-292023-05-29          +000000000002328+0000000000000003YYY+000000000002328+000000000002328+000000000002328+000.00001                C50ED50 C50ED50 2023-05-23          +00000S          0   0FACT_AV   082023-05-290              +005                                 0

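Here is an example of the Groovy script that parses the data flowfile for Table_LDC (I have 3 different scripts, each of which parses one flowfile's data and converts it to JSON):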

import groovy.json.JsonBuilder
import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import org.apache.nifi.processor.io.OutputStreamCallback

// Parse a fixed-width numeric field; a blank field becomes null.
Number toNumber(String line, IntRange range) {
    try {
        String v = line[range]?.trim()
        return v ? v.toBigDecimal() : null
    } catch (Exception ex) {
        throw new Exception("Encountered exception at ${range} in line ${line}: ${ex}", ex)
    }
}

def flowFile = session.get()
if (flowFile) {

    def datePattern = new SimpleDateFormat('yyyy-MM-dd')

    // Parse a fixed-width date field; a blank field or the '0001-01-01'
    // placeholder becomes null instead of raising a ParseException.
    def toDate = { String line, IntRange range ->
        String v = line[range]?.trim()
        (v && v != '0001-01-01') ? datePattern.parse(v) : null
    }

    def records = []

    // Read the flowfile content line by line and slice each line by column position.
    flowFile.read().withReader('ISO-8859-1') { reader ->
        reader.eachLine { line ->
            def record = [
                SEQ_LD: 'ADMIN.SEQ_INFO.NEXTVAL',
                ID_LIG_CPTE: line[0..12]?.trim() ?: '',
                NO_ORD: toNumber(line, 13..17),
                NO_CCL: toNumber(line, 18..28),
                CD_PA: line[29..30]?.trim() ?: '',
                ID_MVT: line[31..45]?.trim() ?: '',
                NO_CNT: line[46..56]?.trim() ?: '',
                CD_SENS: line[57..57]?.trim() ?: '',
                ID_INTRN_MVT: toNumber(line, 58..63),
                SS_ID_INTRN_MVT: line[64..65]?.trim() ?: '',
                DT_EXBL: toDate(line, 66..75),
                DT_VAL_CLI: toDate(line, 76..85),
                DT_PEC_CAL_IR: toDate(line, 86..95),
                MT_SOL_MVT: toNumber(line, 96..111),
                MT_SOL_AST_IR: toNumber(line, 112..127),
                STA_MVT: line[128..128]?.trim() ?: '',
                CD_DEV: line[129..131]?.trim() ?: '',
                MT_MVT: toNumber(line, 132..147),
                MT_MVT_HT: toNumber(line, 148..163),
                MT_AST_IR: toNumber(line, 164..179),
                TX_TVA_CCL: toNumber(line, 180..188),
                IND_PEC_CAL_IR: line[189..189]?.trim() ?: '',
                ID_ACO: line[190..202]?.trim() ?: '',
                CD_TY_ETA: line[203..204]?.trim() ?: '',
                CD_ETA: line[205..205]?.trim() ?: '',
                ID_COLLA_CRE: line[206..213]?.trim() ?: '',
                ID_COLLA_MAJ: line[214..221]?.trim() ?: '',
                DT_CRE_CCL: toDate(line, 222..231),
                DT_MAJ: toDate(line, 232..241),
                NO_PRI_LTR_ACO: toNumber(line, 242..247),
                ID_INTRN_PROD: line[248..248]?.trim() ?: '',
                // Hard-coded processing date; columns 249..258 are not read.
                DT_TRT: datePattern.parse('2023-06-05'),
                CD_SOC_FOUR: line[259..262]?.trim() ?: '',
                IND_REPRIS_STD: line[263..263]?.trim() ?: '',
                CD_FAM_MVT_DEBI: line[264..273]?.trim() ?: '',
                MOD_PAI: line[274..275]?.trim() ?: '',
                DT_ECH_FACT: toDate(line, 276..285),
                NO_ECH: line[286..288]?.trim() ?: '',
                CD_ODM: line[289..289]?.trim() ?: '',
                CD_MOT_ODM: line[290..292]?.trim() ?: '',
                CD_MOT_REMB: line[293..300]?.trim() ?: '',
                DEL_IMP: toNumber(line, 301..304),
                IND_IMP: line[305..305]?.trim() ?: '',
                DT_VAL_IMP: toDate(line, 306..315),
                DT_EVT_IMP: toDate(line, 316..325),
                DT_RETR_IMP: toDate(line, 326..335),
                MOT_IMP: line[336..337]?.trim() ?: '',
                IND_PEC_EXBL: line[338..338]?.trim() ?: ''
            ]
            records << record
        }
    }

    // Serialize all records as one JSON array and overwrite the flowfile content with it.
    def json = new JsonBuilder(records)
    def jsonBytes = json.toString().getBytes(StandardCharsets.UTF_8)
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(jsonBytes)
    } as OutputStreamCallback)

    flowFile = session.putAttribute(flowFile, 'filename', 'ti_ldc.json')
    session.transfer(flowFile, REL_SUCCESS)
}
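
Since the three scripts differ mainly in their column positions, the slicing in the script above could also be expressed as a data-driven layout. The following is only a minimal sketch under that assumption, not the author's method: `LDC_LAYOUT` and `parseLine` are hypothetical names, only the first four LDC fields are shown (offsets copied from the script above), and the sample string is the first 31 characters of the first Table_LDC line.

```groovy
import groovy.json.JsonOutput

// Hypothetical layout table: field name -> [character range, field type].
// Only the first four Table_LDC fields are listed here.
def LDC_LAYOUT = [
    ID_LIG_CPTE: [0..12,  'text'],
    NO_ORD     : [13..17, 'number'],
    NO_CCL     : [18..28, 'number'],
    CD_PA      : [29..30, 'text'],
]

// Slice one fixed-width line according to a layout.
// Blank numeric fields become null; text fields are trimmed.
Map parseLine(String line, Map layout) {
    layout.collectEntries { name, spec ->
        def (range, type) = spec
        String v = line[range].trim()
        [(name): type == 'number' ? (v ? v.toBigDecimal() : null) : v]
    }
}

// First 31 characters of the first Table_LDC sample line.
def sample = 'PX37143489198+0000+0002175736LL'
def rec = parseLine(sample, LDC_LAYOUT)
println JsonOutput.toJson(rec)
```

With this shape, a layout for Table_CCL or Table_LCP would be a second and third map passed to the same `parseLine`, so a single script could serve all three tables if that suits the flow.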





The output must be the 3 input data flowfiles, each processed by its Groovy script.
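
One possible way to get this per-flowfile behavior is to pick the script from the table name in the filename and evaluate it with `GroovyShell`. The sketch below runs outside NiFi and makes several assumptions: the filenames are taken to be `Table_CCL`, `Table_LCP` and `Table_LDC`, the one-line script bodies are hypothetical stand-ins for the real script files, and `runScript` and `scriptsByTable` are illustrative names. Inside NiFi the filename would come from `flowFile.getAttribute('filename')`, and the session handling of the real scripts is not reproduced here.

```groovy
// Evaluate one script against one flowfile's text content.
// The script sees the text through the bound variable 'content'.
def runScript(String scriptText, String content) {
    def binding = new Binding([content: content])
    new GroovyShell(binding).evaluate(scriptText)
}

// Hypothetical stand-ins for the per-table script files, keyed by table name.
def scriptsByTable = [
    CCL: 'content.readLines().size()',
    LDC: 'content.readLines()*.take(13)',
]

// Two fake "flowfiles": filename -> content (shortened for illustration).
def inputs = [
    'Table_CCL': "a\nb\nc",
    'Table_LDC': "PX37143489198+0000\nPX37143489199+0000",
]

// Process each flowfile with the script registered for its table.
def results = inputs.collectEntries { filename, content ->
    def table = filename - 'Table_'          // 'Table_LDC' -> 'LDC'
    def scriptText = scriptsByTable[table]
    if (scriptText == null) {
        throw new IllegalArgumentException("No script registered for ${filename}")
    }
    [(filename): runScript(scriptText, content)]
}
println results
```

An alternative with the same effect would be to route each flowfile by its filename to a separate script processor; which of the two fits better depends on how the flow is laid out.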
