Using NiFi, I generate data flowfiles and, via an incoming parameter (the filename of the data flowfile), I retrieve a Groovy script file. I would like to know how to execute the matching Groovy script (file) for each data flowfile, one by one. Thanks for your help.
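One common way to get "one script per flowfile" is to route each flowfile by the table marker in its filename attribute (e.g. with RouteOnAttribute) to a dedicated ExecuteGroovyScript processor per table. The selection logic itself is trivial Groovy; this is a sketch only, and the script file names (`parse_ccl.groovy` etc.) and the shape of the incoming filenames are assumptions for illustration:

```groovy
// Hypothetical mapping from a flowfile's filename to the per-table parser
// script. The table markers come from the sample data; the script file
// names are invented for this sketch.
def scriptFor(String filename) {
    def mapping = [
        Table_CCL: 'parse_ccl.groovy',
        Table_LCP: 'parse_lcp.groovy',
        Table_LDC: 'parse_ldc.groovy',
    ]
    // Pick the first table marker contained in the filename, if any.
    def entry = mapping.find { table, script -> filename.contains(table) }
    entry?.value
}

assert scriptFor('Table_LDC_20230523.dat') == 'parse_ldc.groovy'
assert scriptFor('unrelated.dat') == null
```

Inside NiFi you would not normally evaluate this mapping in a script at all: three RouteOnAttribute conditions on `${filename}` feeding three ExecuteGroovyScript processors (each with its own Script File) achieve the same dispatch declaratively, one flowfile at a time.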
@daggett Here is a data sample from the 3 flowfiles, one per table:
Table_CCL
+0002150736GG+0801448361KL 0001-01-01SSS+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000-000000000002860+000000000000000+000000000000000 1+0000000000000002023-05-23+0601148361 +0002152097NN+0901363467ML 0001-01-01RRR+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000011845+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000 1+0000000000000002023-05-23+0501463467 +0002155389PP+0001999097LL 0001-01-01XXX+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000+000000000000000-000000000002405+000000000000000+000000000000000 1+0000000000000002023-05-23+0401899097
Table_LCP
PX37115291656+0002004736XXL30518240 915292479 +0700156745+0700156745+0700156745D+00351 082023-05-072023-04-252023-05-012023-05-07 +000000000000000+000000000000000+00001302EUR+000000000005800+000000000005800+000000000005800+000.0000512023-05-23 21 N C50ED50 I014959 2023-05-231S+0000000000+0600076990+0000001000 +000000000000000+000000000000000 0 0FJUY_AV 5FX7SI+0800156745 PX37083105408+0002059840XXL30425660 914119751 +0700757773+0700757773+0700757773D+00351 082023-04-072023-03-242023-04-012023-04-07 +000000000000000+000000000000000+00001302EUR+000000000005298+000000000005298+000000000005298+000.0000842023-05-23 21 N C50ED50 I021948 2023-05-231S+0000000000+0600361406+0000001000 +000000000000000+000000000000000 0 0FVFR_AV 5FX7SI+0900757773 PX37093149567+0002169371XXL30505273 22022657S +0702188717+0702188717+0702188717D+00550 082023-04-092023-04-032023-04-032023-04-09 +000000000000000+000000000000000+00001302EUR+000000000372598+000000000310498+000000000000000+000.0000552023-05-23 20 N C50ED50 I014959 2023-05-231S+0000000000+0600960024+0000001000 +000000000000000+00000000000000022022657S 0 0FATR_AV 3 5FX7SI+0602188717
Table_LDC
PX37143489198+0000+0002175736LLM30003604 23200228S D+00804 2023-05-312023-05-31 +000000000043200+0000000000000003XXX+000000000043200+000000000043200+000000000000000+000.00000 C50ED50 2023-05-23 +00000S 0 0FACT_AV 082023-05-310 +005 0 PX37143489199+0000+0002003027LLL30633874 23210097S D+00350 2023-05-292023-05-29 +000000000002328+0000000000000003YYY+000000000002328+000000000002328+000000000002328+000.00001 C50ED50 C50ED50 2023-05-23 +00000S 0 0FACT_AV 082023-05-290 +005 0
Here is the Groovy script that parses the Table_LDC data from the flowfile (I have 3 different scripts, one per table, to parse the file data and convert it to JSON):
import groovy.json.JsonBuilder
import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
// This import was missing from the original script; it is required for the
// `as OutputStreamCallback` cast below (harmless under ExecuteGroovyScript,
// mandatory under ExecuteScript).
import org.apache.nifi.processor.io.OutputStreamCallback

// Fixed-width numeric field: slice, trim, convert to BigDecimal; null when blank.
Number toNumber(String line, IntRange range) {
    try {
        String v = line[range]?.trim()
        return v ? v.toBigDecimal() : null
    } catch (Exception ex) {
        throw new Exception("Encountered exception at ${range} in line ${line}: ${ex}", ex)
    }
}

// Fixed-width date field: the sentinel 0001-01-01 (and blanks, which the
// original would have tried to parse) map to null.
Date toDate(String line, IntRange range, SimpleDateFormat fmt) {
    String v = line[range]?.trim()
    (v && v != '0001-01-01') ? fmt.parse(v) : null
}

def flowFile = session.get()
if (flowFile) {
    def datePattern = new SimpleDateFormat('yyyy-MM-dd')
    def records = []
    // flowFile.read() is provided by the ExecuteGroovyScript processor.
    flowFile.read().withReader('ISO-8859-1') { reader ->
        reader.eachLine { line ->
            records << [
                SEQ_LD         : 'ADMIN.SEQ_INFO.NEXTVAL',
                ID_LIG_CPTE    : line[0..12]?.trim() ?: '',
                NO_ORD         : toNumber(line, 13..17),
                NO_CCL         : toNumber(line, 18..28),
                CD_PA          : line[29..30]?.trim() ?: '',
                ID_MVT         : line[31..45]?.trim() ?: '',
                NO_CNT         : line[46..56]?.trim() ?: '',
                CD_SENS        : line[57..57]?.trim() ?: '',
                ID_INTRN_MVT   : toNumber(line, 58..63),
                SS_ID_INTRN_MVT: line[64..65]?.trim() ?: '',
                DT_EXBL        : toDate(line, 66..75, datePattern),
                DT_VAL_CLI     : toDate(line, 76..85, datePattern),
                DT_PEC_CAL_IR  : toDate(line, 86..95, datePattern),
                MT_SOL_MVT     : toNumber(line, 96..111),
                MT_SOL_AST_IR  : toNumber(line, 112..127),
                STA_MVT        : line[128..128]?.trim() ?: '',
                CD_DEV         : line[129..131]?.trim() ?: '',
                MT_MVT         : toNumber(line, 132..147),
                MT_MVT_HT      : toNumber(line, 148..163),
                MT_AST_IR      : toNumber(line, 164..179),
                TX_TVA_CCL     : toNumber(line, 180..188),
                IND_PEC_CAL_IR : line[189..189]?.trim() ?: '',
                ID_ACO         : line[190..202]?.trim() ?: '',
                CD_TY_ETA      : line[203..204]?.trim() ?: '',
                CD_ETA         : line[205..205]?.trim() ?: '',
                ID_COLLA_CRE   : line[206..213]?.trim() ?: '',
                ID_COLLA_MAJ   : line[214..221]?.trim() ?: '',
                DT_CRE_CCL     : toDate(line, 222..231, datePattern),
                DT_MAJ         : toDate(line, 232..241, datePattern),
                NO_PRI_LTR_ACO : toNumber(line, 242..247),
                ID_INTRN_PROD  : line[248..248]?.trim() ?: '',
                DT_TRT         : datePattern.parse('2023-06-05'),  // hard-coded processing date
                CD_SOC_FOUR    : line[259..262]?.trim() ?: '',
                IND_REPRIS_STD : line[263..263]?.trim() ?: '',
                CD_FAM_MVT_DEBI: line[264..273]?.trim() ?: '',
                MOD_PAI        : line[274..275]?.trim() ?: '',
                DT_ECH_FACT    : toDate(line, 276..285, datePattern),
                NO_ECH         : line[286..288]?.trim() ?: '',
                CD_ODM         : line[289..289]?.trim() ?: '',
                CD_MOT_ODM     : line[290..292]?.trim() ?: '',
                CD_MOT_REMB    : line[293..300]?.trim() ?: '',
                DEL_IMP        : toNumber(line, 301..304),
                IND_IMP        : line[305..305]?.trim() ?: '',
                DT_VAL_IMP     : toDate(line, 306..315, datePattern),
                DT_EVT_IMP     : toDate(line, 316..325, datePattern),
                DT_RETR_IMP    : toDate(line, 326..335, datePattern),
                MOT_IMP        : line[336..337]?.trim() ?: '',
                IND_PEC_EXBL   : line[338..338]?.trim() ?: ''
            ]
        }
    }
    // Serialize all parsed records as one JSON array and overwrite the
    // flowfile content with it.
    def jsonBytes = new JsonBuilder(records).toString().getBytes(StandardCharsets.UTF_8)
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(jsonBytes)
    } as OutputStreamCallback)
    flowFile = session.putAttribute(flowFile, 'filename', 'ti_ldc.json')
    session.transfer(flowFile, REL_SUCCESS)
}
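The slicing technique the script relies on can be exercised standalone, outside NiFi. A minimal sketch follows; note that the field layout here (a 13-character id followed by an 8-character signed amount) is invented for the demo and is not the real Table_LDC layout:

```groovy
import groovy.json.JsonBuilder

// Same technique as the NiFi script: slice a fixed-width line with an
// IntRange, trim, and convert numeric fields to BigDecimal (null if blank).
Number toNumber(String line, IntRange range) {
    String v = line[range]?.trim()
    v ? v.toBigDecimal() : null
}

// Demo layout (invented): positions 0..12 = id, positions 13..20 = amount.
def line = 'PX37143489198+0000215'
def record = [
    ID_LIG_CPTE: line[0..12]?.trim() ?: '',
    MT_MVT     : toNumber(line, 13..20)
]
println new JsonBuilder([record]).toString()
```

Each flowfile line becomes one map in the `records` list, and `JsonBuilder` turns the whole list into a JSON array in a single pass, which is why the script can replace the flowfile content with one `session.write` call.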
The expected output is the data of the 3 input flowfiles, each processed by its Groovy script.