Writing [chained] CompletableFutures to CSV in Java

gkn4icbw posted on 2021-06-30 in Java

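I have a HashMap<String, CompletableFuture<HashMap<String, String>>> that maps items to their attributes, e.g. { "pizza" -> { "calories" -> "120", "fat" -> "12" } }, where the attributes are retrieved from different data sources.
For example, we get the "fat" attribute from the database, while we get the "calories" attribute from Solr.
When I initially retrieve "fat" from the database, I use supplyAsync so as not to block the main thread, e.g.: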

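import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// The question does not give the boolean parameter a name; "flag" is a placeholder.
public CompletableFuture<HashMap<String, String>> getFat(String name, boolean flag) {
    return CompletableFuture.supplyAsync(new Supplier<HashMap<String, String>>() {
        @Override
        public HashMap<String, String> get() {
            HashMap<String, String> attributes = new HashMap<>();

            // ... do work ...

            // Store the value found by the work above under its attribute name.
            attributes.put("fat", fat);
            return attributes;
        }
    });
}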
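For comparison, the same call can be written with a lambda instead of an anonymous Supplier. This is a minimal sketch, assuming a hypothetical lookUpFatFromDatabase helper in place of the elided database work; the boolean parameter is left out because the question doesn't show what it does. Without an Executor argument, supplyAsync runs the supplier on ForkJoinPool.commonPool().

```java
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;

class FatLookup {
    // Hypothetical stand-in for the database query elided as "... do work ..." in the question.
    static String lookUpFatFromDatabase(String name) {
        return "pizza".equals(name) ? "12" : "0";
    }

    // Same shape as getFat above, using a lambda; runs on ForkJoinPool.commonPool().
    static CompletableFuture<HashMap<String, String>> getFat(String name) {
        return CompletableFuture.supplyAsync(() -> {
            HashMap<String, String> attributes = new HashMap<>();
            attributes.put("fat", lookUpFatFromDatabase(name));
            return attributes;
        });
    }
}
```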

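Then I chain it with an asynchronous call to Solr, so that I end up with an asynchronous hashmap mapping items to their attributes, i.e. HashMap<String, CompletableFuture<HashMap<String, String>>> itemsToAttributesMapping; (that is, I loop over the hashmap's keys and update each value with the new attributes using thenApply).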
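Because thenApply (like thenCombine) returns a new CompletableFuture rather than modifying the one it is called on, the chaining step only has an effect if each map entry is overwritten with the returned future. A minimal sketch of that step, assuming a hypothetical asynchronous getCaloriesFromSolr helper. It swaps in thenCombine for the thenApply described above: since the Solr lookup is itself asynchronous, thenCombine merges both results once both futures complete, without blocking inside a callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class ChainSolr {
    // Hypothetical stand-in for the asynchronous Solr lookup described in the question.
    static CompletableFuture<String> getCaloriesFromSolr(String name) {
        return CompletableFuture.supplyAsync(() -> "pizza".equals(name) ? "120" : "0");
    }

    // Overwrites every value with a future that also carries the Solr attribute.
    // thenCombine returns a NEW future, so it must be stored back in the map;
    // otherwise later readers still see the database-only future.
    static void addSolrAttributes(Map<String, CompletableFuture<HashMap<String, String>>> items) {
        items.replaceAll((name, dbFuture) ->
                dbFuture.thenCombine(getCaloriesFromSolr(name), (attributes, calories) -> {
                    attributes.put("calories", calories);
                    return attributes;
                }));
    }
}
```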
Finally, I write the data to a CSV file, and this is where the problem is:

File file = new File(home + "/Downloads/rmsSkuValidationResults.csv");

try {
    FileWriter outputfile = new FileWriter(file);
    CSVWriter writer = new CSVWriter(outputfile);

    for (String itemKey : itemsToAttributesMapping.keySet()) {
        itemsToAttributesMapping.get(itemKey).thenAccept(attributes -> {
            String[] row = {
                attributes.get("calories"),
                attributes.get("fat"),
                // ... more attributes ...
            };
            writer.writeNext(row);
        });
    }

    writer.close();
}
catch (Exception e) {
    e.printStackTrace();
}

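As it stands, writing to the CSV file works fine for roughly 800-1100 items, but after that it stops writing and the program terminates.
I've tried variations of the above, including using get instead of thenAccept, or adding join after thenAccept, which causes the program to hang (the asynchronous computations are fast, so it shouldn't hang).
I also tried storing the futures returned by the thenAccept calls and then calling allOf, but that leads to strange behavior: the attributes from Solr stop showing up after a few hundred items. The attributes from the database do show up for every item, which makes me think that supplyAsync always works, but that the subsequent thenApply, which adds attributes to the original HashMap<String, CompletableFuture<HashMap<String, String>>> itemsToAttributesMapping; supplied by supplyAsync, stops working.
Any insight into what the problem is would be greatly appreciated. Maybe my understanding of CompletableFuture is wrong, especially when it comes to chaining and resolving futures? Could this be a timeout issue, or are threads getting lost? The last approach I described suggests that maybe the problem is thenApply?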

Answer #1 (wnvonmuf)

Here is roughly what the code above does, in order:

get(itemKey1) then at some unspecified time in the future writeNext(attr1)
get(itemKey2) then at some unspecified time in the future writeNext(attr2)
get(itemKey3) then at some unspecified time in the future writeNext(attr3)
get(itemKey4) then at some unspecified time in the future writeNext(attr4)
get(itemKey5) then at some unspecified time in the future writeNext(attr5)
get(itemKey6) then at some unspecified time in the future writeNext(attr6)
get(itemKey7) then at some unspecified time in the future writeNext(attr7)
attr1 finally delivered; writeNext(attr1)
get(itemKey8) then at some unspecified time in the future writeNext(attr8)
attr2 finally delivered; writeNext(attr2)
attr3 finally delivered; writeNext(attr3)
get(itemKey9) then at some unspecified time in the future writeNext(attr9)
no more items; writer.close()
attr4 finally delivered; oops, writer closed
attr5 finally delivered; oops, writer closed
attr6 finally delivered; oops, writer closed
attr7 finally delivered; oops, writer closed
attr8 finally delivered; oops, writer closed
attr9 finally delivered; oops, writer closed
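A minimal sketch of one way to remove that race: wait for every future with CompletableFuture.allOf(...).join() before writing, then write all rows from the calling thread, and only close the writer afterwards. It assumes a PrintWriter with a naive comma join as a stand-in for opencsv's CSVWriter (no quoting or escaping), and it adds the item key as a first column, which the original rows did not have. Writing from a single thread also means rows are never written concurrently from several pool threads. This may also be why the program just ends: the common pool's worker threads are daemon threads, so once main returns the JVM can exit without running the remaining callbacks.

```java
import java.io.PrintWriter;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class CsvExport {
    // Waits until every attribute future has completed, then writes all rows
    // on the calling thread, and closes the writer only after the last row.
    static void writeAll(Map<String, CompletableFuture<HashMap<String, String>>> items,
                         Writer out) {
        // allOf completes once every future has completed; join() blocks until then.
        CompletableFuture.allOf(items.values().toArray(new CompletableFuture[0])).join();

        try (PrintWriter writer = new PrintWriter(out)) {
            for (Map.Entry<String, CompletableFuture<HashMap<String, String>>> e : items.entrySet()) {
                // Already complete at this point, so join() returns immediately.
                HashMap<String, String> attributes = e.getValue().join();
                // Naive comma join standing in for CSVWriter.writeNext (no quoting/escaping).
                writer.println(String.join(",", e.getKey(),
                        attributes.get("calories"), attributes.get("fat")));
            }
        } // writer (and the underlying Writer) is closed here, after all rows
    }
}
```

With opencsv, the loop body would call writer.writeNext(row) instead of println, and writer.close() stays after the loop.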

You said you tried .get() and .join(). That basically makes the program synchronous, but it's a good debugging step. It changes the execution to:

get(itemKey1) then at some unspecified time in the future writeNext(attr1)
attr1 finally delivered; writeNext(attr1)
get(itemKey2) then at some unspecified time in the future writeNext(attr2)
attr2 finally delivered; writeNext(attr2)
get(itemKey3) then at some unspecified time in the future writeNext(attr3)
attr3 finally delivered; writeNext(attr3)
...
...
...
get(itemKey9) then at some unspecified time in the future writeNext(attr9)
attr9 finally delivered; writeNext(attr9)
no more items; writer.close()

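That should work. What does adding output to each stage (both the thenApply you haven't shown and the thenAccept) reveal? Is it really as fast as you say?
Please show more code, especially the chaining part, if you think the problem might be there.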
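One way to add that output is whenComplete, which runs when a stage finishes, receives either the result or the exception, and passes the same outcome on. This is worth checking because an exception thrown inside thenApply prints nothing by itself: it only completes that future exceptionally, and later thenApply/thenAccept stages that depend on it are skipped. A minimal sketch with a hypothetical log helper and a demo chain that can simulate a failing Solr stage (the values are the pizza example from the question):

```java
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class StageLogging {
    // Builds a whenComplete callback that reports which stage finished, on which
    // thread, and whether it completed normally or with an exception.
    static <T> BiConsumer<T, Throwable> log(String stage, String item) {
        return (value, error) -> System.out.println(
                stage + " [" + item + "] on " + Thread.currentThread().getName()
                        + (error == null ? " ok: " + value : " FAILED: " + error));
    }

    // Demo chain: a database stage followed by a "Solr" stage that can be made
    // to throw. Without the logging (or a join), that failure would stay silent.
    static CompletableFuture<HashMap<String, String>> demo(boolean failSolr) {
        return CompletableFuture.supplyAsync(() -> {
                    HashMap<String, String> attributes = new HashMap<>();
                    attributes.put("fat", "12");
                    return attributes;
                })
                .whenComplete(log("database", "pizza"))
                .thenApply(attributes -> {
                    if (failSolr) {
                        throw new IllegalStateException("simulated Solr failure");
                    }
                    attributes.put("calories", "120");
                    return attributes;
                })
                .whenComplete(log("solr", "pizza"));
    }
}
```

In the question's code, the same idea would be appending .whenComplete(StageLogging.log("solr", itemKey)) to the future returned by the chaining step.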
