我正在尝试在rxjs中创建一个稍微复杂的队列系统,如下所示:
- 任何用户都可以在任何时间请求标题,只要他们没有请求或超时挂起
- 任何有头衔的用户都可以取消他们的超时
- 标题最多只能保持30秒
- 标题在任何时候只能由一个用户持有
- 用户可以同时持有多个标题
- 当没有其他人在使用它们时,标题被交付
- 如果一个标题没有被使用,则在要求时立即交付
- 如果标题正在使用,则在当前用户将其归还或他们的时间用完时交付
import {
Subject,
groupBy,
mergeMap,
switchMap,
delayWhen,
timer,
takeUntil,
share,
} from "rxjs";
type Snowflake = string;
enum Title {
A = "A",
B = "B",
}
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
// This is an external function, cannot modify
const startTitleReq = (item: { title: Title; user: Snowflake }) => {
console.log(`Starting request for ${item.user}, requested ${item.title}`);
// Can take longer in reality
return sleep(10_000).then(() => ({ success: true }));
};
const queue = new Subject<{ title: Title; user: Snowflake }>();
const cancel = new Subject();
queue
.pipe(
groupBy((item) => item.title),
mergeMap((group$) =>
group$.pipe(
switchMap((item) => startTitleReq(item)),
delayWhen((result) =>
timer(result.success ? 10_000 : 0).pipe(takeUntil(cancel))
)
)
),
share()
)
.subscribe(({ success }) => {
console.log({ success });
});
queue.next({ user: "1", title: Title.A });
queue.next({ user: "2", title: Title.A });
queue.next({ user: "3", title: Title.B });
因此,预期的结果将是:
queue.next({ user: "1", title: Title.A }); // 1, success log after 10 seconds
queue.next({ user: "2", title: Title.A }); // 3, success log after 40 seconds
queue.next({ user: "3", title: Title.B }); // 2, success log after 20 seconds
现在让我们假设这是开始:
queue.next({ user: "1", title: Title.A }); // 1, success log after 10 seconds
// After 5 seconds..
cancel.next();
queue.next({ user: "2", title: Title.A }); // 2, success log after 25 seconds
1条答案
按热度按时间2q5ifsrm1#
很好的思考与模拟,并试图简化的问题,以尽可能少。
我选了一个你想要的
当我给予
getTitle
方法一个随机延迟时,我们得到了以下结果:这是live demo with the mocks。
**编辑:**根据评论,我似乎第一次没有理解这个问题。这是我第二次使用的live demo。
大部分代码: