Timeout triggered when writing to Parquet

e5nszbig · posted 2022-09-21 in Spark

I'm hitting a timeout when running this notebook in Databricks. The final step, the write to Parquet, runs for about 15-18 minutes before the timeout error occurs. I'm not sure where it's going wrong.

from pyspark.sql.functions import explode, sequence

# Create hours string

spark.sql(f"select explode(array('00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23')) as hh").createOrReplaceTempView('hours')

# Create minutes string

spark.sql(f"select explode(array('00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59')) as mm").createOrReplaceTempView('minutes')

# Create seconds string

spark.sql(f"select explode(array('00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59')) as ss").createOrReplaceTempView('seconds')

# Create Time string, add hour, minute, second

spark.sql(f"select CAST(CONCAT(hours.hh, ':', minutes.mm, ':', seconds.ss) as string) as Time, explode(sequence(0,23,1)) as Hour from hours cross join minutes cross join seconds ").createOrReplaceTempView('time1')
spark.sql(f"select *, explode(sequence(0,59,1)) as Minute from time1").createOrReplaceTempView('time2')
spark.sql(f"select *, explode(sequence(0,59,1)) as Second from time2").createOrReplaceTempView('time3')

# Add TimeID

spark.sql(f"select row_number() over (order by TIME) as TimeID, * from time3").createOrReplaceTempView('src')

# Add HourDescription

spark.sql(f"select *, CONCAT(CASE date_part('HOUR', Time) WHEN 0 THEN '00' WHEN 1 THEN '01' WHEN 2 THEN '02' WHEN 3 THEN '03' WHEN 4 THEN '04' WHEN 5 THEN '05' WHEN 6 THEN '06' WHEN 7 THEN '07' WHEN 8 THEN '08' WHEN 9 THEN '09' END, ':00') as HourDescription from src").createOrReplaceTempView('src1')

# Add HourBucket

spark.sql(f"select *, CONCAT(HourDescription, ' - ', CONCAT(CASE date_part('HOUR', Time) WHEN 0 THEN '01' WHEN 1 THEN '02' WHEN 2 THEN '03' WHEN 3 THEN '04' WHEN 4 THEN '05' WHEN 5 THEN '06' WHEN 6 THEN '07' WHEN 7 THEN '08' WHEN 8 THEN '09' WHEN 9 THEN '10' WHEN 10 THEN '11' WHEN 11 THEN '12' WHEN 12 THEN '13' WHEN 13 THEN '14' WHEN 14 THEN '15' WHEN 15 THEN '16' WHEN 16 THEN '17' WHEN 17 THEN '18' WHEN 18 THEN '19' WHEN 19 THEN '20' WHEN 20 THEN '21' WHEN 21 THEN '22' WHEN 22 THEN '23' WHEN 23 THEN '00' END, ':00')) as HourBucket from src1").createOrReplaceTempView('src2')

# Add DayPart

spark.sql(f"select *, CASE WHEN (Hour >= 0  AND Hour < 6) THEN 'Night' WHEN (Hour >= 6  AND Hour < 12) THEN 'Morning' WHEN (Hour >= 12  AND Hour < 18) THEN 'Afternoon' ELSE 'Evening' END as DayPart FROM src2").createOrReplaceTempView('src3')

# Add BusinessHour

spark.sql(f"select *, CASE WHEN (Hour >= 8  AND Hour < 18) THEN 'Yes' ELSE 'No' END as BusinessHour FROM src3").createOrReplaceTempView('src_final')

# Write to Parquet

df = spark.sql("select * from src_final")
df.write.parquet("/mnt/xxx/xx/xxx/")
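
Since the temp views are all lazy, none of the steps above does any real work until this final write is triggered, which is why the timeout only surfaces at that step. A minimal way to inspect the job before writing (just a sketch, not part of the original notebook):

df.explain()        # prints the physical plan without executing anything

# Counting executes the full plan, so on this query it will itself be slow:
# print(df.count())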

kh212irz · Answer 1

I figured it out. The explode(sequence) calls were doing an enormous amount of work: the cross join already produces one row per second of the day, and each explode multiplied that row count again, so things really bogged down once the minutes explode kicked in. I fixed the code as follows:


# Create Time string, add hour, minute, second

spark.sql(f"select CAST(CONCAT(hours.hh, ':', minutes.mm, ':', seconds.ss) as string) as Time, cast(hours.hh as int) as Hour, cast(minutes.mm as int) as Minute, cast(seconds.ss as int) as Second from hours cross join minutes cross join seconds ").createOrReplaceTempView('time')
