java 多线程结果的顺序处理

vaqhlq81  于 2023-08-02  发布在  Java
关注(0)|答案(1)|浏览(148)

我正在设置一个Sping Boot 应用程序(使用@RepositoriesDAO 模式),我试图在其中编写一个@Service,以便在多个线程中异步地从数据库中提取数据,并按顺序合并处理传入的有效负载,最好是在有效负载到达时进行合并处理。
目标是为需要单独查询多个不重叠的过滤条件集,但需要进行后处理(例如,转换聚合)为组合结果。
由于我对Java还很陌生,并且来自Golang及其用于多线程和任务通信的相对琐碎的语法,我很难在Java和Sping Boot 中找到一个更好的API--或者确定这种方法是否更适合开始。

问题:

给予

  • a控制器
@RestController
@RequestMapping("/api")
public class MyController {

  private final MyService myService;

  @Autowired
  public MyController(MyService myService) {
      this.myService = myService;
  }

  @PostMapping("/processing")
  public DeferredResult<MyResult> myHandler(@RequestBody MyRequest myRequest) {
      DeferredResult<MyResult> myDeferredResult = new DeferredResult<>();
      myService.myProcessing(myRequest, myDeferredResult);

      return myDeferredResult;
}

字符串

  • a服务
import com.acme.parallel.util.MyDataTransformer

@Service
public class MyServiceImpl implementing MyService {

  private final MyRepository myRepository;

  @Autowired
  public MyService(MyRepository myRepository) {
      this.myRepository = myRepository;
  }

  public void myProcessing(MyRequest myRequest, MyDeferredResult myDeferredResult) {
    MyDataTransformer myDataTransformer = new MyDataTransformer();

    /* PLACEHOLDER CODE
    for (MyFilter myFilter : myRequest.getMyFilterList()) {
      // MyPartialResult myPartialResult = myRepository.myAsyncQuery(myFilter);

      // myDataTransformer.transformMyPartialResult(myPartialResult);
    }
    */

    myDeferredResult.setResult(myDataTransformer.getMyResult());
  }
}

*存储库

@Repository
public class MyRepository {

  public MyPartialResult myAsyncQuery(MyFilter myFilter) {
    // for the sake of an example
    return new MyPartialResult(myFilter, TakesSomeAmountOfTimeToQUery.TRUE);
  }
}

  • 以及一个MyDataTransformer帮助器类:
public class MyDataTransformer {

  private final MyResult myResult = new MyResult();  // e.g. a Map

  public void transformMyPartialResult(MyPartialResult myPartialResult) {
    /* PLACEHOLDER CODE
    this.myResult.transformAndMergeIntoMe(myPartialResult);
    */
  }
}


我如何实施

  • 异步和多线程的MyService.myProcessing方法,以及
  • MyDataTransformer.transformMyPartialResult方法顺序/线程安全
  • (或重新设计以上)

将传入的MyPartialResult合并为一个MyResult

尝试次数:

最简单的解决方案 * 似乎 * 是跳过 “到达” 部分,通常首选的实现方式可能是,例如是:

public void myProcessing(MyRequest myRequest, MyDeferredResult myDeferredResult) {
  MyDataTransformer myDataTransformer = new MyDataTransformer();
  
  List<CompletableFuture<myPartialResult>> myPartialResultFutures = new ArrayList<>();

  for (MyFilter myFilter : myRequest.getMyFilterList()) {     // Stream is the way they say, but I like for
    myPartialResultFutures.add(CompletableFuture.supplyAsync(() -> myRepository.myAsyncQuery(myFilter));
  }

  myPartialResultFutures.stream()
    .map(CompletableFuture::join)
    .map(myDataTransformer::transformMyPartialResult);
      
  myDeferredResult.setResult(myDataTransformer.getMyResult());
}


然而,如果可行的话,我希望在传入的有效负载 * 到达时 * 按顺序处理它们,因此我目前正在尝试这样的方法:

public void myProcessing(MyRequest myRequest, MyDeferredResult myDeferredResult) {
  MyDataTransformer myDataTransformer = new MyDataTransformer();
  
  List<CompletableFuture<myPartialResult>> myPartialResultFutures = new ArrayList<>();

  for (MyFilter myFilter : myRequest.getMyFilterList()) {
    myPartialResultFutures.add(CompletableFuture.supplyAsync(() -> myRepository.myAsyncQuery(myFilter).thenAccept(myDataTransformer::transformMyPartialResult));
  }

  myPartialResultFutures.forEach(CompletableFuture::join);
      
  myDeferredResult.setResult(myDataTransformer.getMyResult());
}


但我不明白在调用myDataTransformer.transformMyPartialResult时是否需要实现任何线程安全协议,以及如何实现,或者这是否有意义,性能方面。

更新:

基于这样的假设

  • myRepository.myAsyncQuery所需的时间量略有不同,并且
  • myDataTransformer.transformMyPartialResult每次呼叫所用的时间不断增加

实现 * 线程安全/原子类型/对象 *,例如一个ConcurrentHashMap

public class MyDataTransformer {

  private final ConcurrentMap<K, V> myResult = new ConcurrentHashMap<K, V>();

  public void transformMyPartialResult(MyPartialResult myPartialResult) {
    myPartialResult.myRows.stream()
      .map((row) -> this.myResult.merge(row[0], row[1], Integer::sum)));
  }
}


进入 * 后一次**尝试 *(处理 “到达”):

public void myProcessing(MyRequest myRequest, MyDeferredResult myDeferredResult) {
  MyDataTransformer myDataTransformer = new MyDataTransformer();
  
  List<CompletableFuture<myPartialResult>> myPartialResultFutures = new ArrayList<>();

  for (MyFilter myFilter : myRequest.getMyFilterList()) {
    myPartialResultFutures.add(CompletableFuture.supplyAsync(() -> myRepository.myAsyncQuery(myFilter).thenAccept(myDataTransformer::transformMyPartialResult));
  }

  myPartialResultFutures.forEach(CompletableFuture::join);
      
  myDeferredResult.setResult(myDataTransformer.getMyResult());
}

  • 比先等待所有线程快了一个数量级 *,即使有原子性协议开销也是如此。

现在,这一点 * 可能 * 已经很明显了(但并不是最终的选择,因为异步/多线程处理并不总是更好的选择),我很高兴这种方法是一种有效的选择。
剩下的是在我看来像是一个蹩脚的、缺乏灵活性的解决方案--或者至少是一个丑陋的解决方案。是否有更好的方法?

bvjxkvbb

bvjxkvbb1#

我们希望在查询结果可用时立即处理它,所以thenApply。这里,查询数据库的线程可能会立即进行数据转换,这与前面提到的方法不同,最后查询的partailResult可以合并在一个reduce操作中。

public static void service() {
        //myRequest.getMyFilterList()
        List<Integer> inputList = Arrays.asList(1, 2, 3);

        List<CompletableFuture<PartialResultTransformed>> partialProcessedResult = inputList.stream()
                .map(input -> CompletableFuture.supplyAsync(() -> repository.query(input)))
                .map(future -> future.thenApply(queryResult -> datatransformer.transform(queryResult)))
                .collect(Collectors.toList());

        List<PartialResultTransformed> allPartialResult = partialProcessedResult.stream()
                .map(CompletableFuture::join) //todo:handle exception scenario
                .collect(Collectors.toList());

        // final transformation of partial results
        //mergePartailResults;
       /* allPartialResult.stream()
                        .reduce();*/
    }

字符串

相关问题