Java: parallel group execution of sequentially blocking tasks

8ljdwjyq, posted on 2021-06-27 in Java

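I've been mulling this over for a while now, and along the way I've learned more about threads, executors, and so on. I have a cursory understanding of executors and threads, but I feel a bit stuck.
Here's what I'm trying to do.
There are commands, and there are actions. Commands are named and can be invoked arbitrarily by users, e.g. !playsong, !cheer, etc. An action is something that sends work to a service, for example asking a websocket client to send a new message, or asking an IRC client to send a new message, and so on.
When a command is executed, it executes its actions sequentially, one after another.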
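For example, the !cheer command might have four actions:
1. Make a websocket request and wait for a successful response (e.g. show a scene item in OBS).
2. Send an IRC message (e.g. send a chat message). Once it has been sent,
3. Wait 1-3 seconds (e.g. wait for a video to finish playing). Once the wait is over,
4. Make another websocket request (e.g. hide the scene item from step 1).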
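Not only must they execute in order, they also must not all start at the same time (with actions 1, 2, and 4 finishing first and action 3 finishing last); each action depends on its predecessor completing first.
On top of that, clients can submit commands arbitrarily at any time, and commands must not block one another. For example, !longcommand can start without blocking !shortcommand from starting (assuming the underlying services are not blocked).
What I was thinking of doing:
I know I can use Future/Callable to block on the result of work pending on a given thread, so each action should return a Future when run (the Future coming from the corresponding service it uses). Then I can simply invoke the actions one by one on a command in a blocking fashion, like this, to ensure they execute in order and each action waits for the previous one to complete: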

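class ExecutableCommand implements Runnable {
  // omitted for brevity (holds the command; each Action.run() returns a Future)

  @Override
  public void run() {
    try {
      for (Action action : command.getActions()) {
        action.run().get(); // block until this action finishes before starting the next
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag and stop the command
    } catch (ExecutionException e) {
      e.printStackTrace(); // an action failed; the remaining actions are skipped
    }
  }
}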
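
A minimal, runnable sketch of the same idea, for readers who want to try it: each action is handed to a service and the caller blocks on the returned Future before starting the next one. The class name SequentialActionsSketch, the runInOrder helper, and the use of a plain ExecutorService as a stand-in for the websocket/IRC clients are all assumptions made for illustration, not part of the original design.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: a plain ExecutorService stands in for the real services.
class SequentialActionsSketch {

    /** Runs the actions one by one, waiting on each Future before starting the next. */
    static List<String> runInOrder(ExecutorService service, List<Callable<String>> actions)
            throws InterruptedException, ExecutionException {
        List<String> results = new ArrayList<>();
        for (Callable<String> action : actions) {
            Future<String> pending = service.submit(action); // hand the work to the service
            results.add(pending.get());                      // block until it has finished
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> actions = Arrays.asList(
                    () -> { Thread.sleep(30); return "show scene item"; },
                    () -> "send chat message",
                    () -> { Thread.sleep(10); return "wait"; },
                    () -> "hide scene item");
            // prints [show scene item, send chat message, wait, hide scene item]
            System.out.println(runInOrder(service, actions));
        } finally {
            service.shutdown();
        }
    }
}
```

Even though the service has four threads, only one action is in flight at a time, because the next submit happens only after the previous get() returns.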

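But how should I handle executing the commands themselves? I think I should submit each command through an executor, perhaps a thread pool executor like this?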

class ExecutorServiceWrapper {

  // Executors has no newThreadPoolExecutor factory; newFixedThreadPool creates a fixed-size pool
  private final ExecutorService executorService = Executors.newFixedThreadPool(4);

  void submit(ExecutableCommand command) {
    executorService.submit(command);
  }

}

Then each client would, of course, just hold a reference to the ExecutorServiceWrapper and call it in response to the events that trigger it:

class FromChatHandler {
  private final ExecutorServiceWrapper masterQueue;

  void onMessage(String message) {
    ExecutableCommand command = ...; // parse what command to look up from message
    masterQueue.submit(command);
  }
}

@RestController // or whatever
class MyController {
  private final ExecutorServiceWrapper masterQueue;

  @Post
  void executeCommandByName(String commandName) {
    ExecutableCommand command = ...; // look up command
    masterQueue.submit(command);
  }
}

class DirectHandler {
  private final ExecutorServiceWrapper masterQueue;

  void handle(String message) {
    ExecutableCommand command = ...; // build the command given the message
    masterQueue.submit(command);
  }
}

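I'm assuming that since each command is submitted to the executor, each command will run on its own thread, so it won't block the others.
But I'm not sure whether I should execute each action of a command inside ExecutableCommand the way I did above.
Also, I'm not sure it handles this case: the thread pool is fixed at 5 threads. Five commands have been executed. They are long-running and use different services, but the underlying services are not blocked and can still accept work. Someone tries to execute a 6th command; it should not be blocked, because the underlying services can still accept work.
Is there a better way? Am I on the right track?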
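
The sixth-command scenario above can be checked with a small experiment. The sketch below is an illustration under assumptions (the class PoolSizingSketch and its helper are made up, and the 200 ms pause is just a crude way to let the pool start what it can): it submits more long-running tasks than a fixed pool has threads and counts how many actually started, then repeats the run with a cached pool, which creates threads on demand.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: counts how many blocking tasks a pool starts at once.
class PoolSizingSketch {

    /** Submits `tasks` blocking tasks and reports how many had started before any could finish. */
    static int startedConcurrently(ExecutorService pool, int tasks) throws InterruptedException {
        AtomicInteger started = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                started.incrementAndGet();
                release.await();           // simulate a long-running command
                return null;
            });
        }
        Thread.sleep(200);                 // crude: give the pool time to start what it can
        int running = started.get();
        release.countDown();               // let every task finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return running;
    }

    public static void main(String[] args) throws InterruptedException {
        // Five threads, six long commands: the sixth waits in the queue until a thread frees up.
        System.out.println("fixed(5): " + startedConcurrently(Executors.newFixedThreadPool(5), 6));
        // A cached pool adds threads on demand, so all six can start.
        System.out.println("cached:   " + startedConcurrently(Executors.newCachedThreadPool(), 6));
    }
}
```

With a fixed pool the sixth command is queued rather than rejected, but it does wait. A cached pool avoids that wait at the cost of an unbounded thread count, so which one fits depends on how many commands can realistically be in flight.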

Answer #1 (j8ag8udp)

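After spending a bit of time on this, I came up with a couple of possible solutions, one using executors and one using futures. I'm not yet sure which would be better than the other, but since I know I can extend ThreadPoolExecutor (say, to add a pause feature), I'll probably lean towards executors.
Otherwise, if anyone has comments, they're always welcome!
I've put both solutions on my GH for now, but I'll include them below as well. https://github.com/tinatiel/concurrency-learning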
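
As an illustration of the "pause feature" mentioned above, here is a sketch modeled on the pausable executor example in the ThreadPoolExecutor Javadoc, which uses the beforeExecute hook. The class name PausableExecutorSketch and the fixed-size constructor are assumptions for this example, not code from the linked repository.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: a fixed-size pool whose workers wait while the pool is paused.
class PausableExecutorSketch extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean paused; // guarded by pauseLock

    PausableExecutorSketch(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
            while (paused) unpaused.await(); // hold the task until resume() is called
        } catch (InterruptedException e) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    void pause() {
        pauseLock.lock();
        try { paused = true; } finally { pauseLock.unlock(); }
    }

    void resume() {
        pauseLock.lock();
        try { paused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); }
    }

    public static void main(String[] args) throws Exception {
        PausableExecutorSketch pool = new PausableExecutorSketch(2);
        pool.pause();
        Future<String> result = pool.submit(() -> "ran after resume");
        System.out.println("done while paused? " + result.isDone()); // prints: done while paused? false
        pool.resume();
        System.out.println(result.get());                            // prints: ran after resume
        pool.shutdown();
    }
}
```

Pausing only holds back tasks that have not started yet; a task that is already running continues to completion.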

Futures implementation

package futures;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CommandActionExample {

    public static void main(String[] args) {

        // Initialize some starting params
        Random random = new Random();
        int maxActions = 20;
        int maxCommands = 5;

        // Generate some commands, with a random number of actions.
        // We'll use the indexes as the command and action names to keep it simple/readable
        List<Command> commands = new ArrayList<>();
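        for(int c = 0; c < maxCommands; c++) {
            Command command = new Command(String.format("%d", c+1));
            // Pick the action count once; calling nextInt in the loop condition would re-roll the bound on every iteration
            int actionCount = random.nextInt(maxActions);
            for(int a = 0; a < actionCount; a++) {
                Action action = new Action(random, String.format("%d", a+1));
                command.addAction(action);
            }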
            commands.add(command);
        }

        // Print out the commands we'll execute, again to keep the results readable/understandable
        System.out.println("Commands to execute: \n" + commands.stream().map(Command::toString).collect(Collectors.joining("\n")) + "\n");

        // Build a Future that tries to execute all commands (supplied as Futures) in an arbitrary order
        try {
            CompletableFuture.allOf(commands.stream()
                    .map((Function<Command, CompletableFuture<Void>>) CompletableFuture::runAsync)
                    .collect(Collectors.toList())
                    .toArray(CompletableFuture[]::new)
            ).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
//        commands.get(0).run(); // sanity check one of the command's actions run as expected

        // When we execute the results, the actions should be executed in-order within a command at some point in the future
        // (not started all at once), so something like:
        // 0  Command-2:Action-1 scheduled at 34
        // 0  Command-1:Action-1 scheduled at 21
        // 0  Command-3:Action-1 scheduled at 4
        // 4  Command-3:Action2 scheduled at ...
        // 21 Command-1:Action-2 scheduled at ...
        // 34 Command-1-Action-2 scheduled at ...
        // ...
        // Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?

    }

    public static class Action implements Runnable {

        private Command command;
        private final Random random;
        private final String name;

        public Action(Random random, String name) {
            this.random = random;
            this.name = name;
        }

        public void setCommand(Command command) {
            this.command = command;
        }

        @Override
        public void run() {

            // Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
            long msTime = random.nextInt(1000);
            System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
            try {
                Thread.sleep(msTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Action{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }

    public static class Command implements Runnable {

        private final String name;
        private final List<Action> actions = new ArrayList<>();

        public Command(String name) {
            this.name = name;
        }

        public void addAction(Action action) {
            action.setCommand(this);
            actions.add(action);
        }

        @Override
        public void run() {
            // If there are no actions, then do nothing
            if(actions.isEmpty()) return;

            // Build up a chain of futures, starting with the first action.
            // thenRun returns a new stage, so we keep reassigning the tail of the chain;
            // calling thenRun repeatedly on the first future would hang every action off that one stage
            // instead of making each action wait for the previous one.
            CompletableFuture<Void> chain = CompletableFuture.runAsync(actions.get(0));
            for(int i = 1; i < actions.size(); i++) {
                chain = chain.thenRun(actions.get(i));
            }

            // Wait for the whole chain (not just the first action) to finish
            try {
                chain.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Command{" +
                    "name='" + name + '\'' +
                    ", actions=" + actions +
                    '}';
        }
    }

}

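Results

The output and scheduling are largely as expected, although it appears to use the ForkJoinPool. Note that in this log Command-5's Action-2 and Action-3 both start at 21:17:27.485 on different worker threads, so actions attached to the same future with thenRun are not guaranteed to run one at a time.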

Commands to execute: 
Command{name='1', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}]}
Command{name='2', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='3', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}, Action{name='7'}]}
Command{name='4', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='5', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}

2020-12-30 21:17:27.11: Command-2:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 207ms
2020-12-30 21:17:27.11: Command-4:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 930ms
2020-12-30 21:17:27.11: Command-1:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 948ms
2020-12-30 21:17:27.11: Command-3:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 173ms
2020-12-30 21:17:27.11: Command-5:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 348ms
2020-12-30 21:17:27.314: Command-3:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 890ms
2020-12-30 21:17:27.345: Command-2:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 178ms
2020-12-30 21:17:27.485: Command-5:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 702ms
2020-12-30 21:17:27.485: Command-5:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 161ms
2020-12-30 21:17:27.532: Command-2:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 201ms
2020-12-30 21:17:27.657: Command-5:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 257ms
2020-12-30 21:17:27.735: Command-2:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 518ms
2020-12-30 21:17:27.919: Command-5:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 258ms
2020-12-30 21:17:28.06: Command-4:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 926ms
2020-12-30 21:17:28.075: Command-1:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 413ms
2020-12-30 21:17:28.184: Command-5:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 77ms
2020-12-30 21:17:28.216: Command-3:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 487ms
2020-12-30 21:17:28.263: Command-2:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 570ms
2020-12-30 21:17:28.497: Command-1:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 273ms
2020-12-30 21:17:28.716: Command-3:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 302ms
2020-12-30 21:17:28.833: Command-2:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 202ms
2020-12-30 21:17:28.992: Command-4:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 733ms
2020-12-30 21:17:29.024: Command-3:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 756ms
2020-12-30 21:17:29.727: Command-4:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 131ms
2020-12-30 21:17:29.78: Command-3:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 920ms
2020-12-30 21:17:29.858: Command-4:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 305ms
2020-12-30 21:17:30.168: Command-4:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 612ms
2020-12-30 21:17:30.715: Command-3:Action-7 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 330ms
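
The worker names in the log above come from the common ForkJoinPool, which CompletableFuture.runAsync uses when no executor is passed. The following sketch shows one way to chain steps strictly one after another on an executor of your own choosing; the class ChainedFuturesSketch, the chain helper, and the pool size of 4 are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: sequential chaining on a caller-supplied executor.
class ChainedFuturesSketch {

    /** Chains the steps so that each one starts only after the previous one has completed. */
    static CompletableFuture<Void> chain(List<Runnable> steps, ExecutorService executor) {
        CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);
        for (Runnable step : steps) {
            // thenRunAsync returns a new stage; reassigning keeps extending the end of the chain
            tail = tail.thenRunAsync(step, executor);
        }
        return tail;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4); // used instead of the common pool
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Runnable> steps = Arrays.asList(
                () -> { sleep(50); log.add("action-1"); },
                () -> { sleep(10); log.add("action-2"); },
                () -> log.add("action-3"));
        chain(steps, executor).join(); // wait for the whole chain, not just the first step
        System.out.println(log);       // prints [action-1, action-2, action-3]
        executor.shutdown();
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because every step is attached to the stage returned by the previous call, the slow first step cannot be overtaken by the faster ones, whichever pool thread ends up running them.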

Executors implementation

package executors;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CommandActionExample {

    public static void main(String[] args) {

        // Initialize some starting params
        Random random = new Random();
        int maxActions = 20;
        int maxCommands = 5;

        // Generate some commands, with a random number of actions.
        // We'll use the indexes as the command and action names to keep it simple/readable
        List<Command> commands = new ArrayList<>();
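        for(int c = 0; c < maxCommands; c++) {
            Command command = new Command(String.format("%d", c+1));
            // Pick the action count once; calling nextInt in the loop condition would re-roll the bound on every iteration
            int actionCount = random.nextInt(maxActions);
            for(int a = 0; a < actionCount; a++) {
                Action action = new Action(random, String.format("%d", a+1));
                command.addAction(action);
            }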
            commands.add(command);
        }

        // Print out the commands we'll execute, again to keep the results readable/understandable
        System.out.println("Commands to execute: \n" + commands.stream().map(Command::toString).collect(Collectors.joining("\n")) + "\n");

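        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for(Command command:commands) executorService.submit(command);
        executorService.shutdown(); // stop accepting new work; already-submitted commands still run, and the JVM can exit afterwards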

        // When we execute the results, the actions should be executed in-order within a command at some point in the future
        // (not started all at once), so something like:
        // 0  Command-2:Action-1 scheduled at 34
        // 0  Command-1:Action-1 scheduled at 21
        // 0  Command-3:Action-1 scheduled at 4
        // 4  Command-3:Action2 scheduled at ...
        // 21 Command-1:Action-2 scheduled at ...
        // 34 Command-1-Action-2 scheduled at ...
        // ...
        // Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?

    }

    public static class Action implements Runnable {

        private Command command;
        private final Random random;
        private final String name;

        public Action(Random random, String name) {
            this.random = random;
            this.name = name;
        }

        public void setCommand(Command command) {
            this.command = command;
        }

        @Override
        public void run() {

            // Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
            long msTime = random.nextInt(1000);
            System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
            try {
                Thread.sleep(msTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Action{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }

    public static class Command implements Runnable {

        private final String name;
        private final List<Action> actions = new ArrayList<>();

        public Command(String name) {
            this.name = name;
        }

        public void addAction(Action action) {
            action.setCommand(this);
            actions.add(action);
        }

        @Override
        public void run() {
            // If there are no actions, then do nothing
            if(actions.isEmpty()) return;

            // A single-threaded executor runs the submitted actions one at a time, in submission order
            ExecutorService executor = Executors.newSingleThreadExecutor();
            for(Action action:actions) executor.submit(action);
            executor.shutdown(); // queued actions still run; without this the worker thread is never released

        }

        @Override
        public String toString() {
            return "Command{" +
                    "name='" + name + '\'' +
                    ", actions=" + actions +
                    '}';
        }
    }

}

Results

The output and scheduling are as expected.

2020-12-31 09:43:09.952: Command-3:Action-1 executing on Thread 'pool-4-thread-1' executing for 632ms
2020-12-31 09:43:09.952: Command-4:Action-1 executing on Thread 'pool-3-thread-1' executing for 932ms
2020-12-31 09:43:09.952: Command-2:Action-1 executing on Thread 'pool-6-thread-1' executing for 586ms
2020-12-31 09:43:09.952: Command-5:Action-1 executing on Thread 'pool-5-thread-1' executing for 987ms
2020-12-31 09:43:09.952: Command-1:Action-1 executing on Thread 'pool-2-thread-1' executing for 706ms
2020-12-31 09:43:10.562: Command-2:Action-2 executing on Thread 'pool-6-thread-1' executing for 329ms
2020-12-31 09:43:10.608: Command-3:Action-2 executing on Thread 'pool-4-thread-1' executing for 503ms
2020-12-31 09:43:10.891: Command-2:Action-3 executing on Thread 'pool-6-thread-1' executing for 443ms
2020-12-31 09:43:10.9: Command-4:Action-2 executing on Thread 'pool-3-thread-1' executing for 866ms
2020-12-31 09:43:10.955: Command-5:Action-2 executing on Thread 'pool-5-thread-1' executing for 824ms
2020-12-31 09:43:11.346: Command-2:Action-4 executing on Thread 'pool-6-thread-1' executing for 502ms
2020-12-31 09:43:11.766: Command-4:Action-3 executing on Thread 'pool-3-thread-1' executing for 638ms
2020-12-31 09:43:11.779: Command-5:Action-3 executing on Thread 'pool-5-thread-1' executing for 928ms
2020-12-31 09:43:11.848: Command-2:Action-5 executing on Thread 'pool-6-thread-1' executing for 179ms
2020-12-31 09:43:12.037: Command-2:Action-6 executing on Thread 'pool-6-thread-1' executing for 964ms
2020-12-31 09:43:12.412: Command-4:Action-4 executing on Thread 'pool-3-thread-1' executing for 370ms
2020-12-31 09:43:12.709: Command-5:Action-4 executing on Thread 'pool-5-thread-1' executing for 204ms
2020-12-31 09:43:12.783: Command-4:Action-5 executing on Thread 'pool-3-thread-1' executing for 769ms
2020-12-31 09:43:12.913: Command-5:Action-5 executing on Thread 'pool-5-thread-1' executing for 188ms
2020-12-31 09:43:13.102: Command-5:Action-6 executing on Thread 'pool-5-thread-1' executing for 524ms
2020-12-31 09:43:13.555: Command-4:Action-6 executing on Thread 'pool-3-thread-1' executing for 673ms
2020-12-31 09:43:13.634: Command-5:Action-7 executing on Thread 'pool-5-thread-1' executing for 890ms
2020-12-31 09:43:14.23: Command-4:Action-7 executing on Thread 'pool-3-thread-1' executing for 147ms
2020-12-31 09:43:14.527: Command-5:Action-8 executing on Thread 'pool-5-thread-1' executing for 538ms
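
On the open question in the code comments about how to test the ordering: one option that needs no mocking library is to record the order in which actions start and then check it per command. The sketch below is only an illustration; OrderCheckSketch, the "command:action" label format, and the 5 ms simulated work are all made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-threaded executor per command, plus an order check.
class OrderCheckSketch {

    /** Runs `commands` commands of `actions` actions each; returns start order as "command:action" labels. */
    static List<String> run(int commands, int actions) throws InterruptedException {
        List<String> started = Collections.synchronizedList(new ArrayList<>());
        List<ExecutorService> lanes = new ArrayList<>();
        for (int c = 1; c <= commands; c++) {
            ExecutorService lane = Executors.newSingleThreadExecutor(); // one lane per command
            lanes.add(lane);
            for (int a = 1; a <= actions; a++) {
                String label = c + ":" + a;
                lane.submit(() -> {
                    started.add(label);
                    try {
                        Thread.sleep(5); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            lane.shutdown(); // queued actions still run; the thread exits afterwards
        }
        for (ExecutorService lane : lanes) lane.awaitTermination(5, TimeUnit.SECONDS);
        return started;
    }

    /** True if each command's actions appear as 1, 2, 3, ... in the recorded order. */
    static boolean inOrderPerCommand(List<String> started, int commands) {
        for (int c = 1; c <= commands; c++) {
            int last = 0;
            for (String entry : started) {
                String[] parts = entry.split(":");
                if (Integer.parseInt(parts[0]) != c) continue;
                int action = Integer.parseInt(parts[1]);
                if (action != last + 1) return false;
                last = action;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> started = run(3, 4);
        System.out.println(inOrderPerCommand(started, 3)); // prints true
    }
}
```

Commands may interleave freely in the recorded list; the check only requires that, within each command, the actions appear in ascending order.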
