我在Databricks中运行此笔记本时超时。写给Parquet的最后一步大约需要15-18分钟,然后才会发生超时错误。我不确定哪里出了问题。
from pyspark.sql.functions import explode, sequence
# Create hours string
spark.sql(f"select explode(array('00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23')) as hh").createOrReplaceTempView('hours')
# Create minutes string
spark.sql(f"select explode(array('00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59')) as mm").createOrReplaceTempView('minutes')
# Create seconds string
spark.sql(f"select explode(array('00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59')) as ss").createOrReplaceTempView('seconds')
# Create Time string, add hour, minute, second
spark.sql(f"select CAST(CONCAT(hours.hh, ':', minutes.mm, ':', seconds.ss) as string) as Time, explode(sequence(0,23,1)) as Hour from hours cross join minutes cross join seconds ").createOrReplaceTempView('time1')
spark.sql(f"select *, explode(sequence(0,59,1)) as Minute from time1").createOrReplaceTempView('time2')
spark.sql(f"select *, explode(sequence(0,59,1)) as Second from time2").createOrReplaceTempView('time3')
# Add TimeID
spark.sql(f"select row_number() over (order by TIME) as TimeID, * from time3").createOrReplaceTempView('src')
# Add HourDescription
spark.sql(f"select *, CONCAT(CASE date_part('HOUR', Time) WHEN 0 THEN '00' WHEN 1 THEN '01' WHEN 2 THEN '02' WHEN 3 THEN '03' WHEN 4 THEN '04' WHEN 5 THEN '05' WHEN 6 THEN '06' WHEN 7 THEN '07' WHEN 8 THEN '08' WHEN 9 THEN '09' END, ':00') as HourDescription from src").createOrReplaceTempView('src1')
# Add HourBucket
spark.sql(f"select *, CONCAT(HourDescription, ' - ', CONCAT(CASE date_part('HOUR', Time) WHEN 0 THEN '01' WHEN 1 THEN '02' WHEN 2 THEN '03' WHEN 3 THEN '04' WHEN 4 THEN '05' WHEN 5 THEN '06' WHEN 6 THEN '07' WHEN 7 THEN '08' WHEN 8 THEN '09' WHEN 9 THEN '10' WHEN 10 THEN '11' WHEN 11 THEN '12' WHEN 12 THEN '13' WHEN 13 THEN '14' WHEN 14 THEN '15' WHEN 15 THEN '16' WHEN 16 THEN '17' WHEN 17 THEN '18' WHEN 18 THEN '19' WHEN 19 THEN '20' WHEN 20 THEN '21' WHEN 21 THEN '22' WHEN 22 THEN '23' WHEN 23 THEN '00' END, ':00')) as HourBucket from src1").createOrReplaceTempView('src2')
# Add DayPart
spark.sql(f"select *, CASE WHEN (Hour >= 0 AND Hour < 6) THEN 'Night' WHEN (Hour >= 6 AND Hour < 12) THEN 'Morning' WHEN (Hour >= 12 AND Hour < 18) THEN 'Afternoon' ELSE 'Evening' END as DayPart FROM src2").createOrReplaceTempView('src3')
# Add BusinessHour
spark.sql(f"select *, CASE WHEN (Hour >= 8 AND Hour < 18) THEN 'Yes' ELSE 'No' END as BusinessHour FROM src3").createOrReplaceTempView('src_final')
# Write to Parquet
df = sqlContext.sql("select * from src_final");
df.write.parquet("/mnt/xxx/xx/xxx/")
1条答案
按热度按时间kh212irz1#
我想通了。爆炸(顺序)花了很大力气。尤其是那一分钟的那个刚一插手。我按如下方式修复了代码: