本文整理了Java中org.apache.nifi.serialization.RecordReader
类的一些代码示例,展示了RecordReader
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。RecordReader
类的具体详情如下:
包路径:org.apache.nifi.serialization.RecordReader
类名称:RecordReader
[英]A RowRecordReader is responsible for parsing data and returning a record at a time in order to allow the caller to iterate over the records individually.
PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible manner between minor or incremental releases of NiFi.
[中]RowRecordReader负责解析数据并一次返回一条记录,以允许调用方逐个遍历记录。
请注意:此接口仍被视为“不稳定”,可能在NiFi的次要或增量版本之间以不向后兼容的方式更改。
代码示例来源:origin: apache/nifi
throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
final RecordSchema recordSchema = recordParser.getSchema();
while ((currentRecord = recordParser.nextRecord()) != null) {
Object sql = currentRecord.getValue(sqlField);
if (sql == null || StringUtils.isEmpty((String) sql)) {
代码示例来源:origin: apache/nifi
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
final RecordSet recordSet = reader.createRecordSet();
final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
代码示例来源:origin: apache/nifi
@Override
public void close() {
if (recordParser != null) {
try {
recordParser.close();
} catch (final Exception e) {
logger.warn("Failed to close decorated source for " + flowFile, e);
}
}
try {
rawIn.close();
} catch (final Exception e) {
logger.warn("Failed to close InputStream for " + flowFile, e);
}
}
}
代码示例来源:origin: apache/nifi
/**
* Returns the next record in the stream or <code>null</code> if no more records are available. Types will be coerced and any unknown fields will be dropped.
*
* @return the next record in the stream or <code>null</code> if no more records are available.
*
* @throws IOException if unable to read from the underlying data
* @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
* @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
*/
default Record nextRecord() throws IOException, MalformedRecordException {
return nextRecord(true, true);
}
代码示例来源:origin: apache/nifi
@Override
public RecordSchema getSchema() throws IOException {
try {
return RecordReader.this.getSchema();
} catch (final MalformedRecordException mre) {
throw new IOException(mre);
}
}
代码示例来源:origin: apache/nifi
while ((record = recordReader.nextRecord()) != null) {
if (recordWriter == null) {
final OutputStream rawOut = session.write(merged);
recordReader.close();
flowFileSession.migrate(this.session, Collections.singleton(flowFile));
flowFileMigrated = true;
recordReader.close();
代码示例来源:origin: apache/nifi
@Override
public Record next() throws IOException {
try {
return RecordReader.this.nextRecord();
} catch (final MalformedRecordException mre) {
throw new IOException(mre);
}
}
};
代码示例来源:origin: apache/nifi
RecordSchema recordSchema = recordReader.getSchema();
for (RecordField field : recordSchema.getFields()) {
String fieldName = field.getFieldName();
代码示例来源:origin: org.apache.nifi/nifi-standard-processors
while ((record = recordReader.nextRecord()) != null) {
if (recordWriter == null) {
final OutputStream rawOut = session.write(merged);
recordReader.close();
flowFileSession.migrate(this.session, Collections.singleton(flowFile));
flowFileMigrated = true;
recordReader.close();
代码示例来源:origin: apache/nifi
@Override
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
Record record;
while ((record = reader.nextRecord()) != null) {
final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);
numRecords.incrementAndGet();
for (final Relationship relationship : relationships) {
final RecordSetWriter recordSetWriter;
Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
if (tuple == null) {
FlowFile outFlowFile = session.create(original);
final OutputStream out = session.write(outFlowFile);
recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out);
recordSetWriter.beginRecordSet();
tuple = new Tuple<>(outFlowFile, recordSetWriter);
writers.put(relationship, tuple);
} else {
recordSetWriter = tuple.getValue();
}
recordSetWriter.write(record);
}
}
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("Could not parse incoming data", e);
}
}
});
代码示例来源:origin: apache/nifi
@Override
public void write(long writeId, InputStream inputStream) throws StreamingException {
// The inputStream is already available to the recordReader, so just iterate through the records
try {
Record record;
while ((record = recordReader.nextRecord()) != null) {
write(writeId, record);
}
} catch (MalformedRecordException | IOException e) {
throw new StreamingException(e.getLocalizedMessage(), e);
}
}
代码示例来源:origin: apache/nifi
@Override
public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
if (relDataType != null) {
return relDataType;
}
RecordSchema schema;
try (final InputStream in = session.read(flowFile)) {
final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, logger);
schema = recordParser.getSchema();
} catch (final Exception e) {
throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
}
final List<String> names = new ArrayList<>();
final List<RelDataType> types = new ArrayList<>();
final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
for (final RecordField field : schema.getFields()) {
names.add(field.getFieldName());
final RelDataType relDataType = getRelDataType(field.getDataType(), javaTypeFactory);
types.add(javaTypeFactory.createTypeWithNullability(relDataType, field.isNullable()));
}
logger.debug("Found Schema: {}", new Object[] {schema});
if (recordSchema == null) {
recordSchema = schema;
}
relDataType = typeFactory.createStructType(Pair.zip(names, types));
return relDataType;
}
代码示例来源:origin: apache/nifi
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final List<String> fieldNames = recordReader.getSchema().getFieldNames();
final RecordSet recordSet = recordReader.createRecordSet();
代码示例来源:origin: org.apache.nifi/nifi-standard-processors
@Override
public void close() {
if (recordParser != null) {
try {
recordParser.close();
} catch (final Exception e) {
logger.warn("Failed to close decorated source for " + flowFile, e);
}
}
try {
rawIn.close();
} catch (final Exception e) {
logger.warn("Failed to close InputStream for " + flowFile, e);
}
}
}
代码示例来源:origin: apache/nifi
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
writer.beginRecordSet();
Record record;
while ((record = reader.nextRecord()) != null) {
final Record processed = AbstractRecordProcessor.this.process(record, writeSchema, original, context);
writer.write(processed);
}
final WriteResult writeResult = writer.finishRecordSet();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
recordCount.set(writeResult.getRecordCount());
}
} catch (final SchemaNotFoundException e) {
throw new ProcessException(e.getLocalizedMessage(), e);
} catch (final MalformedRecordException e) {
throw new ProcessException("Could not parse incoming data", e);
}
}
});
代码示例来源:origin: apache/nifi
while ((record = reader.nextRecord()) != null) {
for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
RecordPathResult result = entry.getValue().evaluate(record);
代码示例来源:origin: apache/nifi
protected RecordSchema getValidationSchema(final ProcessContext context, final FlowFile flowFile, final RecordReader reader)
throws MalformedRecordException, IOException, SchemaNotFoundException {
final String schemaAccessStrategy = context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
if (schemaAccessStrategy.equals(READER_SCHEMA.getValue())) {
return reader.getSchema();
} else if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
} else if (schemaAccessStrategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) {
final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue();
final Parser parser = new Schema.Parser();
final Schema avroSchema = parser.parse(schemaText);
return AvroTypeUtil.createSchema(avroSchema);
} else {
throw new ProcessException("Invalid Schema Access Strategy: " + schemaAccessStrategy);
}
}
}
代码示例来源:origin: apache/nifi
final RecordSet recordSet = recordReader.createRecordSet();
recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, recordReader.getSchema());
writeResult.set(recordWriter.write(recordSet));
} catch (Exception e) {
代码示例来源:origin: apache/nifi
throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
final RecordSchema recordSchema = recordParser.getSchema();
final ComponentLog log = getLogger();
int batchIndex = 0;
while ((currentRecord = recordParser.nextRecord()) != null) {
Object[] values = currentRecord.getValues();
if (values != null) {
代码示例来源:origin: apache/nifi
return Optional.ofNullable(errOrReader.get().getValue().nextRecord());
} catch (Exception e) {
throw new LookupFailureException(String.format("Failed to read Record when looking up with %s", coordinates), e);
内容来源于网络,如有侵权,请联系作者删除!