我正在设置一个Sping Boot 应用程序(使用@Repositories
的 DAO 模式),我试图在其中编写一个@Service
,以便在多个线程中异步地从数据库中提取数据,并按顺序合并处理传入的有效负载,最好是在有效负载到达时进行合并处理。
目标是为需要单独查询多个不重叠的过滤条件集,但需要进行后处理(例如,转换聚合)为组合结果。
由于我对Java还很陌生,并且来自Golang及其用于多线程和任务通信的相对琐碎的语法,我很难在Java和Sping Boot 中找到一个更好的API--或者确定这种方法是否更适合开始。
问题:
给予
- a控制器:
@RestController
@RequestMapping("/api")
public class MyController {
private final MyService myService;
@Autowired
public MyController(MyService myService) {
this.myService = myService;
}
@PostMapping("/processing")
public DeferredResult<MyResult> myHandler(@RequestBody MyRequest myRequest) {
DeferredResult<MyResult> myDeferredResult = new DeferredResult<>();
myService.myProcessing(myRequest, myDeferredResult);
return myDeferredResult;
}
字符串
- a服务:
import com.acme.parallel.util.MyDataTransformer
@Service
public class MyServiceImpl implementing MyService {
private final MyRepository myRepository;
@Autowired
public MyService(MyRepository myRepository) {
this.myRepository = myRepository;
}
public void myProcessing(MyRequest myRequest, MyDeferredResult myDeferredResult) {
MyDataTransformer myDataTransformer = new MyDataTransformer();
/* PLACEHOLDER CODE
for (MyFilter myFilter : myRequest.getMyFilterList()) {
// MyPartialResult myPartialResult = myRepository.myAsyncQuery(myFilter);
// myDataTransformer.transformMyPartialResult(myPartialResult);
}
*/
myDeferredResult.setResult(myDataTransformer.getMyResult());
}
}
型
*存储库:
@Repository
public class MyRepository {
public MyPartialResult myAsyncQuery(MyFilter myFilter) {
// for the sake of an example
return new MyPartialResult(myFilter, TakesSomeAmountOfTimeToQUery.TRUE);
}
}
型
- 以及一个MyDataTransformer帮助器类:
public class MyDataTransformer {
private final MyResult myResult = new MyResult(); // e.g. a Map
public void transformMyPartialResult(MyPartialResult myPartialResult) {
/* PLACEHOLDER CODE
this.myResult.transformAndMergeIntoMe(myPartialResult);
*/
}
}
型
我如何实施
- 异步和多线程的
MyService.myProcessing
方法,以及 MyDataTransformer.transformMyPartialResult
方法顺序/线程安全- (或重新设计以上)
将传入的MyPartialResult
合并为一个MyResult
?
尝试次数:
最简单的解决方案 * 似乎 * 是跳过 “到达” 部分,通常首选的实现方式可能是,例如是:
public void myProcessing(MyRequest myRequest, MyDeferredResult myDeferredResult) {
MyDataTransformer myDataTransformer = new MyDataTransformer();
List<CompletableFuture<myPartialResult>> myPartialResultFutures = new ArrayList<>();
for (MyFilter myFilter : myRequest.getMyFilterList()) { // Stream is the way they say, but I like for
myPartialResultFutures.add(CompletableFuture.supplyAsync(() -> myRepository.myAsyncQuery(myFilter));
}
myPartialResultFutures.stream()
.map(CompletableFuture::join)
.map(myDataTransformer::transformMyPartialResult);
myDeferredResult.setResult(myDataTransformer.getMyResult());
}
型
然而,如果可行的话,我希望在传入的有效负载 * 到达时 * 按顺序处理它们,因此我目前正在尝试这样的方法:
public void myProcessing(MyRequest myRequest, MyDeferredResult myDeferredResult) {
MyDataTransformer myDataTransformer = new MyDataTransformer();
List<CompletableFuture<myPartialResult>> myPartialResultFutures = new ArrayList<>();
for (MyFilter myFilter : myRequest.getMyFilterList()) {
myPartialResultFutures.add(CompletableFuture.supplyAsync(() -> myRepository.myAsyncQuery(myFilter).thenAccept(myDataTransformer::transformMyPartialResult));
}
myPartialResultFutures.forEach(CompletableFuture::join);
myDeferredResult.setResult(myDataTransformer.getMyResult());
}
型
但我不明白在调用myDataTransformer.transformMyPartialResult
时是否需要实现任何线程安全协议,以及如何实现,或者这是否有意义,性能方面。
更新:
基于这样的假设
myRepository.myAsyncQuery
所需的时间量略有不同,并且myDataTransformer.transformMyPartialResult
每次呼叫所用的时间不断增加
实现 * 线程安全/原子类型/对象 *,例如一个ConcurrentHashMap
:
public class MyDataTransformer {
private final ConcurrentMap<K, V> myResult = new ConcurrentHashMap<K, V>();
public void transformMyPartialResult(MyPartialResult myPartialResult) {
myPartialResult.myRows.stream()
.map((row) -> this.myResult.merge(row[0], row[1], Integer::sum)));
}
}
型
进入 * 后一次**尝试 *(处理 “到达”):
public void myProcessing(MyRequest myRequest, MyDeferredResult myDeferredResult) {
MyDataTransformer myDataTransformer = new MyDataTransformer();
List<CompletableFuture<myPartialResult>> myPartialResultFutures = new ArrayList<>();
for (MyFilter myFilter : myRequest.getMyFilterList()) {
myPartialResultFutures.add(CompletableFuture.supplyAsync(() -> myRepository.myAsyncQuery(myFilter).thenAccept(myDataTransformer::transformMyPartialResult));
}
myPartialResultFutures.forEach(CompletableFuture::join);
myDeferredResult.setResult(myDataTransformer.getMyResult());
}
型
- 比先等待所有线程快了一个数量级 *,即使有原子性协议开销也是如此。
现在,这一点 * 可能 * 已经很明显了(但并不是最终的选择,因为异步/多线程处理并不总是更好的选择),我很高兴这种方法是一种有效的选择。
剩下的是在我看来像是一个蹩脚的、缺乏灵活性的解决方案--或者至少是一个丑陋的解决方案。是否有更好的方法?
1条答案
按热度按时间bvjxkvbb1#
我们希望在查询结果可用时立即处理它,所以
thenApply
。这里,查询数据库的线程可能会立即进行数据转换,这与前面提到的方法不同,最后查询的partailResult可以合并在一个reduce操作中。字符串