我重写了部分代码,改用 Dataset 而不是 RDD,但有些操作的性能显著下降。
例如:
// Keep only the trips for which exportFilter.check returns true, then call cache() on the result.
// NOTE(review): per the surrounding text, `trips` is a Dataset rather than an RDD — confirm.
val filtered = trips.filter(t => exportFilter.check(t)).cache()
这个操作似乎比 RDD 版本慢得多,而且 CPU 大部分时间处于空闲状态。
为什么会这样?在需要访问普通对象时使用 Dataset 是个坏主意吗?
更新:
以下是过滤器检查方法:
/**
 * Decides whether a trip passes the export filter.
 *
 * A `null` trip or a trip that is not completed is always rejected. When the
 * export configuration reports a basic filter, completion is the only
 * criterion. Otherwise the trip must also pass [[checkTripTime]].
 *
 * @param trip the trip to evaluate; may be `null`
 * @return `true` if the trip should be kept, `false` otherwise
 */
override def check(trip: Trip): Boolean =
  if (trip == null || !trip.isCompleted) {
    // Missing or unfinished trips are never accepted.
    false
  } else if (exportConfiguration.isBasicFilter) {
    // No extended filter configured: completion alone decides the outcome.
    // The guard above already saw the trip as completed.
    trip.isCompleted
  } else {
    // The trip is completed here; the remaining condition is its end time.
    checkTripTime(trip)
  }
/**
 * Validates a trip's end time against a lower and an upper bound, recording
 * a metric whenever the trip is rejected.
 *
 * The end time has to pass
 * `tripTimingProcessor.isLaterThanYesterdayMidnightIntervalStarts` (the trip
 * should end today or inside yesterday's midnight interval) and then
 * `tripTimingProcessor.isBeforeMidnightIntervalStarts`. The upper bound is
 * only evaluated when the lower bound holds.
 *
 * @param trip the trip whose end time is checked
 * @return `true` only when both bound checks succeed
 */
def checkTripTime(trip: Trip): Boolean =
  if (!tripTimingProcessor.isLaterThanYesterdayMidnightIntervalStarts(trip.getEndTimeMillis)) {
    // Lower bound violated: record trip-level and point-level skip metrics.
    updateLowBoundMetrics(trip)
    false
  } else {
    // Lower bound holds, so the verdict is the upper-bound check.
    val endsBeforeMidnightInterval =
      tripTimingProcessor.isBeforeMidnightIntervalStarts(trip.getEndTimeMillis)
    if (!endsBeforeMidnightInterval) {
      metricService.inc(
        trip.getStartTimeMillis,
        trip.getProviderId,
        ExportMetricName.TRIPS_EXPORTED_S3_SKIPPED_END_INSIDE_MIDNIGHT_INTERVAL)
    }
    endsBeforeMidnightInterval
  }
/**
 * Records skip metrics for a trip rejected by the lower end-time bound:
 * one increment for the trip itself, followed by one increment per point
 * returned by `trip.getPoints`.
 *
 * @param trip the rejected trip
 */
private def updateLowBoundMetrics(trip: Trip) = {
  // Trip-level counter, keyed by the trip's start time and provider.
  metricService.inc(
    trip.getStartTimeMillis,
    trip.getProviderId,
    ExportMetricName.TRIPS_EXPORTED_S3_SKIPPED_END_BEFORE_YESTERDAY_MIDNIGHT_INTERVAL)

  // Point-level counter, keyed by each point's capture time and provider.
  val remainingPoints = trip.getPoints.iterator()
  while (remainingPoints.hasNext()) {
    val skippedPoint = remainingPoints.next()
    metricService.inc(
      skippedPoint.getCaptureTimeMillis,
      skippedPoint.getProviderId,
      ExportMetricName.POINT_EXPORTED_S3_SKIPPED_END_BEFORE_YESTERDAY_MIDNIGHT_INTERVAL)
  }
}
暂无答案!
目前还没有任何答案,快来回答吧!