rxjava有序(按键)任务执行

nsc4cvqm  于 2021-07-09  发布在  Java
关注(0)|答案(1)|浏览(458)

我有一堆表示数据的对象。这些对象可以写入相应的文件。用户可能会要求比以前写入文件的更改更快地进行某些更改。
比如说,我对文件a、文件b和文件c进行更改,然后提交执行。然后,在编写过程中,我对文件a进行更改并将其发布。例如,有3个线程在运行。一旦对a、b和c的第一次更改被执行(写入文件),对a的第一次和第二次更改将几乎同时执行。但是,我希望第二个更改在第一个更改完成后应用。
在rxjava中如何做到这一点?
还有一点。在另一个地方,我想用最新的更改运行操作。一种选择是等待所有任务完成。
是否有合适的rxjava原语/方法能够覆盖这两个用例?
我是rxjava的新手,但我希望这是有意义的。 Subjects 在我看来是相关的,但会有成百上千的文件。
我已经有了自定义的实现 Executor .

public class OrderingExecutor
implements Executor
{
    @Delegate
    private final Executor delegate;
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<>();

    public OrderingExecutor(
        Executor delegate)
    {
        this.delegate = delegate;
    }

    public void execute(
        Runnable task,
        Object key)
    {
        Objects.requireNonNull(key);

        boolean first;
        Runnable wrappedTask;
        synchronized (keyedTasks)
        {
            Queue<Runnable> dependencyQueue = keyedTasks.get(key);
            first = (dependencyQueue == null);
            if (dependencyQueue == null)
            {
                dependencyQueue = new LinkedList<>();
                keyedTasks.put(key, dependencyQueue);
            }

            wrappedTask = wrap(task, dependencyQueue, key);
            if (!first)
            {
                dependencyQueue.add(wrappedTask);
            }
        }

        // execute method can block, call it outside synchronize block
        if (first)
        {
            delegate.execute(wrappedTask);
        }

    }

    private Runnable wrap(
        Runnable task,
        Queue<Runnable> dependencyQueue,
        Object key)
    {
        return new OrderedTask(task, dependencyQueue, key);
    }

    class OrderedTask
    implements Runnable
    {

        private final Queue<Runnable> dependencyQueue;
        private final Runnable task;
        private final Object key;

        public OrderedTask(
            Runnable task,
            Queue<Runnable> dependencyQueue,
            Object key)
        {
            this.task = task;
            this.dependencyQueue = dependencyQueue;
            this.key = key;
        }

        @Override
        public void run()
        {
            try
            {
                task.run();
            }
            finally
            {
                Runnable nextTask = null;
                synchronized (keyedTasks)
                {
                    if (dependencyQueue.isEmpty())
                    {
                        keyedTasks.remove(key);
                    }
                    else
                    {
                        nextTask = dependencyQueue.poll();
                    }
                }
                if (nextTask != null)
                {
                    delegate.execute(nextTask);
                }
            }
        }
    }
}

也许有什么明智的方法可以把它插入rxjava?

8yparm6h

8yparm6h1#

现在还不完全清楚您试图在这里实现什么,但是您可以在rxjava之上分层优先级队列。

class OrderedTask implements Comparable<OrderedTask> { ... }

PriorityBlockingQueue<OrderedTask> queue = new PriorityBlockingQueue<>();

PublishSubject<Integer> trigger = PublishSubject.create();

trigger.flatMap(v -> {
   OrderedTask t = queue.poll();
   return someAPI.workWith(t);
}, 1)
.subscribe(result -> { }, error -> { });

queue.offer(new SomeOrderedTask(1));
trigger.onNext(1);

queue.offer(new SomeOrderedTask(2));
trigger.onNext(2);

相关问题