org.apache.nifi.serialization.RecordReader类的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(9.7k)|赞(0)|评价(0)|浏览(166)

本文整理了Java中org.apache.nifi.serialization.RecordReader类的一些代码示例,展示了RecordReader类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。RecordReader类的具体详情如下:
包路径:org.apache.nifi.serialization.RecordReader
类名称:RecordReader

RecordReader介绍

[英]A RowRecordReader is responsible for parsing data and returning a record at a time in order to allow the caller to iterate over the records individually.

PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible manner between minor or incremental releases of NiFi.
[中]RowRecordReader负责解析数据并一次返回一条记录,以允许调用方逐个遍历记录。
请注意:此接口仍被视为“不稳定”,可能在NiFi的次要或增量版本之间以不向后兼容的方式更改。

代码示例

代码示例来源:origin: apache/nifi

throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
final RecordSchema recordSchema = recordParser.getSchema();
  while ((currentRecord = recordParser.nextRecord()) != null) {
    Object sql = currentRecord.getValue(sqlField);
    if (sql == null || StringUtils.isEmpty((String) sql)) {

代码示例来源:origin: apache/nifi

try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
  final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
  final RecordSet recordSet = reader.createRecordSet();
  final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);

代码示例来源:origin: apache/nifi

@Override
  public void close() {
    // Best-effort close of the decorated record parser; a failure here must
    // not prevent the raw stream below from being closed as well.
    try {
      if (recordParser != null) {
        recordParser.close();
      }
    } catch (final Exception e) {
      logger.warn("Failed to close decorated source for " + flowFile, e);
    }

    // Always close the underlying raw InputStream, even if the parser's
    // close failed above.
    try {
      rawIn.close();
    } catch (final Exception e) {
      logger.warn("Failed to close InputStream for " + flowFile, e);
    }
  }
}

代码示例来源:origin: apache/nifi

/**
 * Reads and returns the next record in the stream, coercing field values to the
 * schema's types and dropping any fields that are not defined in the schema.
 *
 * @return the next record, or <code>null</code> once the stream is exhausted
 *
 * @throws IOException if unable to read from the underlying data
 * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
 * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
 */
default Record nextRecord() throws IOException, MalformedRecordException {
  // Convenience overload: delegate with both options enabled.
  final boolean coerceTypes = true;
  final boolean dropUnknownFields = true;
  return nextRecord(coerceTypes, dropUnknownFields);
}

代码示例来源:origin: apache/nifi

@Override
public RecordSchema getSchema() throws IOException {
  // Delegate to the enclosing RecordReader, translating its checked
  // MalformedRecordException into the IOException this signature declares.
  try {
    return RecordReader.this.getSchema();
  } catch (final MalformedRecordException e) {
    throw new IOException(e);
  }
}

代码示例来源:origin: apache/nifi

while ((record = recordReader.nextRecord()) != null) {
  if (recordWriter == null) {
    final OutputStream rawOut = session.write(merged);
recordReader.close();
flowFileSession.migrate(this.session, Collections.singleton(flowFile));
flowFileMigrated = true;
  recordReader.close();

代码示例来源:origin: apache/nifi

@Override
  public Record next() throws IOException {
    // Adapt the enclosing reader's nextRecord() to this iterator-style
    // contract, wrapping parse failures in the declared IOException.
    try {
      return RecordReader.this.nextRecord();
    } catch (final MalformedRecordException e) {
      throw new IOException(e);
    }
  }
};

代码示例来源:origin: apache/nifi

RecordSchema recordSchema = recordReader.getSchema();
for (RecordField field : recordSchema.getFields()) {
  String fieldName = field.getFieldName();

代码示例来源:origin: org.apache.nifi/nifi-standard-processors

while ((record = recordReader.nextRecord()) != null) {
  if (recordWriter == null) {
    final OutputStream rawOut = session.write(merged);
recordReader.close();
flowFileSession.migrate(this.session, Collections.singleton(flowFile));
flowFileMigrated = true;
  recordReader.close();

代码示例来源:origin: apache/nifi

@Override
  public void process(final InputStream in) throws IOException {
    // Routes each record of the incoming FlowFile to one or more relationships,
    // lazily creating one output FlowFile + RecordSetWriter per relationship.
    try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
      final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
      Record record;
      while ((record = reader.nextRecord()) != null) {
        // route(...) decides which relationship(s) this record belongs to.
        final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);
        numRecords.incrementAndGet();
        for (final Relationship relationship : relationships) {
          final RecordSetWriter recordSetWriter;
          // One (FlowFile, writer) pair per relationship, created on first use
          // and cached in the outer 'writers' map for reuse across records.
          Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
          if (tuple == null) {
            FlowFile outFlowFile = session.create(original);
            final OutputStream out = session.write(outFlowFile);
            recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out);
            recordSetWriter.beginRecordSet();
            tuple = new Tuple<>(outFlowFile, recordSetWriter);
            writers.put(relationship, tuple);
          } else {
            recordSetWriter = tuple.getValue();
          }
          recordSetWriter.write(record);
        }
      }
    } catch (final SchemaNotFoundException | MalformedRecordException e) {
      // Surface schema/parse failures as a ProcessException so the framework
      // can route the original FlowFile to failure.
      throw new ProcessException("Could not parse incoming data", e);
    }
  }
});

代码示例来源:origin: apache/nifi

@Override
public void write(long writeId, InputStream inputStream) throws StreamingException {
  // The record reader already wraps this input stream, so the parameter is
  // intentionally unused; simply drain the reader record by record.
  try {
    for (Record record = recordReader.nextRecord(); record != null; record = recordReader.nextRecord()) {
      write(writeId, record);
    }
  } catch (MalformedRecordException | IOException e) {
    // Translate read/parse failures into the streaming API's exception type.
    throw new StreamingException(e.getLocalizedMessage(), e);
  }
}

代码示例来源:origin: apache/nifi

@Override
public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
  // Return the cached row type if the schema was already derived.
  if (relDataType != null) {
    return relDataType;
  }

  // Read just enough of the FlowFile to obtain the record schema.
  // FIX: the RecordReader is now part of the try-with-resources so it is
  // closed deterministically (the original only closed the InputStream,
  // leaking any resources held by the reader itself).
  RecordSchema schema;
  try (final InputStream in = session.read(flowFile);
     final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, logger)) {
    schema = recordParser.getSchema();
  } catch (final Exception e) {
    throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
  }

  final List<String> names = new ArrayList<>();
  final List<RelDataType> types = new ArrayList<>();
  final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
  for (final RecordField field : schema.getFields()) {
    names.add(field.getFieldName());
    // FIX: renamed from 'relDataType' to avoid shadowing the cached field of
    // the same name that is checked and assigned in this method.
    final RelDataType fieldType = getRelDataType(field.getDataType(), javaTypeFactory);
    types.add(javaTypeFactory.createTypeWithNullability(fieldType, field.isNullable()));
  }

  logger.debug("Found Schema: {}", new Object[] {schema});

  // Remember the record schema the first time it is resolved.
  if (recordSchema == null) {
    recordSchema = schema;
  }

  relDataType = typeFactory.createStructType(Pair.zip(names, types));
  return relDataType;
}

代码示例来源:origin: apache/nifi

try (final InputStream in = session.read(flowFile);
   final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
  final List<String> fieldNames = recordReader.getSchema().getFieldNames();
  final RecordSet recordSet = recordReader.createRecordSet();

代码示例来源:origin: org.apache.nifi/nifi-standard-processors

@Override
  public void close() {
    // Close the decorated record parser first; failures are logged rather
    // than propagated so the raw stream below is still closed.
    if (recordParser != null) {
      try {
        recordParser.close();
      } catch (final Exception e) {
        logger.warn("Failed to close decorated source for " + flowFile, e);
      }
    }

    // Always close the underlying raw InputStream as well.
    try {
      rawIn.close();
    } catch (final Exception e) {
      logger.warn("Failed to close InputStream for " + flowFile, e);
    }
  }
}

代码示例来源:origin: apache/nifi

@Override
  public void process(final InputStream in, final OutputStream out) throws IOException {
    // Transforms every record of the incoming FlowFile via process(...) and
    // writes the results as a single record set to the output stream.
    try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
      final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
      try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
        writer.beginRecordSet();
        Record record;
        while ((record = reader.nextRecord()) != null) {
          final Record processed = AbstractRecordProcessor.this.process(record, writeSchema, original, context);
          writer.write(processed);
        }
        // finishRecordSet() finalizes the output and reports count/attributes,
        // which are propagated to the outgoing FlowFile's attribute map.
        final WriteResult writeResult = writer.finishRecordSet();
        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
        attributes.putAll(writeResult.getAttributes());
        recordCount.set(writeResult.getRecordCount());
      }
    } catch (final SchemaNotFoundException e) {
      throw new ProcessException(e.getLocalizedMessage(), e);
    } catch (final MalformedRecordException e) {
      throw new ProcessException("Could not parse incoming data", e);
    }
  }
});
代码示例来源:origin: apache/nifi

while ((record = reader.nextRecord()) != null) {
  for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    RecordPathResult result = entry.getValue().evaluate(record);

代码示例来源:origin: apache/nifi

/**
 * Resolves the schema used for validation according to the configured
 * Schema Access Strategy: the reader's own schema, a named schema from the
 * registry, or an Avro schema given directly as text.
 */
protected RecordSchema getValidationSchema(final ProcessContext context, final FlowFile flowFile, final RecordReader reader)
    throws MalformedRecordException, IOException, SchemaNotFoundException {
    final String schemaAccessStrategy = context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();

    if (schemaAccessStrategy.equals(READER_SCHEMA.getValue())) {
      // Use whatever schema the reader itself derived.
      return reader.getSchema();
    }

    if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
      // Look the schema up by name in the configured Schema Registry.
      final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
      final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
      final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build();
      return schemaRegistry.retrieveSchema(schemaIdentifier);
    }

    if (schemaAccessStrategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) {
      // Parse the Avro schema text supplied directly via the property.
      final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue();
      final Schema avroSchema = new Schema.Parser().parse(schemaText);
      return AvroTypeUtil.createSchema(avroSchema);
    }

    throw new ProcessException("Invalid Schema Access Strategy: " + schemaAccessStrategy);
  }
}

代码示例来源:origin: apache/nifi

final RecordSet recordSet = recordReader.createRecordSet();
  recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, recordReader.getSchema());
  writeResult.set(recordWriter.write(recordSet));
} catch (Exception e) {

代码示例来源:origin: apache/nifi

throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
final RecordSchema recordSchema = recordParser.getSchema();
final ComponentLog log = getLogger();
  int batchIndex = 0;
  while ((currentRecord = recordParser.nextRecord()) != null) {
    Object[] values = currentRecord.getValues();
    if (values != null) {

代码示例来源:origin: apache/nifi

return Optional.ofNullable(errOrReader.get().getValue().nextRecord());
} catch (Exception e) {
  throw new LookupFailureException(String.format("Failed to read Record when looking up with %s", coordinates), e);

相关文章