NodeJS 逐行读取文件的“被动”方式是什么

mw3dktmi  于 2022-12-29  发布在  Node.js
关注(0)|答案(4)|浏览(485)

我正在学习使用RxJS的React式编程,遇到了一个需要逐行读取文件的情况,实际上我使用了如下解决方案:
https://gist.github.com/yvele/447555b1c5060952a279
它的作品,但我需要使用一些正常的JS代码转换缓冲区流的行.(使用“readline”模块在上面的例子)
我想知道是否有其他方法可以使用RxJS操作符将Observable of Buffer转换为Observable of line,如下面的示例。

var Rx = require('rx');
var fs = require('fs');
var lines = Rx.Observable
  .fromEvent(rl, 'data') // emits buffers overtime
  // some transforms ...
  .subscribe(
    (line) => console.log(line), // emit string line by line
    err => console.log("Error: %s", err),
    () => console.log("Completed")
  );
jq6vz3qz

jq6vz3qz1#

使用scanconcatMap,您可能会获得非常接近您所需的效果。
例如:

bufferSource
  .concat(Rx.Observable.of("\n")) // parens was missing // to make sure we don't miss the last line!
  .scan(({ buffer }, b) => {
    const splitted = buffer.concat(b).split("\n");
    const rest = splitted.pop();
    return { buffer: rest, items: splitted };
  }, { buffer: "", items: [] })
  // Each item here is a pair { buffer: string, items: string[] }
  // such that buffer contains the remaining input text that has no newline
  // and items contains the lines that have been produced by the last buffer
  .concatMap(({ items }) => items)
  // we flatten this into a sequence of items (strings)
  .subscribe(
    item => console.log(item),
    err => console.log(err),
    () => console.log("Done with this buffer source"),
  );
bfnvny8b

bfnvny8b2#

您可以使用以下类

'use strict'

const lineReader = require('line-reader');
const Rx = require('rxjs');
const RxOp = require('rxjs/operators');

class CSVReader {
    constructor(filepath {
        this.filepath = filepath;
    }

    readByLines() 
    {
        const source = new Rx.Subject();

        lineReader.open(this.filepath, (err, reader)=> {
            Rx.of(0).pipe(
                RxOp.expand(val => {
                    reader.nextLine((err2, line) => source.next(line));
                    return Rx.of(1 + val);
                }),
                RxOp.takeWhile(_=> { 
                    let has = reader.hasNextLine();
                    if(!has) source.complete();
                    return has;
                })
            ).subscribe(_=>_);
        })

        return source;        
    }
}

module.exports = CSVReader

并按如下方式使用

const { bufferCount } = require('rxjs/operators');

let reader = new CSVReader('path/to/file');

reader.readByLines()
    .pipe(bufferCount(2)) // chunk size
    .subscribe(chunk=> {
        console.log({chunk});
    });
tzxcd3kk

tzxcd3kk3#

我会这样说:

const readline = require('readline');
const fs = require('fs');
const path = require('path');
const {fromEvent, race, Observable} = require('rxjs');
const {tap, takeUntil, take, map} = require('rxjs/operators');


const rl = readline.createInterface({
    input: fs.createReadStream(path.resolve('./', 'myfile'))
});

let obs = new Observable(observer=>{
    rl.on('line', val => observer.next(val)),
    rl.on('error', err => observer.error(err)),
    rl.on('close', complete => observer.complete(complete))
})
.pipe(tap(line=>console.log(`line: ${line}`)))

obs.subscribe(()=>{},
   (e)=>console.log(`Error reading file: ${e}`),
   ()=>console.log("Read complete"))

创建可观察对象的替代方法可以是:

let obs = fromEvent(rl, 'line')
.pipe(
    takeUntil(race(
        fromEvent(rl, 'close').pipe(take(1))  , 
        fromEvent(rl, 'error').pipe(map((err)=>{throw err}))   
    )))

理想情况下,rxjs可以提供如下运算符:fromEvent(emitter, nextEvent, errorEvent, completeEvent ),以使上述代码更加简单。

fnx2tebb

fnx2tebb4#

我尝试了上面的一系列答案,并构建了自己的丑陋版本。然后,我在GitHub上浏览了一下代码,发现RxJS处理流就像处理对象一样--没有必要去处理事件。只需将一个ReadStream传递给from,它会测试它是否为ReadableStreamLike,然后将其转换为AsyncGenerator

import * as readline from 'node:readline';
import { from } from 'rxjs';

const file = fs.createReadStream(fileName);
const line = readline.createInterface({ input: file });

const line$ = from(line).subscribe({
  next:  (dat) => { ... },
  error: (err) => { ... },
  complete: () => { ... }
});

相关问题