如何用原始inputstream示例化fsdatainputstream?

qco9c6ql  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(556)

我真的不明白我怎么能创建这样的输入流,这是寻找和定位可读。。。

Resource resource = new ClassPathResource("somefile");
InputStream bla = resource.getInputStream();
FSDataInputStream inputStream = new FSDataInputStream (bla);

在fs线上投掷:

java.lang.IllegalArgumentException: In is not an instance of Seekable or PositionedReadable

我需要模仿,这对我来说是个障碍。

vktxenjb

vktxenjb1#

FSDataInputStream 在fsdatainputstream.java中定义的构造函数,如下所示 InputStream 参数为 instanceSeekable 或者 PositionedReadable ```
public FSDataInputStream(InputStream in) throws IOException
{
super(in);
if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
throw new IllegalArgumentException(
"In is not an instance of Seekable or PositionedReadable");
}
}

希望追随 `solution` 帮助你。

import java.io.*;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

public class SeekableTest {

public static void main(String[] args) throws IOException
{
    Resource resource = new ClassPathResource("somefile");
    InputStream in = resource.getInputStream();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte buf[] = new byte[1024];
    int read;

    while ((read = in.read(buf)) > 0)
      baos.write(buf, 0, read);

    byte data[] = baos.toByteArray();
    SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
    FSDataInputStream in2 = new FSDataInputStream(bais);
}

static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {

    public SeekableByteArrayInputStream(byte[] buf)
    {
        super(buf);
    }
    @Override
    public long getPos() throws IOException{
        return pos;
    }

    @Override
    public void seek(long pos) throws IOException {
      if (mark != 0)
        throw new IllegalStateException();

      reset();
      long skipped = skip(pos);

      if (skipped != pos)
        throw new IOException();
    }

    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {

      if (position >= buf.length)
        throw new IllegalArgumentException();
      if (position + length > buf.length)
        throw new IllegalArgumentException();
      if (length > buffer.length)
        throw new IllegalArgumentException();

      System.arraycopy(buf, (int) position, buffer, offset, length);
      return length;
    }

    @Override
    public void readFully(long position, byte[] buffer) throws IOException {
      read(position, buffer, 0, buffer.length);

    }

    @Override
    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
      read(position, buffer, offset, length);
    }
}

}

参考:accumulo

相关问题