我已配置 Apache Flume 接收消息( JSON 类型)输入 HTTP 来源。我的Flume坏了 MongoDB 以及 HBase .如何根据指定的字段将消息写入不同的集合和表?例如:假设我们有 T_1 以及 T_2 . 现在有一个传入的消息应该保存在 T_1 . 如何处理这些消息并将其分配到保存位置?
Apache Flume
JSON
HTTP
MongoDB
HBase
T_1
T_2
eqoofvh91#
您可以使用regex标记消息类型+多路复用将其发送到正确的目的地。示例,基于消息“test”字符串/字段的正则表达式
agent.sources.s1.interceptors.i1.type=regex_extractor agent.sources.s1.interceptors.i1.regex=(TEST1)
将拦截器分配给序列化程序se1
agent.sources.s1.interceptors.i1.serializers=SE1 agent.sources.s1.intercetpros.i1.serializers.SE1.name=Test
发送到所需的通道,通道(c1,c2)可以Map到不同的接收器
agent.sources.s1.selector.type=multiplexing agent.sources.s1.selector.header=Test agent.sources.s1.selector.mapping.Test=c1
测试正则表达式的所有事件都将转到通道c1,其他事件将默认为c2
agent.sources.s1.selector.default=c2
yfjy0ee72#
尝试使用 Multiplexing Channel Selector . 默认值( Replicating Channel Selector 将源生成的flume事件复制到其所有配置的通道。然而,复用器能够根据flume事件内的报头的值将事件放入特定信道。为了根据应用程序逻辑创建这样的头,您需要为httpsource创建一个自定义处理程序。通过实现 HttpSourceHandler api的接口。
Multiplexing Channel Selector
Replicating Channel Selector
HttpSourceHandler
2条答案
按热度按时间eqoofvh91#
您可以使用regex标记消息类型+多路复用将其发送到正确的目的地。示例,基于消息“test”
字符串/字段的正则表达式
将拦截器分配给序列化程序se1
发送到所需的通道,通道(c1,c2)可以Map到不同的接收器
测试正则表达式的所有事件都将转到通道c1,其他事件将默认为c2
yfjy0ee72#
尝试使用
Multiplexing Channel Selector
. 默认值(Replicating Channel Selector
将源生成的flume事件复制到其所有配置的通道。然而,复用器能够根据flume事件内的报头的值将事件放入特定信道。为了根据应用程序逻辑创建这样的头,您需要为httpsource创建一个自定义处理程序。通过实现
HttpSourceHandler
api的接口。