scala—使用数据集与使用spark的rdd相比的性能缺点

px9o7tmv  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(249)

我重写了部分代码以使用dataset而不是RDD,但是有些操作的性能显著下降。
例如:

val filtered = trips.filter(t => exportFilter.check(t)).cache()

似乎要慢得多,CPU 大多处于空闲状态(原帖此处附有截图,此处未包含):

为什么会这样?在尝试访问普通对象时使用数据集是个坏主意吗?
更新:
以下是过滤器检查方法:

/**
 * Decides whether a trip should be exported.
 *
 * A trip is rejected outright when it is null or not completed. When only the
 * basic filter is configured, completion is the sole criterion; otherwise the
 * trip's end time must also fall inside the export time window (see
 * [[checkTripTime]], which records skip metrics as a side effect).
 *
 * @param trip the trip to examine (may be null)
 * @return true if the trip passes the configured filter
 */
override def check(trip: Trip): Boolean = {
  // Null or unfinished trips are never exported.
  if (trip == null || !trip.isCompleted) {
    false
  } else if (exportConfiguration.isBasicFilter) {
    // Basic filter: completion is the only criterion, and it holds here.
    // (The original `return trip.isCompleted` was redundant at this point.)
    true
  } else {
    // Extended filter: trip is completed, additionally validate its time window.
    checkTripTime(trip)
  }
}

  /**
   * Checks that the trip's end time lies inside the export window: at or after
   * the start of yesterday's midnight interval (low bound) and before the start
   * of the current midnight interval (high bound).
   *
   * Side effect: trips rejected by either bound are counted via `metricService`
   * (the low-bound case also counts every point of the trip).
   *
   * @param trip the completed trip to validate
   * @return true if the end time satisfies both bounds
   */
  def checkTripTime(trip: Trip): Boolean = {
    // Inclusive low bound: end time must be today or inside yesterday's midnight interval.
    if (!tripTimingProcessor.isLaterThanYesterdayMidnightIntervalStarts(trip.getEndTimeMillis)) {
      updateLowBoundMetrics(trip)
      false
    } else {
      // High bound: end time must not fall inside the current midnight interval.
      val isHighBoundOk = tripTimingProcessor.isBeforeMidnightIntervalStarts(trip.getEndTimeMillis)
      if (!isHighBoundOk) {
        metricService.inc(trip.getStartTimeMillis, trip.getProviderId,
          ExportMetricName.TRIPS_EXPORTED_S3_SKIPPED_END_INSIDE_MIDNIGHT_INTERVAL)
      }
      isHighBoundOk
    }
  }

  /**
   * Records skip metrics for a trip rejected by the low time bound:
   * one increment at trip level, plus one per point of the trip.
   */
  private def updateLowBoundMetrics(trip: Trip) = {
    metricService.inc(trip.getStartTimeMillis, trip.getProviderId,
      ExportMetricName.TRIPS_EXPORTED_S3_SKIPPED_END_BEFORE_YESTERDAY_MIDNIGHT_INTERVAL)
    // getPoints is a Java collection; iterate it via forEach with a Scala lambda.
    trip.getPoints.forEach { point =>
      metricService.inc(point.getCaptureTimeMillis, point.getProviderId,
        ExportMetricName.POINT_EXPORTED_S3_SKIPPED_END_BEFORE_YESTERDAY_MIDNIGHT_INTERVAL)
    }
  }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题