如何实现或加入hadoop(滚烫/层叠)

7ajki6be  于 2021-06-03  发布在  Hadoop
关注(0)|答案(5)|浏览(271)

只需将连接字段作为一个reducer键发送,就可以很容易地通过单个键连接数据集。但是用几个键连接至少一个应该相同的记录对我来说不是那么容易。
例如,我有日志,我想按用户参数对它们进行分组,我想按(ipaddress、sessionid、visitorcockies)加入它们
因此,如果log1.ip==log2.ip或log1.session=log2.session或log1.cockie=log2.coockie,则log1应与log2分组。也许有可能创建复合密钥或一些概率方法,如minhash。。。
有可能吗?

xlpyo6sf

xlpyo6sf1#

你能详细描述一下“通过多个键连接记录”吗?
如果您知道工作流中可以连接特定键的点,那么最好的方法可能是定义一个具有多个连接的流,而不是尝试操作一个复杂的数据结构,以便在一个步骤中解析n个键。
下面是一个示例应用程序,它演示了如何在级联中处理不同类型的连接:https://github.com/cascading/copa

5ktev3wc

5ktev3wc2#

对于级联,我最终创建了一个过滤器来检查内部条件的输出是否为真。级联过滤器输出真/假值,可以选择使用。

z6psavjg

z6psavjg3#

在创建了joe所描述的独立连接之后,需要除去重复的连接。如果数据中的两个元组在“or join”中使用的所有字段中相等,则它们是重复的。因此,如果在表示所有相关字段的键上执行自然联接,则会将所有重复项分组在一起。因此,可以用相应元组的一次出现来替换它们。
让我们看一个例子:假设你有元组和字段(a,b,c,d),你感兴趣的字段是a,b,和c。首先分别对a、b和c进行等距联接。对于每一个,您都要将初始元组流与其自身连接起来。用(a0,b0,c0,d0)表示第一个流,用(a1,b1,c1,d1)表示第二个流。结果将是元组(a0、b0、c0、d0、a1、b1、c1、d1)。对于这些元组中的每一个,您都要创建一个元组(a0a1b0b1c0c1,a0,b0,c0,d0,a1,b1,c1,d1),这样所有重复的元组都将在后续的缩减器中分组在一起。对于每个组,只返回其中一个元组。

waxmsbnn

waxmsbnn4#

问题是,mapreduce连接通常是通过为某些字段上匹配的记录提供相同的reduce键来实现的,以便将它们发送到相同的reducer。所以任何能绕过这个的东西都会有点麻烦,但有可能。。。
下面是我的建议:对于每个输入记录,生成三个副本,每个副本都有一个新的“key”字段,前缀是它来自的字段。例如,假设您有以下输入:

(ip=1.2.3.4, session=ABC, cookie=123)
(ip=3.4.5.6, session=DEF, cookie=456)

然后你就会产生

(ip=1.2.3.4, session=ABC, cookie=123, key=ip_1.2.3.4)
(ip=1.2.3.4, session=ABC, cookie=123, key=session_ABC)
(ip=1.2.3.4, session=ABC, cookie=123, key=cookie_123)
(ip=3.4.5.6, session=DEF, cookie=456, key=ip_3.4.5.6)
(ip=3.4.5.6, session=DEF, cookie=456, key=session_DEF)
(ip=3.4.5.6, session=DEF, cookie=456, key=cookie_456)

然后你就可以在这个新领域进行分组了。
我不太熟悉滚烫/层叠(虽然我一直想了解更多),但这肯定符合hadoop中通常的连接方式。

n3h0vuf2

n3h0vuf25#

提示:使用类型别名使您的代码易于阅读
注意0:这个解决方案特别好,因为它总是只有一个Map的作业,即使有更多的键要加入。
注1:假设每个管道都没有重复的键,否则您必须使'key也有一个索引,它来自哪个日志,mapto将是一个flatmapto,而且有点复杂。
注意2:为了简单起见,这将丢弃连接字段,为了保留它们,您需要一个大而难看的元组(ip1、ip2、session1、session2等等)。如果你真的想,我可以写一个例子,让他们。
注3:如果你真的想合并重复的值,你可以在logentry1和logentry2后面各加一个groupby,生成一个logentrylist,然后cat(正如在注解中提到的,这对于连接是不正常的)。这将再创建两个Map作业。

type String2 = (String, String)
type String3 = (String, String, String)

def addKey(log: Pipe): Pipe = log.flatMap[String3, String](('ip, 'session, 'cookie) -> 'key)(
  _.productIterator.toList.zipWithIndex.map {
    case (key: String, index: Int) => index.toString + key
  }
)

(addKey(log1) ++ addKey(log2)).groupBy('key)(_.toList[String]('logEntry -> 'group))
.mapTo[Iterable[String], String2]('group -> ('logEntry1, 'logEntry2))(list => (list.head, list.last))

相关问题