我正在学习使用RxJS的React式编程,遇到了一个需要逐行读取文件的情况,实际上我使用了如下解决方案:
https://gist.github.com/yvele/447555b1c5060952a279
它的作品,但我需要使用一些正常的JS代码转换缓冲区流的行.(使用“readline”模块在上面的例子)
我想知道是否有其他方法可以使用RxJS操作符将Observable of Buffer转换为Observable of line,如下面的示例。
var Rx = require('rx');
var fs = require('fs');
var lines = Rx.Observable
.fromEvent(rl, 'data') // emits buffers overtime
// some transforms ...
.subscribe(
(line) => console.log(line), // emit string line by line
err => console.log("Error: %s", err),
() => console.log("Completed")
);
4条答案
按热度按时间jq6vz3qz1#
使用
scan
和concatMap
,您可能会获得非常接近您所需的效果。例如:
bfnvny8b2#
您可以使用以下类
并按如下方式使用
tzxcd3kk3#
我会这样说:
创建可观察对象的替代方法可以是:
理想情况下,
rxjs
可以提供如下运算符:fromEvent(emitter, nextEvent, errorEvent, completeEvent )
,以使上述代码更加简单。fnx2tebb4#
我尝试了上面的一系列答案,并构建了自己的丑陋版本。然后,我在GitHub上浏览了一下代码,发现RxJS处理流就像处理对象一样--没有必要去处理事件。只需将一个ReadStream传递给
from
,它会测试它是否为ReadableStreamLike
,然后将其转换为AsyncGenerator
。