org.apache.hadoop.io.SequenceFile: Usage and Code Examples


This article collects code examples for the Java class org.apache.hadoop.io.SequenceFile and shows how the class is used in practice. The examples are drawn from selected open-source projects hosted on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as useful references. Details of the SequenceFile class follow.
Package path: org.apache.hadoop.io.SequenceFile
Class name: SequenceFile

SequenceFile Overview

SequenceFiles are flat files consisting of binary key/value pairs.

SequenceFile provides SequenceFile.Writer, SequenceFile.Reader and Sorter classes for writing, reading and sorting respectively.

There are three SequenceFile Writers, based on the CompressionType used to compress key/value pairs:

  1. Writer : Uncompressed records.
  2. RecordCompressWriter : Record-compressed files, only compress values.
  3. BlockCompressWriter : Block-compressed files, both keys & values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.

The actual compression algorithm used to compress key and/or values can be specified by using the appropriate CompressionCodec.

The recommended way is to use the static createWriter methods provided by SequenceFile to choose the preferred format.
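
To make that recommendation concrete, here is a minimal sketch (not taken from the javadoc; the file paths, IntWritable/Text key/value types, and DefaultCodec are assumptions for illustration) that obtains each of the three writer flavors through the static createWriter factory by varying only the CompressionType:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import java.io.IOException;

public class CreateWriterExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // Uncompressed records -> plain Writer.
    try (Writer w = SequenceFile.createWriter(conf,
        Writer.file(new Path("/tmp/uncompressed.seq")),
        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
        Writer.compression(CompressionType.NONE))) {
      w.append(new IntWritable(1), new Text("one"));
    }

    // Record compression (values only) -> RecordCompressWriter under the hood.
    try (Writer w = SequenceFile.createWriter(conf,
        Writer.file(new Path("/tmp/record-compressed.seq")),
        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
        Writer.compression(CompressionType.RECORD, new DefaultCodec()))) {
      w.append(new IntWritable(2), new Text("two"));
    }

    // Block compression (keys and values collected into blocks) -> BlockCompressWriter.
    try (Writer w = SequenceFile.createWriter(conf,
        Writer.file(new Path("/tmp/block-compressed.seq")),
        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
        Writer.compression(CompressionType.BLOCK, new DefaultCodec()))) {
      w.append(new IntWritable(3), new Text("three"));
    }
  }
}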

The SequenceFile.Reader acts as the bridge and can read any of the above SequenceFile formats.

SequenceFile Formats

Essentially there are 3 different formats for SequenceFiles depending on the CompressionType specified. All of them share a common header described below.

SequenceFile Header
  • version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)
  • keyClassName - key class
  • valueClassName - value class
  • compression - A boolean which specifies if compression is turned on for keys/values in this file.
  • blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.
  • compression codec - CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).
  • metadata - Metadata for this file.
  • sync - A sync marker to denote end of the header.
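
The header fields listed above can also be read back programmatically through SequenceFile.Reader; the sketch below is illustrative only (the file path is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.Map;

public class InspectHeaderExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    try (Reader reader = new Reader(conf, Reader.file(new Path("/tmp/block-compressed.seq")))) {
      System.out.println("key class:        " + reader.getKeyClassName());
      System.out.println("value class:      " + reader.getValueClassName());
      System.out.println("compressed:       " + reader.isCompressed());
      System.out.println("block compressed: " + reader.isBlockCompressed());
      System.out.println("codec:            " + (reader.getCompressionCodec() == null
          ? "none" : reader.getCompressionCodec().getClass().getName()));
      // The metadata block is a map of Text keys to Text values.
      for (Map.Entry<Text, Text> e : reader.getMetadata().getMetadata().entrySet()) {
        System.out.println("metadata: " + e.getKey() + " = " + e.getValue());
      }
    }
  }
}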
Uncompressed SequenceFile Format
  • Header
  • Record
    • Record length
    • Key length
    • Key
    • Value
  • A sync-marker every few 100 kilobytes or so.

Record-Compressed SequenceFile Format
  • Header
  • Record
    • Record length
    • Key length
    • Key
    • Compressed Value
  • A sync-marker every few 100 kilobytes or so.

Block-Compressed SequenceFile Format
  • Header
  • Record Block
    • Uncompressed number of records in the block
    • Compressed key-lengths block-size
    • Compressed key-lengths block
    • Compressed keys block-size
    • Compressed keys block
    • Compressed value-lengths block-size
    • Compressed value-lengths block
    • Compressed values block-size
    • Compressed values block
  • A sync-marker every block.

The compressed blocks of key lengths and value lengths consist of the actual lengths of individual keys/values encoded in ZeroCompressedInteger format.
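
As a complement to the format descriptions, the following sketch (again with a placeholder path, and assuming the file was written with IntWritable/Text pairs) iterates over the records of a SequenceFile and reports where sync markers were crossed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class ReadWithSyncExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/block-compressed.seq");
    IntWritable key = new IntWritable();   // must match the file's key class
    Text value = new Text();               // must match the file's value class
    try (Reader reader = new Reader(conf, Reader.file(path))) {
      long position = reader.getPosition();
      while (reader.next(key, value)) {
        // syncSeen() is true when the last call to next() crossed a sync marker.
        String marker = reader.syncSeen() ? " [sync]" : "";
        System.out.printf("%d\t%s\t%s%s%n", position, key, value, marker);
        position = reader.getPosition();
      }
    }
  }
}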

Code Examples

Code example source: origin: apache/flink

// Write kvCount (key, value) pairs to a SequenceFile, then write a second file
// keyed by NullWritable. Declarations and closing braces lost in the excerpt are
// restored here; the IntWritable key and value types are assumed.
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
Path path = new Path(sequenceFile.getAbsolutePath());
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
    writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
    for (int i = 0; i < kvCount; i++) {
        key.set(i);
        value.set(i + " - somestring");
        writer.append(key, value);
    }
} finally {
    if (writer != null) writer.close();
}

// Second file: NullWritable keys with IntWritable values.
IntWritable value1 = new IntWritable();
path = new Path(sequenceFileInPathNull);
SequenceFile.Writer writer1 = SequenceFile.createWriter(fs, conf, path, NullWritable.class, value1.getClass());
try {
    for (int i = 0; i < kvCount; i++) {
        value1.set(i);
        writer1.append(NullWritable.get(), value1);
    }
} finally {
    writer1.close();
}

Code example source: origin: apache/avro

/**
  * Gets type of compression for the output sequence file.
  *
  * @param conf The job configuration.
  * @return The compression type.
  */
 public static CompressionType getOutputCompressionType(Configuration conf) {
  String typeName = conf.get(FileOutputFormat.COMPRESS_TYPE);
  if (typeName != null) {
   return CompressionType.valueOf(typeName);
  }
  return SequenceFile.getDefaultCompressionType(conf);
 }
}

Code example source: origin: apache/kylin

@Override
public void doCleanup(Context context) throws IOException, InterruptedException {
  mos.close();
  Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
  FileSystem fs = FileSystem.get(context.getConfiguration());
  if (!fs.exists(outputDirBase)) {
    fs.mkdirs(outputDirBase);
    SequenceFile
        .createWriter(context.getConfiguration(),
            SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
        .close();
  }
}

Code example source: origin: linkedin/camus

private void writePrevious(Collection<EtlKey> missedKeys, JobContext context) throws IOException {
 FileSystem fs = FileSystem.get(context.getConfiguration());
 Path output = FileOutputFormat.getOutputPath(context);
 if (fs.exists(output)) {
  fs.mkdirs(output);
 }
 output = new Path(output, EtlMultiOutputFormat.OFFSET_PREFIX + "-previous");
 SequenceFile.Writer writer =
   SequenceFile.createWriter(fs, context.getConfiguration(), output, EtlKey.class, NullWritable.class);
 for (EtlKey key : missedKeys) {
  writer.append(key, NullWritable.get());
 }
 writer.close();
}

Code example source: origin: linkedin/camus

// Excerpt from Camus' EtlMultiOutputCommitter.commitTask(); unrelated bookkeeping is elided ("...").
FileSystem fs = FileSystem.get(context.getConfiguration());
if (EtlMultiOutputFormat.isRunMoveData(context)) {
    Path workPath = super.getWorkPath();
    // ... for each data file, compute its destination, e.g.
    //     getPartitionedPath(context, file, count.getEventCount(), count.getLastKey().getOffset());
    Path dest = new Path(baseOutDir, partitionedFile);
    if (!fs.exists(dest.getParent())) {
        mkdirs(fs, dest.getParent());
    }
    // ... write per-topic counts as JSON to a temporary file ...
    Path tempPath = new Path(workPath, "counts." + context.getConfiguration().get("mapred.task.id"));
    OutputStream outputStream = new BufferedOutputStream(fs.create(tempPath));
    ObjectMapper mapper = new ObjectMapper();
    // ...
}

// Persist the final offsets in a SequenceFile so the next run can resume from them.
SequenceFile.Writer offsetWriter = SequenceFile.createWriter(fs, context.getConfiguration(),
    new Path(super.getWorkPath(),
        EtlMultiOutputFormat.getUniqueFile(context, EtlMultiOutputFormat.OFFSET_PREFIX, "")),
    EtlKey.class, NullWritable.class); // key/value classes inferred from the append() below
for (String s : offsets.keySet()) { // loop restored; the excerpt showed only its body
    log.info("Avg record size for " + offsets.get(s).getTopic() + ":" + offsets.get(s).getPartition() + " = "
        + offsets.get(s).getMessageSize());
    offsetWriter.append(offsets.get(s), NullWritable.get());
}
offsetWriter.close();
super.commitTask(context);

Code example source: origin: intel-hadoop/HiBench

private static void createControlFile(
                   FileSystem fs,
                   int fileSize, // in MB 
                   int nrFiles
                   ) throws IOException {
 LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
 fs.delete(CONTROL_DIR, true);
 for(int i=0; i < nrFiles; i++) {
  String name = getFileName(i);
  Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
  SequenceFile.Writer writer = null;
  try {
   writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
                     Text.class, LongWritable.class,
                     CompressionType.NONE);
   writer.append(new Text(name), new LongWritable(fileSize));
  } catch(Exception e) {
   throw new IOException(e.getLocalizedMessage());
  } finally {
   if (writer != null)
    writer.close();
   writer = null;
  }
 }
 LOG.info("created control files for: "+nrFiles+" files");
}

Code example source: origin: ch.cern.hadoop/hadoop-common

/**
 * Test that makes sure createWriter succeeds on a file that was 
 * already created
 * @throws IOException
 */
public void testCreateWriterOnExistingFile() throws IOException {
 Configuration conf = new Configuration();
 FileSystem fs = FileSystem.getLocal(conf);
 Path name = new Path(new Path(System.getProperty("test.build.data","."),
   "createWriterOnExistingFile") , "file");
 fs.create(name);
 SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
   RandomDatum.class, 512, (short) 1, 4096, false,
   CompressionType.NONE, null, new Metadata());
}

Code example source: origin: org.apache.mahout/mahout-utils

private static VectorWriter getSeqFileWriter(String outFile) throws IOException {
 Path path = new Path(outFile);
 Configuration conf = new Configuration();
 FileSystem fs = FileSystem.get(conf);
 SequenceFile.Writer seqWriter = SequenceFile.createWriter(fs, conf, path, LongWritable.class,
  VectorWritable.class);
 return new SequenceFileVectorWriter(seqWriter);
}

Code example source: origin: apache/kylin

// Excerpt from Kylin's HBase table creation step; the region-size adjustment logic is elided ("...").
if (fs.exists(outputFolder) == false) {
    fs.mkdirs(outputFolder);
}
int compactionThreshold = Integer.parseInt(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
logger.info("hbase.hstore.compactionThreshold is {0}", String.valueOf(compactionThreshold));
if (hfileSizeMB > 0.0f && hfileSizeMB * compactionThreshold < mbPerRegion) {
    // ... adjust hfileSizeMB (elided in this excerpt) ...
}
// Write the HFile partition split keys into a SequenceFile.
final Path hfilePartitionFile = new Path(outputFolder, "part-r-00000_hfile");
short regionCount = (short) innerRegionSplits.size();
try (SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
        SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
        SequenceFile.Writer.valueClass(NullWritable.class))) {
    for (int i = 0; i < splits.size(); i++) { // loop restored; the excerpt showed a single append
        hfilePartitionWriter.append(
                new RowKeyWritable(KeyValueUtil.createFirstOnRow(splits.get(i), 9223372036854775807L)
                        .createKeyOnly(false).getKey()),
                NullWritable.get());
    }
}

Code example source: origin: KylinOLAP/Kylin

public static void copyTo64MB(String src, String dst) throws IOException {
  Configuration hconf = new Configuration();
  Path srcPath = new Path(src);
  Path dstPath = new Path(dst);
  FileSystem fs = FileSystem.get(hconf);
  long srcSize = fs.getFileStatus(srcPath).getLen();
  int copyTimes = (int) (67108864 / srcSize); // 64 MB
  System.out.println("Copy " + copyTimes + " times");
  Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
  Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
  Text value = new Text();
  Writer writer = SequenceFile.createWriter(hconf, Writer.file(dstPath), Writer.keyClass(key.getClass()), Writer.valueClass(Text.class), Writer.compression(CompressionType.BLOCK, getLZOCodec(hconf)));
  int count = 0;
  while (reader.next(key, value)) {
    for (int i = 0; i < copyTimes; i++) {
      writer.append(key, value);
      count++;
    }
  }
  System.out.println("Len: " + writer.getLength());
  System.out.println("Rows: " + count);
  reader.close();
  writer.close();
}

Code example source: origin: com.presidentio.but/format

@Override
public void write(String line) throws IOException {
  if (writer == null) {
    tmpFile = File.createTempFile("seq-", ".dat");
    writer = SequenceFile.createWriter(new Configuration(), Writer.file(new Path(tmpFile.toURI())),
        Writer.keyClass(NullWritable.class), Writer.valueClass(Text.class));
  }
  text.set(line);
  writer.append(NullWritable.get(), text);
}

Code example source: origin: sequenceiq/sequenceiq-samples

/**
  * Reduce task done, write output to a file.
  */
 @Override
 public void cleanup(Context context) throws IOException {
  //write output to a file
  Configuration conf = context.getConfiguration();
  Path outDir = new Path(conf.get(FileOutputFormat.OUTDIR));
  Path outFile = new Path(outDir, "reduce-out");
  FileSystem fileSys = FileSystem.get(conf);
  SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
    outFile, LongWritable.class, LongWritable.class, 
    CompressionType.NONE);
  writer.append(new LongWritable(numInside), new LongWritable(numOutside));
  writer.close();
 }
}

Code example source: origin: org.apache.hadoop/hadoop-mapred-test

public void run() {
 try {
  for(int i=start; i < end; i++) {
   String name = getFileName(i);
   Path controlFile = new Path(INPUT_DIR, "in_file_" + name);
   SequenceFile.Writer writer = null;
   try {
    writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
                      Text.class, LongWritable.class,
                      CompressionType.NONE);
    String logFile = jhLogFiles[i].getPath().toString();
    writer.append(new Text(logFile), new LongWritable(0));
   } catch(Exception e) {
    throw new IOException(e);
   } finally {
    if (writer != null)
     writer.close();
    writer = null;
   }
  }
 } catch(IOException ex) {
  LOG.error("FileCreateDaemon failed.", ex);
 }
 numFinishedThreads++;
}

Code example source: origin: apache/storm

private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws IOException {
  Configuration conf = new Configuration();
  try {
    if (fs.exists(file)) {
      fs.delete(file, false);
    }
    SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class);
    for (int i = 0; i < rowCount; i++) {
      w.append(new IntWritable(i), new Text("line " + i));
    }
    w.close();
    System.out.println("done");
  } catch (IOException e) {
    e.printStackTrace();
  }
}

Code example source: origin: apache/incubator-gobblin

public void put(String storeName, String tableName, T state) throws IOException {
  String tmpTableName = this.useTmpFileForPut ? TMP_FILE_PREFIX + tableName : tableName;
  Path tmpTablePath = new Path(new Path(this.storeRootDir, storeName), tmpTableName);
  if (!this.fs.exists(tmpTablePath) && !create(storeName, tmpTableName)) {
    throw new IOException("Failed to create a state file for table " + tmpTableName);
  }

  Closer closer = Closer.create(); // restored; the excerpt omitted the Closer setup
  try {
    @SuppressWarnings("deprecation")
    SequenceFile.Writer writer = closer.register(SequenceFile.createWriter(this.fs, this.conf, tmpTablePath,
        Text.class, this.stateClass, SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
    writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
  } catch (Throwable t) {
    throw closer.rethrow(t);
  } finally {
    closer.close();
  }
  // ... (renaming the temporary file to the final table name is elided in this excerpt)
}

Code example source: origin: pinterest/secor

public SequenceFileWriter(LogFilePath path, CompressionCodec codec) throws IOException {
  Configuration config = new Configuration();
  fsPath = new Path(path.getLogFilePath());
  FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
  if (codec != null) {
    this.mWriter = SequenceFile.createWriter(fs, config, fsPath,
        LongWritable.class, BytesWritable.class,
        SequenceFile.CompressionType.BLOCK, codec);
  } else {
    this.mWriter = SequenceFile.createWriter(fs, config, fsPath,
        LongWritable.class, BytesWritable.class);
  }
  this.mKey = new LongWritable();
  this.mValue = new BytesWritable();
  LOG.info("Created sequence file writer: {}", fsPath);
}

Code example source: origin: apache/incubator-gobblin

@Test
@SuppressWarnings("deprecation")
public void testSerializeToSequenceFile() throws IOException {
 Closer closer = Closer.create();
 Configuration conf = new Configuration();
 WritableShimSerialization.addToHadoopConfiguration(conf);
 try {
  SequenceFile.Writer writer1 = closer.register(SequenceFile.createWriter(this.fs, conf,
    new Path(this.outputPath, "seq1"), Text.class, WorkUnitState.class));
  Text key = new Text();
  WorkUnitState workUnitState = new WorkUnitState();
  TestWatermark watermark = new TestWatermark();
  watermark.setLongWatermark(10L);
  workUnitState.setActualHighWatermark(watermark);
  writer1.append(key, workUnitState);
  SequenceFile.Writer writer2 = closer.register(SequenceFile.createWriter(this.fs, conf,
    new Path(this.outputPath, "seq2"), Text.class, WorkUnitState.class));
  watermark.setLongWatermark(100L);
  workUnitState.setActualHighWatermark(watermark);
  writer2.append(key, workUnitState);
 } catch (Throwable t) {
  throw closer.rethrow(t);
 } finally {
  closer.close();
 }
}

Code example source: origin: org.apache.hadoop/hadoop-mapred-test

private static <T extends WritableComparable<?>> Path writePartitionFile(
  String testname, Configuration conf, T[] splits) throws IOException {
 final FileSystem fs = FileSystem.getLocal(conf);
 final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
                ).makeQualified(fs);
 Path p = new Path(testdir, testname + "/_partition.lst");
 TotalOrderPartitioner.setPartitionFile(conf, p);
 conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
 SequenceFile.Writer w = null;
 try {
  w = SequenceFile.createWriter(fs, conf, p,
    splits[0].getClass(), NullWritable.class,
    SequenceFile.CompressionType.NONE);
  for (int i = 0; i < splits.length; ++i) {
   w.append(splits[i], NullWritable.get());
  }
 } finally {
  if (null != w)
   w.close();
 }
 return p;
}

Code example source: origin: larsgeorge/hbase-book

// Excerpt from a TotalOrderPartitioner sampling step; the loop that picks boundary keys is elided ("...").
RawComparator<K> comparator = (RawComparator<K>) job.getGroupingComparator();
Arrays.sort(samples, comparator);
Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
FileSystem fs = dst.getFileSystem(conf);
if (fs.exists(dst)) fs.delete(dst, false);
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
float stepSize = samples.length / (float) numPartitions;
// ... for each partition boundary, select the sample key at the next step offset, then:
//   writer.append(currentKey, nullValue);
//   lastKey = currentKey;
//   lastKeyIndex = currentKeyOffset;
//   splits.add(currentKey);
// ...
writer.close();
LOG.info("*********************************************  ");
LOG.info(" START KEYs for new Regions:  ");

Code example source: origin: org.apache.hadoop/hadoop-common

+ "must be set");
this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
FileSystem fs = dirName.getFileSystem(conf);
if (!fs.mkdirs(dirName)) {
 throw new IOException("Mkdirs failed to create directory " + dirName);
Path dataFile = new Path(dirName, DATA_FILE_NAME);
Path indexFile = new Path(dirName, INDEX_FILE_NAME);
             SequenceFile.Writer.file(dataFile),
             SequenceFile.Writer.keyClass(keyClass));
this.data = SequenceFile.createWriter(conf, dataOptions);
 Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
   SequenceFile.Writer.keyClass(keyClass),
   SequenceFile.Writer.valueClass(LongWritable.class),
   SequenceFile.Writer.compression(CompressionType.BLOCK));
this.index = SequenceFile.createWriter(conf, indexOptions);
