我正在尝试使用hortonworks沙盒为pig中的poc实现简单的数据处理流程。
想法如下:有一些已经处理过的数据集。新数据集应添加到旧数据中,且不存在重复项。
出于测试目的,我使用非常小的数据集(小于10kb)。对于虚拟机,我已经分配了4gb的ram和4个处理器内核中的2个。
这是我的Pig剧本:
-- CONFIGURABLE PROPERTIES
%DEFAULT atbInput '/user/hue/ATB_Details/in/1'
%DEFAULT atbOutputBase '/user/hue/ATB_Details/out/1'
%DEFAULT atbPrevOutputBase '/user/hue/ATB_Details/in/empty'
%DEFAULT validData 'valid'
%DEFAULT invalidData 'invalid'
%DEFAULT billDateDimensionName 'tmlBillingDate'
%DEFAULT admissionDateDimensionName 'tmlAdmissionDate'
%DEFAULT dischargeDateDimensionName 'tmlDischargeDate'
%DEFAULT arPostDateDimensionName 'tmlARPostDate'
%DEFAULT patientTypeDimensionName 'dicPatientType'
%DEFAULT patientTypeCodeDimensionName 'dicPatientTypeCode'
REGISTER bdw-all-deps-1.0.jar;
DEFINE toDateDimension com.epam.bigdata.etl.udf.ToDateDimension();
DEFINE toCodeDimension com.epam.bigdata.etl.udf.ToCodeDimension();
DEFINE isValid com.epam.bigdata.etl.udf.atbdetails.IsValidFunc();
DEFINE isGarbage com.epam.bigdata.etl.udf.atbdetails.IsGarbageFunc();
DEFINE toAccounntBalanceCategory com.epam.bigdata.etl.udf.atbdetails.ToBalanceCategoryFunc();
DEFINE isEndOfMonth com.epam.bigdata.etl.udf.IsLastDayOfMonthFunc();
DEFINE toBalanceCategoryId com.epam.bigdata.etl.udf.atbdetails.ToBalanceCategoryIdFunc();
rawData = LOAD '$atbInput';
--CLEANSING
SPLIT rawData INTO garbage IF isGarbage($0),
cleanLines OTHERWISE;
splitRecords = FOREACH cleanLines GENERATE FLATTEN(STRSPLIT($0, '\\|'));
cleanData = FOREACH splitRecords GENERATE
$0 AS Id:LONG,
$1 AS FacilityName:CHARARRAY,
$2 AS SubFacilityName:CHARARRAY,
$3 AS PeriodDate:CHARARRAY,
$4 AS AccountNumber:CHARARRAY,
$5 AS RAC:CHARARRAY,
$6 AS ServiceTypeCode:CHARARRAY,
$7 AS ServiceType:CHARARRAY,
$8 AS AdmissionDate:CHARARRAY,
$9 AS DischargeDate:CHARARRAY,
$10 AS BillDate:CHARARRAY,
$11 AS PatientTypeCode:CHARARRAY,
$12 AS PatientType:CHARARRAY,
$13 AS InOutType:CHARARRAY,
$14 AS FinancialClassCode:CHARARRAY,
$15 AS FinancialClass:CHARARRAY,
$16 AS SystemIPGroupCode:CHARARRAY,
$17 AS SystemIPGroup:CHARARRAY,
$18 AS CurrentInsuranceCode:CHARARRAY,
$19 AS CurrentInsurance:CHARARRAY,
$20 AS InsuranceCode1:CHARARRAY,
$21 AS InsuranceBalance1:DOUBLE,
$22 AS InsuranceCode2:CHARARRAY,
$23 AS InsuranceBalance2:DOUBLE,
$24 AS InsuranceCode3:CHARARRAY,
$25 AS InsuranceBalance3:DOUBLE,
$26 AS InsuranceCode4:CHARARRAY,
$27 AS InsuranceBalance4:DOUBLE,
$28 AS InsuranceCode5:CHARARRAY,
$29 AS InsuranceBalance5:DOUBLE,
$30 AS AgingBucket:CHARARRAY,
$31 AS AccountBalance:DOUBLE,
$32 AS TotalCharges:DOUBLE,
$33 AS TotalPayments:DOUBLE,
$34 AS EstimatedRevenue:DOUBLE,
$35 AS CreateDateTime:CHARARRAY,
$36 AS UniqueFileId:LONG,
$37 AS PatientBalance:LONG,
$38 AS VendorCode:CHARARRAY;
--VALIDATION
SPLIT cleanData INTO validData IF isValid(*),
invalidData OTHERWISE;
--Dimension update--
--MACROS
DEFINE mergeDateDimension(validDataSet, dimensionFieldName, previousDimensionFile) RETURNS merged {
dates = FOREACH $validDataSet GENERATE $dimensionFieldName;
oldDimensions = LOAD '$previousDimensionFile' USING PigStorage('|') AS (
id:LONG,
monthName:CHARARRAY,
monthId:INT,
year:INT,
fiscalYear:INT,
originalDate:CHARARRAY);
oldOriginalDates = FOREACH oldDimensions GENERATE originalDate;
allDates = UNION dates, oldOriginalDates;
uniqueDates = DISTINCT allDates;
$merged = FOREACH uniqueDates GENERATE toDateDimension($0);
};
DEFINE mergeCodeDimension(validDataSet, dimensionFieldName, previousDimensionFile, outputIdField) RETURNS merged {
newCodes = FOREACH $validDataSet GENERATE $dimensionFieldName as newCode;
oldDim = LOAD '$previousDimensionFile' USING PigStorage('|') AS (
id:LONG,
code:CHARARRAY);
allCodes = COGROUP oldDim BY code, newCodes BY newCode;
grouped = FOREACH allCodes GENERATE
(IsEmpty(oldDim) ? 0L : SUM(oldDim.id)) as id,
group AS code;
ranked = RANK grouped BY id DESC, code DESC DENSE;
$merged = FOREACH ranked GENERATE
((id == 0L) ? $0 : id) as $outputIdField,
code AS $dimensionFieldName;
};
--DATE DIMENSIONS
billDateDim = mergeDateDimension(validData, BillDate, '$atbPrevOutputBase/dimensions/$billDateDimensionName');
STORE billDateDim INTO '$atbOutputBase/dimensions/$billDateDimensionName';
admissionDateDim = mergeDateDimension(validData, AdmissionDate, '$atbPrevOutputBase/dimensions/$admissionDateDimensionName');
STORE admissionDateDim INTO '$atbOutputBase/dimensions/$admissionDateDimensionName';
dischDateDim = mergeDateDimension(validData, DischargeDate, '$atbPrevOutputBase/dimensions/$dischargeDateDimensionName');
STORE dischDateDim INTO '$atbOutputBase/dimensions/$dischargeDateDimensionName';
arPostDateDim = mergeDateDimension(validData, PeriodDate, '$atbPrevOutputBase/dimensions/$arPostDateDimensionName');
STORE arPostDateDim INTO '$atbOutputBase/dimensions/$arPostDateDimensionName';
--CODE DIMENSION
patientTypeDim = mergeCodeDimension(validData, PatientType, '$atbPrevOutputBase/dimensions/$patientTypeDimensionName', PatientTypeId);
STORE patientTypeDim INTO '$atbOutputBase/dimensions/$patientTypeDimensionName' USING PigStorage('|');
patientTypeCodeDim = mergeCodeDimension(validData, PatientTypeCode, '$atbPrevOutputBase/dimensions/$patientTypeCodeDimensionName', PatientTypeCodeId);
STORE patientTypeCodeDim INTO '$atbOutputBase/dimensions/$patientTypeCodeDimensionName' USING PigStorage('|');
问题是,当我运行这个脚本时,它永远不会完成(卡住)。在作业浏览器中,我可以看到一个已完成的作业和多个进度为0%的作业。
如果我注解掉了最后三个文件的处理—一切都正常(即三个并行作业成功)。
我尝试了几种方法来解决这个问题:
-无\u multiquery pig参数-允许一次仅使用一个作业完全执行脚本。主要缺点是生成的作业数量太多(26个)和执行时间太长(描述的脚本大约15分钟,更复杂的版本大约40分钟)。
只使用我开发和测试的部分,通过注解掉其他部分-这不是长期观点的选择。
更改mapred-site.xml中的mapred.capacity-scheduler.maximum-system-jobs属性,这样一次应该少于三个作业,如下所述。
更改capacity-scheduler.xml中的mapred.capacity-scheduler.queue.default.maximum-capacity以配置默认队列。但这种方法对我的效果不如以前。
为沙盒虚拟机、Map器和缩减器分配更多内存-没有效果。
所以我的问题是如何限制由pig脚本启动的并发作业的数量?或者可能有其他配置修复程序允许并发执行多个作业?
[更新]
如果我从shell控制台用相同的输入数据运行相同的脚本-一切都正常。所以我认为色调有问题。
[更新]
如果我从控制台启动更复杂的脚本,它也会卡住,但在这种情况下,并行作业的数量是8。
2条答案
按热度按时间csbfibhn1#
上次我们看到这个是因为集群只有一个map任务。
sbdsn5lh2#
您可以使用exec,如下所述:
http://pig.apache.org/docs/r0.11.1/perf.html#implicit-依赖项