This article collects Java code examples for the org.apache.hadoop.io.SequenceFile class and shows how the class is used in practice. The examples are taken from selected projects on platforms such as GitHub, Stack Overflow and Maven, so they have solid reference value and should be of some help. Details of the SequenceFile class:

Package path: org.apache.hadoop.io.SequenceFile
Class name: SequenceFile
SequenceFiles are flat files consisting of binary key/value pairs.

SequenceFile provides SequenceFile.Writer, SequenceFile.Reader and SequenceFile.Sorter classes for writing, reading and sorting, respectively.

There are three SequenceFile Writers, based on the CompressionType used to compress key/value pairs:

1. Writer: uncompressed records.
1. RecordCompressWriter: record-compressed files; only values are compressed.
1. BlockCompressWriter: block-compressed files; both keys and values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.

The actual compression algorithm used to compress keys and/or values can be specified by using the appropriate CompressionCodec.

The recommended way is to use the static createWriter methods provided by SequenceFile to choose the preferred format, as sketched in the example below.

The SequenceFile.Reader acts as the bridge and can read any of the above SequenceFile formats.
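As a minimal, self-contained sketch of that recommended workflow, the snippet below writes a block-compressed file through the static createWriter factory and reads it back with SequenceFile.Reader. The output path, key/value types and record contents are illustrative placeholders, not taken from any of the projects quoted later.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

public class SequenceFileRoundTrip {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/demo.seq"); // illustrative path

        // The CompressionType passed here selects the writer implementation:
        // NONE -> Writer, RECORD -> RecordCompressWriter, BLOCK -> BlockCompressWriter.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(IntWritable.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.compression(CompressionType.BLOCK, new DefaultCodec()))) {
            for (int i = 0; i < 100; i++) {
                writer.append(new IntWritable(i), new Text("value-" + i));
            }
        }

        // SequenceFile.Reader can read any of the three formats.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path))) {
            IntWritable key = new IntWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println(key.get() + "\t" + value);
            }
        }
    }
}
```

Swapping CompressionType.BLOCK for RECORD or NONE in this sketch is all it takes to produce the other two formats; the reading side stays unchanged.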
##### SequenceFile Formats

Essentially there are 3 different formats for SequenceFiles, depending on the CompressionType specified. All of them share a common header, described below.

##### SequenceFile Header

* version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)
* keyClassName - the key class
* valueClassName - the value class
* compression - a boolean which specifies whether compression is turned on for keys/values in this file
* blockCompression - a boolean which specifies whether block compression is turned on for keys/values in this file
* compression codec - the CompressionCodec class which is used for compression of keys and/or values (if compression is enabled)
* metadata - the metadata for this file
* sync - a sync marker to denote the end of the header

##### Uncompressed SequenceFile Format

* Record
* Record length
* A sync-marker every few 100 kilobytes or so

##### Record-Compressed SequenceFile Format

* Record
* Record length
* A sync-marker every few 100 kilobytes or so

##### Block-Compressed SequenceFile Format

* Record block
* Uncompressed number of records in the block
* A sync-marker every block

The compressed blocks of key lengths and value lengths consist of the actual lengths of individual keys/values encoded in ZeroCompressedInteger format.
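As a rough illustration of those header fields, the sketch below opens an existing sequence file and prints what SequenceFile.Reader reports for them. The input path is taken from the command line purely for illustration.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class SequenceFileHeaderDump {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // path to an existing sequence file

        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path))) {
            // Each getter below corresponds to one of the header fields listed above.
            System.out.println("keyClassName      = " + reader.getKeyClassName());
            System.out.println("valueClassName    = " + reader.getValueClassName());
            System.out.println("compression       = " + reader.isCompressed());
            System.out.println("blockCompression  = " + reader.isBlockCompressed());
            if (reader.isCompressed()) {
                System.out.println("compression codec = " + reader.getCompressionCodec().getClass().getName());
            }
            System.out.println("metadata          = " + reader.getMetadata());
        }
    }
}
```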
Code example source: origin: apache/flink
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
Path path = new Path(sequenceFile.getAbsolutePath());

// 'key', 'kvCount', 'value1' and 'sequenceFileInPathNull' are defined by the surrounding test code.
Text value = new Text();
SequenceFile.Writer writer = null;
try {
  writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
  for (int i = 0; i < kvCount; i++) {
    value.set(i + " - somestring");
    writer.append(key, value);
  }
} finally {
  if (writer != null) {
    writer.close();
  }
}

// A second sequence file keyed by NullWritable.
path = new Path(sequenceFileInPathNull);
SequenceFile.Writer writer1 = SequenceFile.createWriter(fs, conf, path, NullWritable.class, value1.getClass());
try {
  for (int i = 0; i < kvCount; i++) {
    value1.set(i);
    writer1.append(NullWritable.get(), value1);
  }
} finally {
  writer1.close();
}
Code example source: origin: apache/avro
/**
 * Gets type of compression for the output sequence file.
 *
 * @param conf The job configuration.
 * @return The compression type.
 */
public static CompressionType getOutputCompressionType(Configuration conf) {
  String typeName = conf.get(FileOutputFormat.COMPRESS_TYPE);
  if (typeName != null) {
    return CompressionType.valueOf(typeName);
  }
  return SequenceFile.getDefaultCompressionType(conf);
}
Code example source: origin: apache/kylin
@Override
public void doCleanup(Context context) throws IOException, InterruptedException {
  mos.close();
  Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
  FileSystem fs = FileSystem.get(context.getConfiguration());
  if (!fs.exists(outputDirBase)) {
    fs.mkdirs(outputDirBase);
    SequenceFile
        .createWriter(context.getConfiguration(),
            SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
        .close();
  }
}
Code example source: origin: linkedin/camus
private void writePrevious(Collection<EtlKey> missedKeys, JobContext context) throws IOException {
  FileSystem fs = FileSystem.get(context.getConfiguration());
  Path output = FileOutputFormat.getOutputPath(context);
  if (fs.exists(output)) {
    fs.mkdirs(output);
  }
  output = new Path(output, EtlMultiOutputFormat.OFFSET_PREFIX + "-previous");
  SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, context.getConfiguration(), output, EtlKey.class, NullWritable.class);
  for (EtlKey key : missedKeys) {
    writer.append(key, NullWritable.get());
  }
  writer.close();
}
Code example source: origin: linkedin/camus
// Excerpt from Camus' EtlMultiOutputCommitter.commitTask(); 'baseOutDir', 'file', 'count',
// 'offsets' and 'log' come from the surrounding class and loop, which are elided here.
// The offsets loop and the writer's key/value classes are reconstructed from how they are used below.
FileSystem fs = FileSystem.get(context.getConfiguration());
if (EtlMultiOutputFormat.isRunMoveData(context)) {
  Path workPath = super.getWorkPath();
  // For each data file written by the task (loop elided): compute its partitioned destination.
  String partitionedFile = getPartitionedPath(context, file, count.getEventCount(), count.getLastKey().getOffset());
  Path dest = new Path(baseOutDir, partitionedFile);
  if (!fs.exists(dest.getParent())) {
    mkdirs(fs, dest.getParent());
  }
  // Write the per-topic counts next to the task's work path (JSON serialization elided).
  Path tempPath = new Path(workPath, "counts." + context.getConfiguration().get("mapred.task.id"));
  OutputStream outputStream = new BufferedOutputStream(fs.create(tempPath));
  ObjectMapper mapper = new ObjectMapper();
}

// Persist the consumed offsets as a sequence file of (EtlKey, NullWritable) pairs.
SequenceFile.Writer offsetWriter = SequenceFile.createWriter(fs, context.getConfiguration(),
    new Path(super.getWorkPath(),
        EtlMultiOutputFormat.getUniqueFile(context, EtlMultiOutputFormat.OFFSET_PREFIX, "")),
    EtlKey.class, NullWritable.class);
for (String s : offsets.keySet()) {
  log.info("Avg record size for " + offsets.get(s).getTopic() + ":" + offsets.get(s).getPartition() + " = "
      + offsets.get(s).getMessageSize());
  offsetWriter.append(offsets.get(s), NullWritable.get());
}
offsetWriter.close();
super.commitTask(context);
Code example source: origin: intel-hadoop/HiBench
private static void createControlFile(
    FileSystem fs,
    int fileSize, // in MB
    int nrFiles
    ) throws IOException {
  LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");

  fs.delete(CONTROL_DIR, true);

  for(int i=0; i < nrFiles; i++) {
    String name = getFileName(i);
    Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
                                         Text.class, LongWritable.class,
                                         CompressionType.NONE);
      writer.append(new Text(name), new LongWritable(fileSize));
    } catch(Exception e) {
      throw new IOException(e.getLocalizedMessage());
    } finally {
      if (writer != null)
        writer.close();
      writer = null;
    }
  }
  LOG.info("created control files for: "+nrFiles+" files");
}
Code example source: origin: ch.cern.hadoop/hadoop-common
/**
 * Test that makes sure createWriter succeeds on a file that was
 * already created
 * @throws IOException
 */
public void testCreateWriterOnExistingFile() throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  Path name = new Path(new Path(System.getProperty("test.build.data", "."),
      "createWriterOnExistingFile"), "file");

  fs.create(name);
  SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
      RandomDatum.class, 512, (short) 1, 4096, false,
      CompressionType.NONE, null, new Metadata());
}
Code example source: origin: org.apache.mahout/mahout-utils
private static VectorWriter getSeqFileWriter(String outFile) throws IOException {
  Path path = new Path(outFile);
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  SequenceFile.Writer seqWriter = SequenceFile.createWriter(fs, conf, path, LongWritable.class,
      VectorWritable.class);
  return new SequenceFileVectorWriter(seqWriter);
}
Code example source: origin: apache/kylin
// Excerpt; 'fs', 'outputFolder', 'hbaseConf', 'hfileSizeMB', 'mbPerRegion', 'innerRegionSplits'
// and 'splits' are defined by the surrounding method, which is elided here.
if (fs.exists(outputFolder) == false) {
  fs.mkdirs(outputFolder);
}

int compactionThreshold = Integer.parseInt(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
logger.info("hbase.hstore.compactionThreshold is {0}", String.valueOf(compactionThreshold));
if (hfileSizeMB > 0.0f && hfileSizeMB * compactionThreshold < mbPerRegion) {
  // ... (adjustment of hfileSizeMB elided in this excerpt)
}

final Path hfilePartitionFile = new Path(outputFolder, "part-r-00000_hfile");
short regionCount = (short) innerRegionSplits.size();
try (SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
    SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
    SequenceFile.Writer.valueClass(NullWritable.class))) {
  // For each split boundary i (loop elided in this excerpt), write the first KeyValue of the row
  // as a partition key; 9223372036854775807L is Long.MAX_VALUE.
  hfilePartitionWriter.append(
      new RowKeyWritable(KeyValueUtil.createFirstOnRow(splits.get(i), 9223372036854775807L).createKeyOnly(false).getKey()),
      NullWritable.get());
}
Code example source: origin: KylinOLAP/Kylin
public static void copyTo64MB(String src, String dst) throws IOException {
  Configuration hconf = new Configuration();
  Path srcPath = new Path(src);
  Path dstPath = new Path(dst);

  FileSystem fs = FileSystem.get(hconf);
  long srcSize = fs.getFileStatus(srcPath).getLen();
  int copyTimes = (int) (67108864 / srcSize); // 64 MB
  System.out.println("Copy " + copyTimes + " times");

  Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
  Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
  Text value = new Text();

  Writer writer = SequenceFile.createWriter(hconf, Writer.file(dstPath), Writer.keyClass(key.getClass()), Writer.valueClass(Text.class), Writer.compression(CompressionType.BLOCK, getLZOCodec(hconf)));

  int count = 0;
  while (reader.next(key, value)) {
    for (int i = 0; i < copyTimes; i++) {
      writer.append(key, value);
      count++;
    }
  }

  System.out.println("Len: " + writer.getLength());
  System.out.println("Rows: " + count);

  reader.close();
  writer.close();
}
Code example source: origin: com.presidentio.but/format
@Override
public void write(String line) throws IOException {
  if (writer == null) {
    tmpFile = File.createTempFile("seq-", ".dat");
    writer = SequenceFile.createWriter(new Configuration(), Writer.file(new Path(tmpFile.toURI())),
        Writer.keyClass(NullWritable.class), Writer.valueClass(Text.class));
  }
  text.set(line);
  writer.append(NullWritable.get(), text);
}
Code example source: origin: sequenceiq/sequenceiq-samples
/**
 * Reduce task done, write output to a file.
 */
@Override
public void cleanup(Context context) throws IOException {
  //write output to a file
  Configuration conf = context.getConfiguration();
  Path outDir = new Path(conf.get(FileOutputFormat.OUTDIR));
  Path outFile = new Path(outDir, "reduce-out");
  FileSystem fileSys = FileSystem.get(conf);
  SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
      outFile, LongWritable.class, LongWritable.class,
      CompressionType.NONE);
  writer.append(new LongWritable(numInside), new LongWritable(numOutside));
  writer.close();
}
Code example source: origin: org.apache.hadoop/hadoop-mapred-test
public void run() {
  try {
    for(int i=start; i < end; i++) {
      String name = getFileName(i);
      Path controlFile = new Path(INPUT_DIR, "in_file_" + name);
      SequenceFile.Writer writer = null;
      try {
        writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
                                           Text.class, LongWritable.class,
                                           CompressionType.NONE);
        String logFile = jhLogFiles[i].getPath().toString();
        writer.append(new Text(logFile), new LongWritable(0));
      } catch(Exception e) {
        throw new IOException(e);
      } finally {
        if (writer != null)
          writer.close();
        writer = null;
      }
    }
  } catch(IOException ex) {
    LOG.error("FileCreateDaemon failed.", ex);
  }
  numFinishedThreads++;
}
Code example source: origin: apache/storm
private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws IOException {
  Configuration conf = new Configuration();
  try {
    if (fs.exists(file)) {
      fs.delete(file, false);
    }
    SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class);
    for (int i = 0; i < rowCount; i++) {
      w.append(new IntWritable(i), new Text("line " + i));
    }
    w.close();
    System.out.println("done");
  } catch (IOException e) {
    e.printStackTrace();
  }
}
Code example source: origin: apache/incubator-gobblin
public void put(String storeName, String tableName, T state) throws IOException {
  String tmpTableName = this.useTmpFileForPut ? TMP_FILE_PREFIX + tableName : tableName;
  Path tmpTablePath = new Path(new Path(this.storeRootDir, storeName), tmpTableName);
  if (!this.fs.exists(tmpTablePath) && !create(storeName, tmpTableName)) {
    throw new IOException("Failed to create a state file for table " + tmpTableName);
  }

  Closer closer = Closer.create(); // Guava Closer; this declaration is reconstructed, it was cut from the excerpt
  try {
    @SuppressWarnings("deprecation")
    SequenceFile.Writer writer = closer.register(SequenceFile.createWriter(this.fs, this.conf, tmpTablePath,
        Text.class, this.stateClass, SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
    writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
  } catch (Throwable t) {
    throw closer.rethrow(t);
  } finally {
    closer.close();
  }
  // Handling of the temporary file after the write is elided in this excerpt.
}
Code example source: origin: pinterest/secor
public SequenceFileWriter(LogFilePath path, CompressionCodec codec) throws IOException {
  Configuration config = new Configuration();
  fsPath = new Path(path.getLogFilePath());
  FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
  if (codec != null) {
    this.mWriter = SequenceFile.createWriter(fs, config, fsPath,
        LongWritable.class, BytesWritable.class,
        SequenceFile.CompressionType.BLOCK, codec);
  } else {
    this.mWriter = SequenceFile.createWriter(fs, config, fsPath,
        LongWritable.class, BytesWritable.class);
  }
  this.mKey = new LongWritable();
  this.mValue = new BytesWritable();
  LOG.info("Created sequence file writer: {}", fsPath);
}
Code example source: origin: apache/incubator-gobblin
@Test
@SuppressWarnings("deprecation")
public void testSerializeToSequenceFile() throws IOException {
  Closer closer = Closer.create();
  Configuration conf = new Configuration();
  WritableShimSerialization.addToHadoopConfiguration(conf);
  try {
    SequenceFile.Writer writer1 = closer.register(SequenceFile.createWriter(this.fs, conf,
        new Path(this.outputPath, "seq1"), Text.class, WorkUnitState.class));

    Text key = new Text();
    WorkUnitState workUnitState = new WorkUnitState();
    TestWatermark watermark = new TestWatermark();
    watermark.setLongWatermark(10L);
    workUnitState.setActualHighWatermark(watermark);
    writer1.append(key, workUnitState);

    SequenceFile.Writer writer2 = closer.register(SequenceFile.createWriter(this.fs, conf,
        new Path(this.outputPath, "seq2"), Text.class, WorkUnitState.class));

    watermark.setLongWatermark(100L);
    workUnitState.setActualHighWatermark(watermark);
    writer2.append(key, workUnitState);
  } catch (Throwable t) {
    throw closer.rethrow(t);
  } finally {
    closer.close();
  }
}
Code example source: origin: org.apache.hadoop/hadoop-mapred-test
private static <T extends WritableComparable<?>> Path writePartitionFile(
    String testname, Configuration conf, T[] splits) throws IOException {
  final FileSystem fs = FileSystem.getLocal(conf);
  final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
      ).makeQualified(fs);
  Path p = new Path(testdir, testname + "/_partition.lst");
  TotalOrderPartitioner.setPartitionFile(conf, p);
  conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
  SequenceFile.Writer w = null;
  try {
    w = SequenceFile.createWriter(fs, conf, p,
        splits[0].getClass(), NullWritable.class,
        SequenceFile.CompressionType.NONE);
    for (int i = 0; i < splits.length; ++i) {
      w.append(splits[i], NullWritable.get());
    }
  } finally {
    if (null != w)
      w.close();
  }
  return p;
}
Code example source: origin: larsgeorge/hbase-book
// Excerpt from a sampler that writes a TotalOrderPartitioner partition file; 'samples',
// 'numPartitions', 'currentKey', 'currentKeyOffset', 'lastKey', 'lastKeyIndex' and 'splits'
// are defined by the surrounding code, which is elided here.
RawComparator<K> comparator = (RawComparator<K>) job.getGroupingComparator();
Arrays.sort(samples, comparator);
Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
FileSystem fs = dst.getFileSystem(conf);
if (fs.exists(dst)) fs.delete(dst, false);
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
float stepSize = samples.length / (float) numPartitions;
// The loop that walks the sorted samples in steps of 'stepSize' is elided in this excerpt;
// for each chosen sample it appends the key as a partition boundary:
writer.append(currentKey, nullValue);
lastKey = currentKey;
lastKeyIndex = currentKeyOffset;
splits.add(currentKey);
writer.close();
LOG.info("********************************************* ");
LOG.info(" START KEYs for new Regions: ");
Code example source: origin: org.apache.hadoop/hadoop-common
// Excerpt from the MapFile.Writer constructor; the preceding argument checks are elided,
// and the two option-array declarations are reconstructed to match how they are used below.
this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);

FileSystem fs = dirName.getFileSystem(conf);
if (!fs.mkdirs(dirName)) {
  throw new IOException("Mkdirs failed to create directory " + dirName);
}
Path dataFile = new Path(dirName, DATA_FILE_NAME);
Path indexFile = new Path(dirName, INDEX_FILE_NAME);

// The data file holds the actual key/value records.
SequenceFile.Writer.Option[] dataOptions = Options.prependOptions(opts,
    SequenceFile.Writer.file(dataFile),
    SequenceFile.Writer.keyClass(keyClass));
this.data = SequenceFile.createWriter(conf, dataOptions);

// The index file maps keys to positions in the data file and is always block-compressed.
SequenceFile.Writer.Option[] indexOptions = Options.prependOptions(opts,
    SequenceFile.Writer.file(indexFile),
    SequenceFile.Writer.keyClass(keyClass),
    SequenceFile.Writer.valueClass(LongWritable.class),
    SequenceFile.Writer.compression(CompressionType.BLOCK));
this.index = SequenceFile.createWriter(conf, indexOptions);