我在hbase中有超过60亿的社交媒体数据(包括内容/时间/作者和其他可能的字段),在48台服务器中有4100个区域,我现在需要将这些数据导入elasticsearch。
我很清楚es的bulkapi,用mapreduce在java中使用bulk仍然需要很多天(至少一周左右)。我可以用spark来代替,但我不认为它会有什么帮助。
我想知道有没有其他技巧可以将这些大数据写入elasticsearch?像手动写入es索引文件和使用某种恢复来加载本地文件系统中的文件?
谢谢你的建议。
有关我的群集环境的一些详细信息:
spark 1.3.1独立版(我可以在Yarn上更改为使用spark 1.6.2或1.6.3)
hadoop 2.7.1(hdp 2.4.2.258)
ElasticSearch2.3.3
2条答案
按热度按时间2g32fytz1#
afaik spark是下面两个选项中索引的最佳选项。除此之外,我还将提供以下方法:
划分(输入扫描标准)并征服60亿社交媒体数据:
我建议用不同的搜索条件创建多个spark/mapreduce工作(根据类别或其他内容将60亿社交媒体数据划分为6个部分),并并行触发它们。例如,基于数据捕获时间范围(scan.settimerange(t1,t2))或者使用一些模糊行逻辑(fuzzyrowfilter),肯定会加快速度。
或
流媒体方法的种类:
您还可以考虑在通过spark或mapreduce插入数据时同时为它们创建索引。
例如,在solr的情况下:clouder有nrt hbase lily indexer。。。i、 当hbase表同时基于wal(write-ahead log)条目填充时,它将创建solr索引。检查一下有没有这样的东西,以便ElasticSearch。
即使它不存在的es以及,不必麻烦,而摄取数据它自己使用Spark/mapreduce程序,你可以自己创建。
方案1:
我建议如果你对spark满意的话,spark支持hadoop2.1中es的本地集成是一个很好的解决方案。看见
elasticsearch hadoop提供了elasticsearch和apachespark之间的本机集成,其形式是rdd(弹性分布式数据集)(确切地说是成对rdd),可以从elasticsearch读取数据。rdd提供了两种风格:一种用于scala(使用scala集合以tuple2的形式返回数据),另一种用于java(使用包含java.util集合的tuple2的形式返回数据)。
这里的示例与wards上1.3版本的spark不同
hbase以外的更多样本
选项2:正如你所知,比spark慢一点
将数据写入elasticsearch使用elasticsearch hadoop,map/reduce作业可以将数据写入elasticsearch,使其可以通过索引进行搜索。elasticsearch hadoop支持(所谓的)新旧hadoop API。
bbmckpt72#
我自己找到了一个实用的技巧来提高批量索引的性能。
我可以计算客户端中的哈希路由,并确保每个批量请求包含具有相同路由的所有索引请求。根据路由结果和ip的shard信息,将批量请求直接发送到相应的shard节点。此技巧可以避免批量重路由开销,并减少可能导致esrejectedexception的批量请求线程池占用。
例如,我在不同的机器上有48个节点。假设我向任何节点发送一个包含3000个索引请求的批量请求,这些索引请求将根据路由重新路由到其他节点(通常是所有节点)。客户端线程需要等待整个进程的完成,包括处理本地批量和等待其他节点的批量响应。然而,如果没有重路由阶段,网络成本就没有了(除了转发到副本节点),客户端只需要等待更少的时间。同时,假设我只有1个副本,则大容量线程的总占用量只有2个客户端->主碎片和主碎片->副本碎片)
路由哈希:
shard\u num=3\u hash(\u routing)%num\u primary\u shard
试着看看:
org.elasticsearch.cluster.routing.Murmur3HashFunction
客户机可以通过对catapi的请求获得碎片和索引别名。碎片信息网址:猫碎片
别名Mapurl:cat别名
注意事项:
es可能会更改不同版本中的默认哈希函数,这意味着客户端代码可能不兼容版本。
这个技巧基于散列结果基本平衡的假设。
客户端应该考虑容错性,例如到相应的shard节点的连接超时。