Inverted index built in Scala exceeds Java heap size

Posted by x6yk4ghg on 2021-06-03 in Hadoop

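This may be a very special case, but after banging my head against it for a while, I'd like some help from the Stack Overflow community.
I am building an inverted index for a large dataset (one day's worth of data from a large system). The index is built as a map-reduce job on Hadoop, written in Scala. Each entry of the inverted index has this structure: `{key:"New", ProductID:[1,2,3,4,5,...]}`. These entries are written to Avro files.
During this process I run into Java heap size problems. I think the cause is that terms like "New" in the example above contain a huge number of ProductIDs. I have a rough idea that the problem may be in this part of my Scala code: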
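As a point of reference, here is a minimal in-memory sketch of the same token-to-product-ID mapping in plain Scala, with no Hadoop or Avro involved. Whitespace splitting stands in for the job's `normalizer.customNormalizer`, and `InvertedIndexSketch` and `build` are hypothetical names used only for illustration.

```scala
object InvertedIndexSketch {
  // Hypothetical helper: maps each whitespace-separated token to the sorted,
  // de-duplicated IDs of the products whose text contains that token.
  def build(docs: Seq[(Long, String)]): Map[String, List[Long]] =
    docs
      .flatMap { case (id, text) =>
        text.trim.split("\\s+").filter(_.nonEmpty).map(word => (word, id)).toSeq
      }
      .groupBy { case (word, _) => word }
      .map { case (word, pairs) => word -> pairs.map(_._2).distinct.sorted.toList }

  def main(args: Array[String]): Unit = {
    val index = build(Seq(1L -> "New phone", 2L -> "new case", 3L -> "New phone case"))
    println(index("New")) // prints List(1, 3)
  }
}
```

In the Hadoop job, each such list is assembled for one key in a single reduce group, so a very common token like "New" produces the longest list.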

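```scala
import scala.collection.JavaConverters._

def toIndexedRecord(ids: List[Long], token: String): IndexRecord = {
  // Avro expects java.lang.Long, so box each scala.Long and wrap the result as a java.util.List
  val javaList = ids.map(l => l: java.lang.Long).asJava
  new IndexRecord(token, javaList)
}
```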
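A sketch of an alternative conversion, assuming `IndexRecord`'s constructor accepts any `java.util.List[java.lang.Long]` (as the original call suggests). It copies the IDs straight into a pre-sized `java.util.ArrayList` instead of first building a second Scala `List` with `map`. `JavaListConversion` and `toJavaLongList` are hypothetical names. This only removes the conversion's intermediate list; all IDs for one token are still held in memory at once.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

object JavaListConversion {
  // Hypothetical helper: copies Scala Longs directly into a pre-sized
  // java.util.ArrayList, skipping the intermediate Scala List from `map`.
  def toJavaLongList(ids: List[Long]): JList[java.lang.Long] = {
    val out = new JArrayList[java.lang.Long](ids.size)
    ids.foreach(id => out.add(java.lang.Long.valueOf(id)))
    out
  }

  def main(args: Array[String]): Unit = {
    val javaIds = toJavaLongList(List(1L, 2L, 3L))
    println(javaIds) // prints [1, 2, 3]
  }
}
```

Under that assumption, the body of `toIndexedRecord` would become `new IndexRecord(token, JavaListConversion.toJavaLongList(ids))`.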

This is how I use the method (it is used in many places, but the code structure and logic are the same everywhere):

```scala
val titles = textPipeDump
  .map(vf => (vf.itemId, normalizer.customNormalizer(vf.title + " " + vf.subTitle).trim))
  .flatMap { case (id, title) =>
    val ss = title.split("\\s+")
    ss.map(word => (word, List(id))) // one singleton ID list per (word, item) pair
  }
  .filter(f => f._2.nonEmpty)
  .group // group by word
  .sum   // concatenate all ID lists for a word (Algebird's list monoid)
  .map { case (token, ids) => toIndexedRecord(ids, token) }
```

`textPipeDump` is a Scalding `MultipleTextLine` field object:

```scala
import com.twitter.scalding.{FixedPathSource, TextLineScheme}

case class MultipleTextLineFiles(p: String*) extends FixedPathSource(p: _*) with TextLineScheme
```

I have a case class that splits a text line and extracts its fields; that is the object `ss`. Here is my stack trace:

```
Exception in thread "IPC Client (47) connection to /127.0.0.1:55977 from job_201306241658_232590" java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.IOUtils.closeStream(IOUtils.java:226)
at org.apache.hadoop.ipc.Client$Connection.close(Client.java:903)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:800)
28079664 [main] ERROR cascading.flow.stream.TrapHandler - caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [WritableSequenceFile(h...][com.twitter.scalding.GroupBuilder$$anonfun$1.apply(GroupBuilder.scala:189)] operator Every failed executing operation: MRMAggregator[decl:'value']
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136)
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:520)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1178)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
at scala.collection.immutable.List.$plus$plus(List.scala:193)
at com.twitter.algebird.ListMonoid.plus(Monoid.scala:86)
at com.twitter.algebird.ListMonoid.plus(Monoid.scala:84)
at com.twitter.scalding.KeyedList$$anonfun$sum$1.apply(TypedPipe.scala:264)
at com.twitter.scalding.MRMAggregator.aggregate(Operations.scala:279)
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)
... 12 more
```
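The innermost frames show where the allocation happens: Scalding's `.sum` calls Algebird's `ListMonoid.plus`, which concatenates with `List.++`, and `List.++` copies its left operand element by element into a `ListBuffer`. This plain-Scala sketch reproduces that pattern, assuming (the trace does not confirm it) that the reducer folds each incoming singleton list into a growing accumulator on the left. Under that assumption every step copies the whole accumulator, so total copying grows quadratically with the number of IDs for a token, whereas a builder appends in amortized constant time. Both versions still hold every ID in memory; the builder only avoids the repeated copies and the garbage they leave behind. `ListConcatCost` and its methods are hypothetical names.

```scala
object ListConcatCost {
  // Mimics repeated ListMonoid.plus(acc, next) = acc ++ next:
  // each step copies all of `acc`, so n IDs cost O(n^2) element copies.
  def sumWithConcat(ids: Seq[Long]): List[Long] =
    ids.foldLeft(List.empty[Long])((acc, id) => acc ++ List(id))

  // Same result, but a ListBuffer-backed builder appends each ID
  // in amortized O(1), so no intermediate lists are copied.
  def sumWithBuilder(ids: Seq[Long]): List[Long] = {
    val builder = List.newBuilder[Long]
    ids.foreach(id => builder += id)
    builder.result()
  }

  def main(args: Array[String]): Unit = {
    val ids = 1L to 5000L
    // Both produce the same list; only the amount of copying differs.
    println(sumWithConcat(ids) == sumWithBuilder(ids)) // prints true
  }
}
```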

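When I run the map-reduce job on a small dataset, I don't get the error. This suggests that as the data volume grows, the number of item/product IDs indexed for words like "new" or "old" becomes large enough to overflow the heap.
So the question is: how can I avoid the Java heap overflow and still complete this task?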
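One commonly used way to keep a single hot key from overwhelming one reduce group is to "salt" the key: group by `(token, bucket)` instead of `token` alone, so the IDs for "New" are split across several smaller groups. This is a swapped-in technique, not something from the post. The sketch uses plain Scala collections rather than Scalding; `SaltedPostings`, `saltedPostings`, and the `numBuckets` parameter are hypothetical, and the bucket is assumed to be the product ID modulo `numBuckets`.

```scala
object SaltedPostings {
  // Hypothetical helper: keys each (token, id) pair by (token, bucket), so one
  // very common token is spread over up to numBuckets smaller groups.
  def saltedPostings(pairs: Seq[(String, Long)], numBuckets: Int): Map[(String, Int), List[Long]] =
    pairs
      .groupBy { case (token, id) => (token, Math.floorMod(id, numBuckets.toLong).toInt) }
      .map { case (key, group) => key -> group.map(_._2).toList }

  def main(args: Array[String]): Unit = {
    val pairs = Seq("new" -> 1L, "new" -> 2L, "new" -> 3L, "new" -> 4L, "old" -> 5L)
    // Print buckets in key order; "new" ends up in two groups of two IDs each.
    saltedPostings(pairs, 2).toSeq.sortBy(_._1).foreach(println)
  }
}
```

The trade-off is that one token then maps to several index records, so either the Avro schema needs a bucket field or readers must merge a token's buckets.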
