Gulp是否会在传递下一个文件之前等待管道内的异步函数完成?

oxalkeyp  于 2023-06-20  发布在  Gulp
关注(0)|答案(1)|浏览(153)
    • bounty还有4天到期**。此问题的答案有资格获得+50声望奖励。Takeshi Tokugawa YD正在寻找一个规范答案

因为我听说并行模拟是Gulp的主要特性,所以我预计即使Gulp管道中有async函数,下一个文件的处理也会在这个async函数完成对前一个文件的处理之前开始。
假设我们有两个文件:“Foo.txt”和“Bar.txt”,其中“Foo.txt”首先进入Gulp管道。我预计"Bar.txt"的处理将在myAsyncFunction完成"Foo.txt"的工作之前开始。

const gulp = require('gulp');
const through = require('through2');
const PluginError = require('plugin-error');

// Define your async function that will be used inside the Transform stream
async function myAsyncFunction(data) {

  console.log("Checkpoint 4");
  // Simulate an asynchronous operation
  await new Promise((resolve) => setTimeout(resolve, 5000));

  // Manipulate the data
  const transformedData = data.toString().toUpperCase();

  console.log("Checkpoint 5");

  return transformedData;
}

// Create a Transform stream using through2
const myTransformStream = through.obj(async function (file, encoding, callback) {

  console.log("Checkpoint 1");

  if (file.isNull()) {
    console.log("Checkpoint 2-A");
    // Pass through if null
    return callback(null, file);
  }

  console.log("Checkpoint 2-B");

  if (file.isBuffer()) {

    console.log("Checkpoint 3");

    try {
      // Convert the file contents to string and pass it to the async function
      const transformedData = await myAsyncFunction(file.contents.toString());

      console.log("Checkpoint 6");
      console.log("=============");

      // Create a new file with the transformed data
      const transformedFile = file.clone();
      transformedFile.contents = Buffer.from(transformedData);

      // Pass the transformed file to the next stream
      this.push(transformedFile);
    } catch (err) {
      // Handle any errors that occur during the async operation
      this.emit('error', new PluginError('gulp-my-task', err));
    }
  }

  callback();
});

// Define your Gulp task that uses the Transform stream
gulp.task('my-task', function () {
  return gulp.src('src/*.txt')
      .pipe(myTransformStream)
      .pipe(gulp.dest('dist'));
});

但是我得到了输出:

Checkpoint 1  
Checkpoint 2-B
Checkpoint 3  
Checkpoint 4  
Checkpoint 5
Checkpoint 6  
============= 
Checkpoint 1  
Checkpoint 2-B
Checkpoint 3  
Checkpoint 4  
Checkpoint 5
Checkpoint 6
=============

这意味着Gulp已经处理了第一个文件,直到Checkpoint 6,然后开始处理第二个文件。不是我期望的,我期望的是

Checkpoint 1  
Checkpoint 2-B
Checkpoint 3  
Checkpoint 4
Checkpoint 1  // <- Begin the processing of the next file  
Checkpoint 2-B
Checkpoint 3  
Checkpoint 4    
Checkpoint 5
Checkpoint 6  
============= 
Checkpoint 5
Checkpoint 6
=============

请解释一下这个理论。

gxwragnw

gxwragnw1#

gulp有一个parallelseries API你可以用它来根据依赖顺序(并行表示不依赖,串行表示依赖)协调运行的任务,这些任务可以从单个任务组合在一起,形成更复杂的流程和作业连接。
在你的代码中,你不使用这两个API。您可以使用srcpipedest定义一个单通道任务管道。
src接受一个指定输入文件的glob和vinyl对象的produces a stream,以通过转换管道传递。
您可能会发现了解节点流的工作方式非常有用,尤其是对象流。您可能还希望查看节点流/管道上下文中的背压,如果您忙碌下游以控制内存使用,则该背压会阻止上游写入器发送更多数据。
根据定义,流本质上是串行的(一件事接一件事)。如果查看gulp.src的源代码,可以看到它使用的是vinyl-fs.src,它以glob-stream开始,将一个glob(模式)转换为单独的路径,并逐个发出它们。
因此,这里的困惑可能是您认为流API本身将并行使用一堆文件。
如果管道由于背压而堵塞,则流动将连续发生并且不会继续。
你是await ing超时,这意味着through不会得到回调,所以只要管道知道你仍然很忙碌,所以你不会得到任何更多的工作。
你可以让你的代码撒谎,当你没有完成的时候说你已经完成了,这样就会有更多的工作进来,但是当通过管道处理大量数据时,这就违背了反压控制内存使用的目的。或者,在某些情况下,让您的流转换器实际上协调多个并行工作者并在任何池工作者空闲时移交工作,在代码中的管道中创建一些并行性,这是有意义的。对于多核机器上的某些类型的工作负载可能有意义。
不过,回到正题...
下面是一个gulpfile示例,它演示了gulp中用于任务排序的串行/并行API。任务只是一个函数。在gulp 4中,您不需要使用字符串作为任务名称,并且最好使用vanilla函数并导出它们。Gulp将使用函数名作为任务名,但您可以使用displayName覆盖以编程方式生成的函数。

import gulp from 'gulp';

const {parallel, series} = gulp;

const randMin = 1,
    randMax = 10;

function rand (min, max) { 
    isNaN(min) && (min = randMin);
    isNaN(max) && (max = randMax);
    return Math.floor(Math.random()*(max-min+1))+min;
}

const makeTask = (function () {
    var taskIdx = 0;
    return function () {
        var taskName = 'task'+(++taskIdx),
            task = async function () {
                var timeout = rand();
                console.log(`Started: ${taskName} with random delay ${timeout}s `);
                // Simulate an asynchronous operation
                await new Promise((resolve) => setTimeout(resolve, timeout*1000));
                console.log(`Finishing: ${taskName}`);
            };
        task.displayName = taskName;
        return task;
    };   
})();

// the gulp series/parallel usage
export const myTask = series(
    makeTask(),     // task1
    parallel(
        makeTask(), // task2
        makeTask(), // task3
        makeTask()  // task4
    ), 
    makeTask()      // task5
);

注意,在上面的构造中,我们期望task 1总是在我们执行task 2/3/4/5之前运行完成。2/3/4都同时启动,并行运行。task 5只在2/3/4全部完成后才开始。
您可以将上述文件保存为filename.js,然后在目录中保存为npm install gulp。然后可以运行gulp以下内容...

$ ./node_modules/.bin/gulp --gulpfile filename.js --tasks
[18:26:27] └─┬ myTask
[18:26:27]   └─┬ <series>
[18:26:27]     ├── task1
[18:26:27]     ├─┬ <parallel>
[18:26:27]     │ ├── task2
[18:26:27]     │ ├── task3
[18:26:27]     │ └── task4
[18:26:27]     └── task5

或者运行myTask你可以做...

$ ./node_modules/.bin/gulp --gulpfile filename.js myTask
[18:22:54] Starting 'myTask'...
[18:22:54] Starting 'task1'...
Started: task1 with random delay 1s
Finishing: task1
[18:22:55] Finished 'task1' after 1 s
[18:22:55] Starting 'task2'...
[18:22:55] Starting 'task3'...
[18:22:55] Starting 'task4'...
Started: task2 with random delay 3s
Started: task3 with random delay 2s
Started: task4 with random delay 5s
Finishing: task3
[18:22:57] Finished 'task3' after 2.01 s
Finishing: task2
[18:22:58] Finished 'task2' after 3.01 s
Finishing: task4
[18:23:00] Finished 'task4' after 5 s
[18:23:00] Starting 'task5'...
Started: task5 with random delay 8s
Finishing: task5
[18:23:08] Finished 'task5' after 8.01 s
[18:23:08] Finished 'myTask' after 14 s

相关问题