JavaScript: splice/shift the first n bytes of an AsyncIterable<Uint8Array>

i7uaboj4  posted on 2023-05-05  in  Java

Reading the first n bytes of a byte stream (in the form of an AsyncIterable) feels cumbersome and error-prone.
Is there a better way to implement this?

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);

  let offset = 0;

  const iterator = stream[Symbol.asyncIterator]();

  while (true) {
    const { done, value } = await iterator.next();

    if (done) {
      throw new Error("Buffer underflow");
    } else {
      const chunk = value;
      if (chunk.length < length - offset) {
        prefix.set(chunk, offset);
        offset += chunk.length;
      } else {
        const slice = chunk.slice(0, length - offset);
        prefix.set(slice, offset);

        return [prefix, prepend(chunk.slice(slice.length), stream)];
      }
    }
  }
}

async function* prepend(
  prefix: Uint8Array,
  stream: AsyncIterable<Uint8Array>
) {
  yield prefix;
  yield* stream;
}
Answer #1 (vlju58qv)

Stream primitives

We'll start by defining the stream primitives:

flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T>
take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
toArray<T>(t: AsyncIterable<T>): Promise<Array<T>>
async function *flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
  for await (const a of t) {
    yield *a
  }
}

async function *take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  for await (const v of t) {
    if (n-- <= 0) return
    yield v 
  }
  if (n > 0) throw Error("buffer underflow")
}

async function *skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  for await (const v of t) {
    if (n-- > 0) continue
    yield v
  }
  if (n > 0) throw Error("buffer underflow")
}

async function toArray<T>(t: AsyncIterable<T>): Promise<Array<T>> {
  const r = []
  for await (const v of t) r.push(v)
  return r
}

shift

Using these stream primitives, we can write shift in a comfortable and safe way:

shift(stream: AsyncIterable<Uint8Array>, count: number): Promise<[Uint8Array, AsyncIterable<number>]>
async function shift(stream: AsyncIterable<Uint8Array>, count: number) {
  return [
    new Uint8Array(await toArray(take(flatten(stream), count))),
    skip(flatten(stream), count)
  ] as const
}

Let's create a mock buffer and test it:

const buffer: AsyncIterable<Uint8Array> = {
  async *[Symbol.asyncIterator]() {
    for (const v of [[0,1,2],[3,4],[5,6,7,8],[9]]) {
      yield new Uint8Array(v)
      await new Promise(r => setTimeout(r, 100))
    }
  }
}

async function main() {
  const [first, rest] = await shift(buffer, 4)
  console.log({
    first: Array.from(first),
    rest: await toArray(rest)
  })
}

main().catch(console.error)

Output:

{
  first: [0, 1, 2, 3],
  rest: [4, 5, 6, 7, 8, 9]
}

Demo

Run it on the TypeScript Playground and verify the results.
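
One caveat worth checking before relying on this version: shift calls flatten(stream) twice, so the source is iterated from the start a second time. That works for the mock buffer, whose [Symbol.asyncIterator]() creates a fresh generator on every call, but a single-use source such as an async generator object appears to be closed already by the time skip runs. A minimal sketch of that behaviour, reusing the primitives above (oneShot and demo are hypothetical names introduced only for this illustration):

```typescript
async function* flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
  for await (const a of t) yield* a;
}

async function* take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  for await (const v of t) {
    if (n-- <= 0) return;
    yield v;
  }
  if (n > 0) throw Error("buffer underflow");
}

async function* skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  for await (const v of t) {
    if (n-- > 0) continue;
    yield v;
  }
  if (n > 0) throw Error("buffer underflow");
}

async function toArray<T>(t: AsyncIterable<T>): Promise<T[]> {
  const r: T[] = [];
  for await (const v of t) r.push(v);
  return r;
}

async function shift(stream: AsyncIterable<Uint8Array>, count: number) {
  return [
    new Uint8Array(await toArray(take(flatten(stream), count))),
    skip(flatten(stream), count),
  ] as const;
}

// Hypothetical single-use source: a generator object can only be consumed once.
async function* oneShot(): AsyncGenerator<Uint8Array> {
  yield new Uint8Array([0, 1, 2]);
  yield new Uint8Array([3, 4]);
  yield new Uint8Array([5, 6, 7, 8, 9]);
}

async function demo(): Promise<string> {
  const [first, rest] = await shift(oneShot(), 4);
  console.log(Array.from(first)); // the prefix itself is read correctly
  try {
    await toArray(rest);
    return "rest was readable";
  } catch (e) {
    // take() closed the generator when it returned early,
    // so skip() finds nothing left and reports an underflow.
    return (e as Error).message;
  }
}

demo().then(console.log); // buffer underflow
```

If the source can only be read once, sharing a single iterator between the prefix and the remainder, as the next answer does, avoids the second pass.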

Answer #2 (tag5nh1u)

I think the iterator logic itself can be simplified by using a notClosing helper and plain iteration:

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);
  const iter = stream[Symbol.asyncIterator]();
  let offset = 0;
  for await (const chunk of notClosing(iter)) {
    if (chunk.length < length - offset) {
      prefix.set(chunk, offset);
      offset += chunk.length;
    } else {
      const slice = chunk.slice(0, length - offset);
      prefix.set(slice, offset);
      return [prefix, prepend(chunk.slice(slice.length), iter)];
    }
  }
  throw new Error("Buffer underflow");
}

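You can't simplify the offset logic any further unless you want to convert the stream from an iterator of chunks into a much less efficient iterator of individual bytes.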

const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype)) as AsyncIterator<any>;
function prepend<T>(val: T, iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    first: true,
    next() {
      if (this.first) {
        const res = {done: false, value: val};
        val = undefined!; // GC
        this.first = false;
        return Promise.resolve(res); // next() of an async iterator should return a promise
      }
      return iter.next();
    },
    return: iter.return ? () => iter.return!() : undefined,
  });
}
function notClosing<T>(iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    next: iter.next.bind(iter),
  });
}
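
To see why notClosing matters: when a for await loop exits early (through return, break, or a thrown error), it calls the iterator's return() method, which finishes an async generator. Wrapping the iterator in an object that exposes only next() leaves the underlying source open, so the remainder can still be handed to prepend. A minimal sketch of that difference; the notClosing here is a simplified object-literal stand-in for the prototype-based helper above, and numbers, firstOf, and demo are hypothetical names used only for illustration:

```typescript
// Simplified stand-in for the notClosing helper: it forwards next() only,
// so for await has no return() method to call on early exit.
function notClosing<T>(iter: AsyncIterator<T>): AsyncIterableIterator<T> {
  return {
    next: () => iter.next(),
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}

// Hypothetical source used only for this demonstration.
async function* numbers(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
}

// Reads one value and leaves the loop early.
async function firstOf<T>(it: AsyncIterable<T>): Promise<T | undefined> {
  for await (const v of it) return v;
  return undefined;
}

async function demo(): Promise<[boolean, boolean]> {
  const plain = numbers();
  await firstOf(plain); // the early exit calls plain.return()
  const afterPlain = await plain.next();

  const guarded = numbers();
  await firstOf(notClosing(guarded)); // the early exit cannot reach guarded.return()
  const afterGuarded = await guarded.next();

  // done flags: true for the closed generator, false for the guarded one
  return [afterPlain.done === true, afterGuarded.done === true];
}

demo().then(console.log); // [ true, false ]
```

The guarded generator still yields 2 on the next call, which is the property shift relies on when it returns the rest of the stream.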
