Usage and code examples for the org.apache.hadoop.io.Writable class


This article collects Java code examples for the org.apache.hadoop.io.Writable class and shows how the class is used in practice. The snippets are drawn from selected projects on GitHub, Stack Overflow, Maven and similar platforms, so they are reasonably representative and should be useful as references. Details of the Writable class:
Package path: org.apache.hadoop.io.Writable
Class name: Writable

Introduction to Writable

A serializable object which implements a simple, efficient serialization protocol based on DataInput and DataOutput.

Any key or value type in the Hadoop Map-Reduce framework implements this interface.

Implementations typically implement a static read(DataInput) method which constructs a new instance, calls #readFields(DataInput) and returns the instance.

Example:

public class MyWritable implements Writable {
  // Some data
  private int counter;
  private long timestamp;

  // Default constructor to allow (de)serialization
  MyWritable() { }

  public void write(DataOutput out) throws IOException {
    out.writeInt(counter);
    out.writeLong(timestamp);
  }

  public void readFields(DataInput in) throws IOException {
    counter = in.readInt();
    timestamp = in.readLong();
  }

  public static MyWritable read(DataInput in) throws IOException {
    MyWritable w = new MyWritable();
    w.readFields(in);
    return w;
  }
}
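
For a quick sense of how such a class is used, the following sketch (hypothetical usage code, not taken from the Hadoop sources) round-trips a MyWritable through a byte array with the standard DataOutputStream/DataInputStream pair:

// Hypothetical usage sketch: serialize MyWritable to bytes and read it back.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bos);
MyWritable original = new MyWritable();
original.write(out);                        // writes counter, then timestamp
out.flush();

DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
MyWritable restored = MyWritable.read(in);  // new instance populated by readFields()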


Code examples

Code example source: apache/hive

public static String encodeWritable(Writable key) throws IOException {
 ByteArrayOutputStream bos = new ByteArrayOutputStream();
 DataOutputStream dos = new DataOutputStream(bos);
 key.write(dos);
 dos.flush();
 return Base64.encodeBase64URLSafeString(bos.toByteArray());
}

Code example source: apache/hive

private <T extends Writable> T copy(T oldWritable, T newWritable) throws IOException {
 ByteArrayOutputStream baos = new ByteArrayOutputStream();
 DataOutputStream out = new DataOutputStream(baos);
 oldWritable.write(out);
 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 DataInputStream in = new DataInputStream(bais);
 newWritable.readFields(in);
 return newWritable;
}
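
For illustration only (hypothetical usage, not part of the Hive source), the helper above can clone any Writable by piping write() into readFields(), for example a Text:

// Hypothetical usage: clone a Text through the Writable byte protocol.
Text original = new Text("hello");
Text clone = copy(original, new Text());   // clone now holds "hello"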

Code example source: apache/flink

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
  ensureInstanceInstantiated();
  copyInstance.readFields(source);
  copyInstance.write(target);
}

Code example source: apache/hive

public static void decodeWritable(Writable w, String idStr) throws IOException {
 DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr)));
 w.readFields(in);
}
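
Taken together, the two Hive helpers shown above round-trip a Writable through a URL-safe Base64 string. A minimal sketch of that pairing (hypothetical usage, assuming both static helpers are accessible from the calling code):

// Hypothetical usage: Writable -> Base64 string -> Writable.
Text token = new Text("secret-token");
String encoded = encodeWritable(token);    // write() into a byte array, then Base64-encode

Text restored = new Text();
decodeWritable(restored, encoded);         // Base64-decode, then readFields()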

Code example source: rayokota/hgraphdb

@Override
public void write(final DataOutput output) throws IOException {
  Kryo kryo = new Kryo();
  kryo.register(HBaseEdge.class, new HBaseEdgeSerializer());
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  Output out = new Output(baos);
  kryo.writeObject(out, this.edge);
  out.close();
  final byte[] serialized = baos.toByteArray();
  WritableUtils.writeCompressedByteArray(output, serialized);
  Writable writable = value != null ? value : NullWritable.get();
  Text.writeString(output, writable.getClass().getName());
  writable.write(output);
}

Code example source: apache/ignite

/**
 * Deserialization of Hadoop Writable object.
 *
 * @param writable Writable object to deserialize to.
 * @param bytes byte array to deserialize.
 */
public static void deserialize(Writable writable, byte[] bytes) throws IOException {
  DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes));
  writable.readFields(dataIn);
  dataIn.close();
}
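
The mirror image of this helper, serializing a Writable into a byte array, follows the same pattern in reverse. A minimal sketch (an assumed implementation for illustration, not the Ignite code):

// Sketch: serialize a Hadoop Writable into a byte array via its write() method.
public static byte[] serialize(Writable writable) throws IOException {
  ByteArrayOutputStream buf = new ByteArrayOutputStream();
  try (DataOutputStream dataOut = new DataOutputStream(buf)) {
    writable.write(dataOut);
  }
  return buf.toByteArray();
}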

Code example source: apache/ignite

/**
 * Wraps native split.
 *
 * @param id Split ID.
 * @param split Split.
 * @param hosts Hosts.
 * @throws IOException If failed.
 */
public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
  ByteArrayOutputStream arr = new ByteArrayOutputStream();
  ObjectOutput out = new ObjectOutputStream(arr);
  assert split instanceof Writable;
  ((Writable)split).write(out);
  out.flush();
  return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
}

Code example source: com.facebook.hadoop/hadoop-core

private void processData() throws  IOException, InterruptedException {
 DataInputStream dis =
  new DataInputStream(new ByteArrayInputStream(data.array()));
 int id = dis.readInt();                    // try to read an id
 if (LOG.isDebugEnabled())
  LOG.debug(" got #" + id);
 Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
 param.readFields(dis);        
  
 Call call = new Call(id, param, this, responder);
 callQueue.put(call);              // queue the call; maybe blocked here
}

Code example source: org.apache.hadoop/hadoop-common

@SuppressWarnings("unchecked")
 @Override
 <T> T readFrom(ByteBuffer bb) throws IOException {
  // create a stream that may consume up to the entire ByteBuffer.
  DataInputStream in = new DataInputStream(new ByteArrayInputStream(
    bb.array(), bb.position() + bb.arrayOffset(), bb.remaining()));
  try {
   writable.readFields(in);
  } finally {
   // advance over the bytes read.
   bb.position(bb.limit() - in.available());
  }
  return (T)writable;
 }
}

Code example source: apache/hive

// Excerpt from OrcSplit.write(DataOutput) in Hive; only part of the method body
// survives in this snippet, so elided and reconstructed pieces are marked with comments.
int flags = /* ...other flag bits elided in this excerpt... */
    (isFileIdWritable ? HAS_SYNTHETIC_FILEID_FLAG : 0) |
    (syntheticAcidProps != null ? HAS_SYNTHETIC_ACID_PROPS_FLAG : 0);
out.writeByte(flags);
out.writeInt(deltas.size());
for (AcidInputFormat.DeltaMetaData delta : deltas) {
  delta.write(out);
}
// the ORC file tail is written as a vint-length-prefixed byte array
byte[] tailBuffer = fileTail.toByteArray();
int tailLen = tailBuffer.length;
WritableUtils.writeVInt(out, tailLen);
out.write(tailBuffer);
// the file key is written either as a plain long or via its own Writable implementation
if (isFileIdLong) {                 // opening condition reconstructed; it was cut from the excerpt
  out.writeLong(((Long) fileKey).longValue());
} else if (isFileIdWritable) {
  ((Writable) fileKey).write(out);
}
out.writeLong(fileLen);

Code example source: apache/hbase

/**
 * No protobuf encoding of raw sasl messages
 */
protected final void doRawSaslReply(SaslStatus status, Writable rv,
  String errorClass, String error) throws IOException {
 BufferChain bc;
 // In my testing, have noticed that sasl messages are usually
 // in the ballpark of 100-200. That's why the initial capacity is 256.
 try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
   DataOutputStream  out = new DataOutputStream(saslResponse)) {
  out.writeInt(status.state); // write status
  if (status == SaslStatus.SUCCESS) {
   rv.write(out);
  } else {
   WritableUtils.writeString(out, errorClass);
   WritableUtils.writeString(out, error);
  }
  bc = new BufferChain(saslResponse.getByteBuffer());
 }
 doRespond(() -> bc);
}

Code example source: org.apache.hadoop/hadoop-common

// Excerpt from ObjectWritable.writeObject: arrays, primitives and Writable instances
// are each handled by a different branch; parts of the original dispatch are elided here.
if (declaredClass.isArray()) {                  // array
  int length = Array.getLength(instance);
  out.writeInt(length);
  for (int i = 0; i < length; i++) {
    writeObject(out, Array.get(instance, i),
        declaredClass.getComponentType(), conf); // remaining arguments reconstructed
  }
} else if (declaredClass == Boolean.TYPE) {     // boolean
  out.writeBoolean(((Boolean)instance).booleanValue());
} else if (declaredClass == Character.TYPE) {   // char
  out.writeChar(((Character)instance).charValue());
} else if (declaredClass == Byte.TYPE) {        // byte
  out.writeByte(((Byte)instance).byteValue());
} else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
  UTF8.writeString(out, instance.getClass().getName());
  ((Writable)instance).write(out);
}

Code example source: org.apache.hadoop/hadoop-common

@Override
public void write(DataOutput out) throws IOException {
 out.writeInt(values.length);                 // write values
 for (int i = 0; i < values.length; i++) {
  values[i].write(out);
 }
}

Code example source: dremio/dremio-oss

@Override
 public T read(final Kryo kryo, final Input input, final Class<T> type) {
  try {
   final T object = kryo.newInstance(type);
   kryo.reference(object);
   object.readFields(new DataInputStream(input));
   return object;
  } catch (final IOException e) {
   throw new RuntimeException("unable to deserialize Writable object", e);
  }
 }
}
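
The corresponding Kryo write side simply delegates to Writable.write(), wrapping Kryo's Output as a DataOutput. A minimal sketch (an assumption of how it would look, not necessarily the Dremio implementation):

@Override
public void write(final Kryo kryo, final Output output, final T object) {
  try {
    // Kryo's Output is an OutputStream, so it can back a DataOutputStream directly.
    object.write(new DataOutputStream(output));
  } catch (final IOException e) {
    throw new RuntimeException("unable to serialize Writable object", e);
  }
}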

Code example source: org.apache.hadoop/hadoop-common

@Override
public void write(DataOutput out) throws IOException {
 super.write(out);
 
 // Write out the number of entries in the map
 
 out.writeInt(instance.size());
 // Then write out each key/value pair
 
 for (Map.Entry<Writable, Writable> e: instance.entrySet()) {
  out.writeByte(getId(e.getKey().getClass()));
  e.getKey().write(out);
  out.writeByte(getId(e.getValue().getClass()));
  e.getValue().write(out);
 }
}

Code example source: urbanairship/datacube

@Override
public void write(DataOutput out) throws IOException {
  out.writeUTF(elementClass.getName());
  out.writeInt(collection.size());
  for(Writable writable: collection) {
    writable.write(out);
  }
}
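
A matching readFields() would reverse these steps: read the element class name, then the count, then each element. A hedged sketch (hypothetical code, not from the datacube project, assuming a mutable collection field and Hadoop's ReflectionUtils for instantiation):

// Sketch of the reverse of the write() above (hypothetical, not from datacube).
@Override
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
  String className = in.readUTF();
  Class<? extends Writable> clazz;
  try {
    clazz = (Class<? extends Writable>) Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new IOException("unknown element class " + className, e);
  }
  int size = in.readInt();
  collection.clear();
  for (int i = 0; i < size; i++) {
    Writable element = ReflectionUtils.newInstance(clazz, null);
    element.readFields(in);
    collection.add(element);
  }
}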

Code example source: org.apache.hadoop/hadoop-common

// Excerpt from SequenceFile.Reader: the current value is deserialized with readFields(),
// then a sanity check verifies that the value consumed exactly the buffered bytes.
val.readFields(valIn);

if (valIn.read() > 0) {
  LOG.info("available bytes: " + valIn.available());
  throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength)
             + " bytes, should read " +
             (valBuffer.getLength() - keyLength)); // closing expression reconstructed
}

// A second excerpt from the same class: the value length is read as a vint before readFields().
int valLength = WritableUtils.readVInt(valLenIn);
val.readFields(valIn);

Code example source: org.apache.hadoop/hadoop-common

@Override
public void write(DataOutput out) throws IOException {
 if (type == NOT_SET || instance == null)
  throw new IOException("The GenericWritable has NOT been set correctly. type="
             + type + ", instance=" + instance);
 out.writeByte(type);
 instance.write(out);
}

Code example source: org.apache.hadoop/hadoop-mapred-test

private void write(DataOutput out) throws IOException {
 WritableUtils.writeVInt(out, values.length);
 WritableUtils.writeVLong(out, written);
 for (int i = 0; i < values.length; ++i) {
  Text.writeString(out, values[i].getClass().getName());
 }
 for (int i = 0; i < values.length; ++i) {
  if (has(i)) {
   values[i].write(out);
  }
 }
}

Code example source: cloudera/crunch

@Override
public void write(DataOutput out) throws IOException {
 Text.writeString(out, valueClazz.getName());
 WritableUtils.writeVInt(out, instance.size());
 for (Map.Entry<Text, T> e : instance.entrySet()) {
  e.getKey().write(out);
  e.getValue().write(out);
 }
}
