NodeJS 如何管理单个作业队列,同时只允许使用rxjs一次运行一个作业?

iklwldmw  于 2023-06-29  发布在  Node.js
关注(0)|答案(1)|浏览(75)

我正在尝试在rxjs中创建一个稍微复杂的队列系统,如下所示:

  • 任何用户都可以在任何时间请求标题,只要他们没有请求或超时挂起
  • 任何有头衔的用户都可以取消他们的超时
  • 标题最多只能保持30秒
  • 标题在任何时候只能由一个用户持有
  • 用户可以同时持有多个标题
  • 当没有其他人在使用它们时,标题被交付
  • 如果一个标题没有被使用,则在要求时立即交付
  • 如果标题正在使用,则在当前用户将其归还或他们的时间用完时交付
import {
  Subject,
  groupBy,
  mergeMap,
  switchMap,
  delayWhen,
  timer,
  takeUntil,
  share,
} from "rxjs";

type Snowflake = string;

enum Title {
  A = "A",
  B = "B",
}

const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));

// This is an external function, cannot modify
const startTitleReq = (item: { title: Title; user: Snowflake }) => {
  console.log(`Starting request for ${item.user}, requested ${item.title}`);

  // Can take longer in reality
  return sleep(10_000).then(() => ({ success: true }));
};

const queue = new Subject<{ title: Title; user: Snowflake }>();
const cancel = new Subject();

queue
  .pipe(
    groupBy((item) => item.title),
    mergeMap((group$) =>
      group$.pipe(
        switchMap((item) => startTitleReq(item)),
        delayWhen((result) =>
          timer(result.success ? 10_000 : 0).pipe(takeUntil(cancel))
        )
      )
    ),
    share()
  )
  .subscribe(({ success }) => {
    console.log({ success });
  });

queue.next({ user: "1", title: Title.A });
queue.next({ user: "2", title: Title.A });
queue.next({ user: "3", title: Title.B });

因此,预期的结果将是:

queue.next({ user: "1", title: Title.A }); // 1, success log after 10 seconds
queue.next({ user: "2", title: Title.A }); // 3, success log after 40 seconds
queue.next({ user: "3", title: Title.B }); // 2, success log after 20 seconds

现在让我们假设这是开始:

queue.next({ user: "1", title: Title.A }); // 1, success log after 10 seconds

// After 5 seconds..
cancel.next();

queue.next({ user: "2", title: Title.A }); // 2, success log after 25 seconds

Link to playgound

2q5ifsrm

2q5ifsrm1#

很好的思考与模拟,并试图简化的问题,以尽可能少。
我选了一个你想要的

const request$$ = new Subject<string>();

const queue$ = request$$.pipe(
  concatMap((title) =>
    getTitle(title).pipe(
      map((_) => ({ title, success: true })),
      timeout(30_000),
      catchError((e) => {
        return of({ title, success: false });
      })
    )
  )
);

当我给予getTitle方法一个随机延迟时,我们得到了以下结果:

  • (注意,我已经将超时时间降低到5s而不是30,并使随机延迟在1到10s之间)*
Starting request for title Title 0 (delay 5006)
{title: "Title 0", success: false}

Starting request for title Title 1 (delay 9117)
{title: "Title 1", success: false}

Starting request for title Title 2 (delay 1160)
{title: "Title 2", success: true}

Starting request for title Title 3 (delay 1521)
{title: "Title 3", success: true}

Starting request for title Title 4 (delay 5109)
{title: "Title 4", success: false}

Starting request for title Title 5 (delay 8671)
{title: "Title 5", success: false}

Starting request for title Title 6 (delay 6618)
{title: "Title 6", success: false}

Starting request for title Title 7 (delay 3804)
{title: "Title 7", success: true}

Starting request for title Title 8 (delay 2827)
{title: "Title 8", success: true}

Starting request for title Title 9 (delay 6142)
{title: "Title 9", success: false}

这是live demo with the mocks

**编辑:**根据评论,我似乎第一次没有理解这个问题。这是我第二次使用的live demo

大部分代码:

const queue$$ = new Subject<{ title: Title; user: Snowflake }>();
const cancel$$ = new Subject<void>();

const queue$ = queue$$.pipe(
  concatMap((item) =>
    startTitleReq(item).pipe(
      timeout(10_000),
      catchError(() => {
        return of({ success: false });
      }),
      takeUntil(cancel$$)
    )
  )
);

相关问题