java在hadoop上实现apriori算法

kyvafyod  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(329)

我试图用hadoop实现apriori算法。我已经实现了apriori算法的非分布式版本,但是我对hadoop和mapreduce的不熟悉导致了一些问题。
我想要实现算法的方法分为两个阶段:
1) 在第一阶段,map reduce作业将对原始事务数据集进行操作。此阶段的输出是一个包含所有1项集及其对1的支持的文件。
2) 在第二阶段,我想读入前一阶段的输出,然后构造新的项集。重要的是,我想在mapper中确定在数据集中是否仍然找到任何新的项集。我设想,如果我将原始数据集作为输入发送到Map器,它将对原始文件进行分区,以便每个Map器只扫描部分数据集。然而,候选列表需要根据前一阶段的所有输出构建。这将在循环中迭代固定次数的过程。
我的问题是如何具体地确保我能够访问每个Map器中的完整项集,以及能够访问原始数据集来计算每个阶段中的新支持。
感谢您的任何建议、意见、建议或回答。
编辑:根据反馈,我只想更具体地说明我在这里问什么。

xpcnnkqh

xpcnnkqh1#

在开始之前,我建议您阅读hadoop map reduce教程。
步骤1:将数据文件加载到hdfs。假设您的数据是txt文件,每一组都是一行。

a b c
a c d e
a e f
a f z
...

步骤2:按照MapReduce教程构建自己的apriori类。

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  // Seprate the line into tokens by space
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    // Add the token into a writable set
    ... put the element into a writable set ...
  }
  context.write(word, one);
}

步骤3:运行mapreducejar文件。输出将在hdfs中的一个文件中。你会有这样的感觉:

a b 3 (number of occurrence)
a b c 5
a d 2
...

根据输出文件,可以计算关系。
另一方面,您可能需要考虑使用比map reduce更高级别的抽象,比如级联或apachespark。

ryhaxcpt

ryhaxcpt2#

我使用hadoop流在apachespark和hadoopmapreduce中实现了aes算法。我知道这和apriori不一样,但是你可以尝试使用我的方法。
使用hadoop streming mapreduce实现aes的简单示例。
aes-hadoop流的项目结构
1n\u reducer.py/1n\u combiner是相同的代码,但没有约束。

import sys

CONSTRAINT = 1000

def do_reduce(word, _values):
    return word, sum(_values)

prev_key = None
values = []

for line in sys.stdin:
    key, value = line.split("\t")
    if key != prev_key and prev_key is not None:
        result_key, result_value = do_reduce(prev_key, values)
        if result_value > CONSTRAINT:
            print(result_key + "\t" + str(result_value))
        values = []
    prev_key = key
    values.append(int(value))

if prev_key is not None:
    result_key, result_value = do_reduce(prev_key, values)
    if result_value > CONSTRAINT:
        print(result_key + "\t" + str(result_value))

基本Map器.py:

import sys

def count_usage():
    for line in sys.stdin:
        elements = line.rstrip("\n").rsplit(",")
        for item in elements:
            print("{item}\t{count}".format(item=item, count=1))

if __name__ == "__main__":
    count_usage()

2n\u mapper.py使用上一次迭代的结果。在回答您的问题时,您可以读取上一次迭代的输出,以这样的方式形成项集。

import itertools
import sys

sys.path.append('.')
N_DIM = 2

def get_2n_items():
    items = set()
    with open("part-00000") as inf:
        for line in inf:
            parts = line.split('\t')
            if len(parts) > 1:
                items.add(parts[0])

    return items

def count_usage_of_2n_items():
    all_items_set = get_2n_items()
    for line in sys.stdin:
        items = line.rstrip("\n").rsplit(",")  # 74743 43355 53554
        exist_in_items = set()
        for item in items:
            if item in all_items_set:
                exist_in_items.add(item)
        for combination in itertools.combinations(exist_in_items, N_DIM):
            combination = sorted(combination)
            print("{el1},{el2}\t{count}".format(el1=combination[0], el2=combination[1], count=1))

if __name__ == "__main__":
    count_usage_of_2n_items()

根据我的经验,如果唯一组合(项目集)的数量太大(100k+),apriori算法不适合hadoop。如果您发现了一个使用hadoopmapreduce(流媒体或javamapreduce实现)实现apriori算法的优雅解决方案,请与社区分享。
如果你需要更多的代码片段请要求。

相关问题