构建不适合内存的流

dz6r00yl  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(592)

这是关于这个问题的后续问题:用于大型列表的spark flatmap函数
总结:我想在java8中编写一个spark flatmap函数,它生成与一组dna序列匹配的所有可能的正则表达式。对于巨大的字符串,这是有问题的,因为regex集合无法放入内存(一个Map器很容易生成千兆字节的数据)。我知道我必须求助于像一个懒惰的序列,我想我必须使用一个 Stream<String> 为了这个。我现在的问题是如何建立这个流。我在这里看了一下:java streams-stream.builder。
如果我的算法开始生成模式,它们可以用 accept(String) 方法,但当我在链接(用字符串生成器函数替换)中尝试代码时,我注意到随机字符串生成函数之前执行过 build() 被称为。我不明白如果这些随机字符串不能放入内存中,它们是如何存储的。
我必须以不同的方式构建流吗?基本上我想拥有我的 context.write(substring) 我的脑子里有 MapReduce Mapper.map 功能。
update1:不能使用range函数,事实上我使用的是一个在后缀树上迭代的结构。
更新2:在要求一个更完整的实现时,我没有用实际的实现替换接口,因为实现非常大,对于理解这个思想来说不是必需的。
更完整的问题草图:
我的算法试图发现dna序列上的模式。该算法采用与同一基因相对应的不同生物的序列。假设我在五月有一个基因a,在水稻和其他一些物种中有相同的基因a,然后我比较它们的上游序列。我要寻找的模式类似于正则表达式,例如tga..ga..ga。为了探索所有可能的模式,我从序列中构建了一个广义后缀树。此树提供有关模式发生的不同序列的信息。为了将树与搜索算法解耦,我实现了某种迭代器结构:treenavigator。它具有以下接口:

interface TreeNavigator {
        public void jumpTo(char c); //go from pattern p to p+c (c can be a dot from a regex or [AC] for example)
        public void backtrack(); //pop the last character
        public List<Position> getMatches();
        public Pattern trail(); //current pattern p
    }

interface SearchSpace {
        //degrees of freedom in regex, min and maxlength,...
    public boolean inSearchSpace(Pattern p); 
    public Alphabet getPatternAlphabet();
}

interface ScoreCalculator {
    //calculate a score, approximately equal to the number of occurrences of the pattern
    public Score calcConservationScore(TreeNavigator t);
}

//Motif algorithm code which is run in the MapReduce Mapper function:
public class DiscoveryAlgorithm {
    private Context context; //MapReduce context object to write to disk
    private Score minScore;

    public void runDiscovery(){
    //depth first traveral of pattern space A, AA, AAA,... AAC, ACA, ACC and so fort
        exploreSubTree(new TreeNavigator());
    }

    //branch and bound for pattern space, if pattern occurs too little, stop searching
    public boolean survivesBnB(Score s){
        return s.compareTo(minScore)>=0;
    }

    public void exploreSubTree(Navigator nav){
        Pattern current = nav.trail();
        Score currentScore = ScoreCalculator.calc(nav);

        if (!survivesBnB(currentScore)}{
           return;
        }

        if (motif in searchspace)
            context.write(pattern);

        //iterate over all possible extensions: A,C,G,T, [AC], [AG],... [ACGT]
        for (Character c in SearchSpace.getPatternAlphabet()){
             nav.jumpTo(c);
             exploreSubTree(nav);
             nav.backtrack();
        }
    }
}

完整mapreduce源@https://github.com/drdwitte/cloudspeller/ 相关研究论文:http://www.ncbi.nlm.nih.gov/pubmed/26254488
更新3:我一直在读关于创建流的方法。从我目前所读到的内容来看,我认为我必须将rundiscovery()重写为一个供应商。然后可以通过streamsupport类将此供应商转换为流。

cgvd09ve

cgvd09ve1#

另一种方法是@lukaseder解决方案,我认为它更有效:

IntStream.range(0, string.length())
    .mapToObj(start -> IntStream.rangeClosed(start+1, string.length())
            .mapToObj(end -> string.substring(start, end)))
    .flatMap(Function.identity())
    .forEach(System.out::println);

已请求按基准更新,如下所示(java 8u45、x64,用于字符串长度10、100、1000):

Benchmark                  (len)  Mode  Cnt      Score     Error  Units
SubstringTest.LukasEder       10  avgt   30      1.947 ±   0.012  us/op
SubstringTest.LukasEder      100  avgt   30    151.660 ±   0.524  us/op
SubstringTest.LukasEder     1000  avgt   30  52405.761 ± 183.921  us/op
SubstringTest.TagirValeev     10  avgt   30      1.712 ±   0.018  us/op
SubstringTest.TagirValeev    100  avgt   30    138.179 ±   5.063  us/op
SubstringTest.TagirValeev   1000  avgt   30  48188.499 ± 107.321  us/op

好吧,@lukaseder解决方案只慢了8-13%,这可能没那么多。

xvw2m8pv

xvw2m8pv2#

下面是对您的需求的一个简单、懒惰的评估:

public static void main(String[] args) {
    String string = "test";

    IntStream.range(0, string.length())
             .boxed()
             .flatMap(start -> IntStream
                 .rangeClosed(start + 1, string.length())
                 .mapToObj(stop -> new AbstractMap.SimpleEntry<>(start, stop))
             )
             .map(e -> string.substring(e.getKey(), e.getValue()))
             .forEach(System.out::println);
}

它产生:

t
te
tes
test
e
es
est
s
st
t

说明:

// Generate "start" values between 0 and the length of your string
IntStream.range(0, string.length())
         .boxed()

// Combine each "start" value with a "stop" value that is between start + 1 and the length
// of your string
         .flatMap(start -> IntStream
             .rangeClosed(start + 1, string.length())
             .mapToObj(stop -> new AbstractMap.SimpleEntry<>(start, stop))
         )

// Convert the "start" / "stop" value tuple to a corresponding substring
         .map(e -> string.substring(e.getKey(), e.getValue()))
         .forEach(System.out::println);

相关问题