javascript 如何在RxJ中捕获错误并继续执行序列?

cbeh67ev  于 2022-12-28  发布在  Java
关注(0)|答案(5)|浏览(148)

我有一个要解析的项列表,但其中一个项的解析可能会失败。
捕获错误但继续执行序列的"接收方式"是什么
代码示例:

var observable = Rx.Observable.from([0,1,2,3,4,5])
.map(
  function(value){
      if(value == 3){
        throw new Error("Value cannot be 3");
      }
    return value;
  });

observable.subscribe(
  function(value){
  console.log("onNext " + value);
  },
  function(error){
    console.log("Error: " + error.message);
  },
  function(){
    console.log("Completed!");
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>

我想以非处方的方式做什么:

var items = [0,1,2,3,4,5];

for (var item in items){
  try{
    if(item == 3){
      throw new Error("Value cannot be 3");
    }
    console.log(item);
  }catch(error){
     console.log("Error: " + error.message);
  }
}
qyzbxkaa

qyzbxkaa1#

我建议您使用flatMap(现在rxjs版本5中的mergeMap),如果您不关心错误,它将允许您折叠错误。你将创建一个内部的可观察对象,如果发生错误,它可以被吞噬。这种方法的优点是,您可以将运算符链接在一起,如果管道中的任何位置发生错误,它将自动转发到挡块。

const {from, iif, throwError, of, EMPTY} = rxjs;
const {map, flatMap, catchError} = rxjs.operators;

// A helper method to let us create arbitrary operators
const {pipe} = rxjs;

// Create an operator that will catch and squash errors
// This returns a function of the shape of Observable<T> => Observable<R>
const mapAndContinueOnError = pipe(
  //This will get skipped if upstream throws an error
  map(v => v * 2),
  catchError(err => {
    console.log("Caught Error, continuing")
    //Return an empty Observable which gets collapsed in the output
    return EMPTY;
  })
)

const observable = from([0, 1, 2, 3, 4, 5]).pipe(
  flatMap((value) => 
    iif(() => value != 3, 
      of(value), 
      throwError(new Error("Value cannot be 3"))
    ).pipe(mapAndContinueOnError)
  )
);

observable.subscribe(
  (value) => console.log("onNext " + value), (error) => console.log("Error: " + error.message), () => console.log("Completed!")
);
<script src="https://unpkg.com/rxjs@7.0.0/dist/bundles/rxjs.umd.min.js"></script>
ws51t4hk

ws51t4hk2#

您需要切换到一个新的可处理流,如果其中发生错误,则将安全地处理该流,并保持原始流处于活动状态:

Rx.Observable.from([0,1,2,3,4,5])
    .switchMap(value => {

        // This is the disposable stream!
        // Errors can safely occur in here without killing the original stream

        return Rx.Observable.of(value)
            .map(value => {
                if (value === 3) {
                    throw new Error('Value cannot be 3');
                }
                return value;
            })
            .catch(error => {
                // You can do some fancy stuff here with errors if you like
                // Below we are just returning the error object to the outer stream
                return Rx.Observable.of(error);
            });

    })
    .map(value => {
        if (value instanceof Error) {
            // Maybe do some error handling here
            return `Error: ${value.message}`;
        }
        return value;
    })
    .subscribe(
      (x => console.log('Success', x)),
      (x => console.log('Error', x)),
      (() => console.log('Complete'))
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>

有关此技术的更多信息,请参阅此博客文章:The Quest for Meatballs: Continue RxJS Streams When Errors Occur

5q4ezhmt

5q4ezhmt3#

为了防止endlessObservable$失效,可以将failingObservable$放入一个高阶Map运算符(例如switchMapconcatMapexhaustMap ...)中,并通过使用empty() observable(不返回任何值)终止内部流来消 debugging 误。
使用RxJS 6:

endlessObservable$
    .pipe(
        switchMap(() => failingObservable$
            .pipe(
                catchError((err) => {
                    console.error(err);
                    return EMPTY;
                })
            )
        )
    );
pvcm50d1

pvcm50d14#

如果你不想或无法访问导致错误的内部可观察性,你可以这样做:
使用RxJS 7:

const numbers$ = new Subject();

numbers$
  .pipe(
    tap(value => {
      if (value === 3) {
        throw new Error('Value cannot be 3');
      }
    }),
    tap(value => {
      console.log('Value:', value);
    }),
    catchError((err, caught) => {
      console.log('Error:', err.message);
      return caught;
    })
  )
  .subscribe();

for (let n = 1; n <= 10; n++) {
  numbers$.next(n);
}

这里有趣的是可以返回的catchError运算符中的"catch"参数。https://rxjs.dev/api/operators/catchError
它仅在可观测源为热时有效。
在我的例子中,我使用redux-observable,我希望有一种方法在一个地方处理我的错误。
我想到了这个:

const allEpics = combineEpics(
    // all my epics
);

export const rootEpic = (action$, state$, dependencies) => {
  return allEpics(action$, state$, dependencies).pipe(
    catchError((err, caught) => {
        if (err instanceof MyError) {
        return concat(of(displayAlert(err.message)), caught);
      }
      throw err;
    })
  );
};

如果我的任何史诗抛出"MyError"异常,它将被捕获,另一个动作将被调度.

fjaof16o

fjaof16o5#

实际上,您可以在map函数中使用try/catch来处理这个错误。

var source = Rx.Observable.from([0, 1, 2, 3, 4, 5])
    .map(
        function(value) {
            try {
                if (value === 3) {
                    throw new Error("Value cannot be 3");
                }
                return value;

            } catch (error) {
                console.log('I caught an error');
                return undefined;
            }
        })
    .filter(function(x) {
        return x !== undefined; });


source.subscribe(
    function(value) {
        console.log("onNext " + value);
    },
    function(error) {
        console.log("Error: " + error.message);
    },
    function() {
        console.log("Completed!");
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>

相关问题