org.apache.nifi.serialization.RecordReader.nextRecord()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(8.0k)|赞(0)|评价(0)|浏览(175)

本文整理了Java中org.apache.nifi.serialization.RecordReader.nextRecord方法的一些代码示例,展示了RecordReader.nextRecord的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。RecordReader.nextRecord方法的具体详情如下:
包路径:org.apache.nifi.serialization.RecordReader
类名称:RecordReader
方法名:nextRecord

RecordReader.nextRecord介绍

[英]Returns the next record in the stream or null if no more records are available. Types will be coerced and any unknown fields will be dropped.
[中]返回流中的下一条记录,如果没有更多记录,则返回null。类型将被强制转换,任何未知字段都将被丢弃。

代码示例

代码示例来源:origin: apache/nifi

/**
 * Returns the next record in the stream or <code>null</code> if no more records are available. Types will be coerced and any unknown fields will be dropped.
 *
 * @return the next record in the stream or <code>null</code> if no more records are available.
 *
 * @throws IOException if unable to read from the underlying data
 * @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse a record
 * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
 */
default Record nextRecord() throws IOException, MalformedRecordException {
  // Convenience overload delegating to the two-flag variant. Per the contract
  // above, both flags presumably enable type coercion and dropping of unknown
  // fields respectively — confirm against nextRecord(boolean, boolean).
  return nextRecord(true, true);
}

代码示例来源:origin: apache/nifi

@Override
  public Record next() throws IOException {
    // Iterator-style adapter over the enclosing RecordReader: surfaces the
    // checked MalformedRecordException as an IOException so callers only need
    // to handle a single exception type. The original cause is preserved.
    try {
      return RecordReader.this.nextRecord();
    } catch (final MalformedRecordException mre) {
      throw new IOException(mre);
    }
  }
};

代码示例来源:origin: apache/nifi

@Override
public void write(long writeId, InputStream inputStream) throws StreamingException {
  // The record reader already wraps the incoming stream, so the InputStream
  // argument is not consumed directly here — we simply drain the reader,
  // forwarding each record under the given write id.
  try {
    for (Record nextRecord = recordReader.nextRecord(); nextRecord != null; nextRecord = recordReader.nextRecord()) {
      write(writeId, nextRecord);
    }
  } catch (IOException | MalformedRecordException e) {
    // Wrap both read failures in the streaming API's checked exception,
    // keeping the original cause attached.
    throw new StreamingException(e.getLocalizedMessage(), e);
  }
}

代码示例来源:origin: apache/nifi

while ((record = reader.nextRecord()) != null) {
  for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
    RecordPathResult result = entry.getValue().evaluate(record);

代码示例来源:origin: apache/nifi

return Optional.ofNullable(errOrReader.get().getValue().nextRecord());
} catch (Exception e) {
  throw new LookupFailureException(String.format("Failed to read Record when looking up with %s", coordinates), e);

代码示例来源:origin: apache/nifi

while ((currentRecord = recordParser.nextRecord()) != null) {
  Object sql = currentRecord.getValue(sqlField);
  if (sql == null || StringUtils.isEmpty((String) sql)) {

代码示例来源:origin: apache/nifi

/**
 * Reads the first record from the given response stream and, when a record path is
 * configured, projects the record down to the first field selected by that path.
 *
 * @param is the response body; closed explicitly if reader creation fails, otherwise
 *           closed via the reader's try-with-resources
 * @param context attributes used to resolve the record schema
 * @return the (possibly projected) first record, or {@code null} when the stream
 *         contains no records or the record path selects no field
 * @throws SchemaNotFoundException if the record schema cannot be resolved
 * @throws MalformedRecordException if the response cannot be parsed as a record
 * @throws IOException if the stream cannot be read
 */
private Record handleResponse(InputStream is, Map<String, String> context) throws SchemaNotFoundException, MalformedRecordException, IOException {
  try (RecordReader reader = readerFactory.createRecordReader(context, is, getLogger())) {
    Record record = reader.nextRecord();
    // nextRecord() returns null on an empty stream; evaluating the record path
    // against null would throw NPE, so only project when a record was read.
    if (record != null && recordPath != null) {
      Optional<FieldValue> fv = recordPath.evaluate(record).getSelectedFields().findFirst();
      if (fv.isPresent()) {
        FieldValue fieldValue = fv.get();
        RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(fieldValue.getField()));
        Record temp;
        Object value = fieldValue.getValue();
        if (value instanceof Record) {
          temp = (Record) value;
        } else if (value instanceof Map) {
          // The selected value is already a map of field names to values.
          temp = new MapRecord(schema, (Map<String, Object>) value);
        } else {
          // Scalar value: wrap it in a single-entry map keyed by its field name.
          Map<String, Object> val = new HashMap<>();
          val.put(fieldValue.getField().getFieldName(), value);
          temp = new MapRecord(schema, val);
        }
        record = temp;
      } else {
        // The path matched nothing; report "no record" rather than the raw record.
        record = null;
      }
    }
    return record;
  } catch (Exception ex) {
    // Reader creation can fail before it takes ownership of the stream, so close
    // the stream explicitly to avoid a leak, then rethrow the original failure.
    is.close();
    throw ex;
  }
}

代码示例来源:origin: apache/nifi

@Override
public boolean moveNext() {
  // Advances the enumerator: reads the next record, caches its column-filtered
  // row in currentRow, and returns false once the record stream is exhausted.
  currentRow = null;
  try {
    final Record record = recordParser.nextRecord();
    if (record == null) {
      // If we are out of data, close the InputStream. We do this because
      // Calcite does not necessarily call our close() method.
      close();
      try {
        onFinish();
      } catch (final Exception e) {
        // Best-effort completion hook: a failing onFinish() must not mask the
        // successful end-of-stream result, so it is logged and swallowed.
        logger.error("Failed to perform tasks when enumerator was finished", e);
      }
      return false;
    }
    currentRow = filterColumns(record);
  } catch (final Exception e) {
    // Any read or filtering failure is surfaced as a ProcessException with the
    // offending FlowFile identified in the message.
    throw new ProcessException("Failed to read next record in stream for " + flowFile + " due to " + e.getMessage(), e);
  }
  // Only counted once a row was successfully produced (not on end-of-stream).
  recordsRead++;
  return true;
}

代码示例来源:origin: apache/nifi

@Override
  public void process(final InputStream in, final OutputStream out) throws IOException {
    // Re-serializes every incoming record after running it through the
    // processor's transform, then records count and MIME-type attributes
    // for the output FlowFile.
    try (final RecordReader recordReader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
      final RecordSchema outputSchema = writerFactory.getSchema(originalAttributes, recordReader.getSchema());
      try (final RecordSetWriter recordWriter = writerFactory.createWriter(getLogger(), outputSchema, out)) {
        recordWriter.beginRecordSet();
        for (Record inputRecord = recordReader.nextRecord(); inputRecord != null; inputRecord = recordReader.nextRecord()) {
          recordWriter.write(AbstractRecordProcessor.this.process(inputRecord, outputSchema, original, context));
        }
        final WriteResult result = recordWriter.finishRecordSet();
        attributes.put("record.count", String.valueOf(result.getRecordCount()));
        attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
        attributes.putAll(result.getAttributes());
        recordCount.set(result.getRecordCount());
      }
    } catch (final SchemaNotFoundException e) {
      throw new ProcessException(e.getLocalizedMessage(), e);
    } catch (final MalformedRecordException e) {
      throw new ProcessException("Could not parse incoming data", e);
    }
  }
});

代码示例来源:origin: apache/nifi

@Override
  public void process(final InputStream in) throws IOException {
    // Routes each incoming record to zero or more relationships, lazily creating
    // one output FlowFile + RecordSetWriter per relationship on first use and
    // caching them in the surrounding `writers` map for reuse.
    try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
      final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
      Record record;
      while ((record = reader.nextRecord()) != null) {
        final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);
        numRecords.incrementAndGet();
        for (final Relationship relationship : relationships) {
          final RecordSetWriter recordSetWriter;
          Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
          if (tuple == null) {
            // First record routed to this relationship: open a new FlowFile and
            // writer, and remember them so later records append to the same set.
            FlowFile outFlowFile = session.create(original);
            final OutputStream out = session.write(outFlowFile);
            recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out);
            recordSetWriter.beginRecordSet();
            tuple = new Tuple<>(outFlowFile, recordSetWriter);
            writers.put(relationship, tuple);
          } else {
            recordSetWriter = tuple.getValue();
          }
          // A record routed to several relationships is written once per match.
          recordSetWriter.write(record);
        }
      }
    } catch (final SchemaNotFoundException | MalformedRecordException e) {
      throw new ProcessException("Could not parse incoming data", e);
    }
  }
});

代码示例来源:origin: apache/nifi

List<SolrInputDocument> inputDocumentList = new LinkedList<>();
try {
  while ((record = reader.nextRecord()) != null) {
  SolrInputDocument inputDoc = new SolrInputDocument();
  writeRecord(record, inputDoc,fieldList,EMPTY_STRING);

代码示例来源:origin: apache/nifi

Record record;
if (startIndex >= 0) {
  while ( index++ < startIndex && (reader.nextRecord()) != null) {}
while ((record = reader.nextRecord()) != null) {
  PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), recordPath, flowFile, rowFieldName, columnFamily,
      timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);

代码示例来源:origin: apache/nifi

RecordSchema schema = reader.getSchema();
Record record;
while ((record = reader.nextRecord()) != null) {

代码示例来源:origin: apache/nifi

while ((record = reader.nextRecord()) != null) {

代码示例来源:origin: apache/nifi

while ((record = reader.nextRecord()) != null) {

代码示例来源:origin: apache/nifi

while ((record = recordReader.nextRecord()) != null) {
  if (recordWriter == null) {
    final OutputStream rawOut = session.write(merged);

代码示例来源:origin: apache/nifi

batchStatement.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevel));
while((record = reader.nextRecord()) != null) {
  Map<String, Object> recordContentMap = (Map<String, Object>) DataTypeUtils
      .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));

代码示例来源:origin: apache/nifi

while ((record = reader.nextRecord()) != null) {
  Map<String, Object> recordMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));

代码示例来源:origin: apache/nifi

while ((record = reader.nextRecord()) != null) {

代码示例来源:origin: apache/nifi

Record record = recordReader.nextRecord();
assertEquals(1, record.getValue("id"));
assertEquals("John", record.getValue("firstName"));
assertEquals("Doe", record.getValue("lastName"));
record = recordReader.nextRecord();
assertEquals(2, record.getValue("id"));
assertEquals("Jane", record.getValue("firstName"));

相关文章