给出了一个将csv转换为Parquet地板(从和到s3)的应用程序,只需很少的转换:
for table in tables:
df_table = spark.read.format('csv') \
.option("header", "true") \
.option("escape", "\"") \
.load(path)
df_one_seven_thirty_days = df_table \
.filter(
(df_table['date'] == fn.to_date(fn.lit(one_day))) \
| (df_table['date'] == fn.to_date(fn.lit(seven_days))) \
| (df_table['date'] == fn.to_date(fn.lit(thirty_days)))
)
for i in df_one_seven_thirty_days.schema.names:
df_one_seven_thirty_days = df_one_seven_thirty_days.withColumnRenamed(i, colrename(i).lower())
df_one_seven_thirty_days.createOrReplaceTempView(table)
df_sql = spark.sql("SELECT * FROM "+table)
df_sql.write \
.mode("overwrite").format('parquet') \
.partitionBy("customer_id", "date") \
.option("path", path) \
.saveAsTable(adwords_table)
我在使用spark电子病历时遇到了困难。
在使用spark submit的local上,运行起来没有困难(140mb的数据),而且速度非常快。但在电子病历上,这是另一回事。
第一个“adwords\u表”将被转换,没有问题,但第二个保持空闲。
我浏览了emr提供的spark jobs ui,我注意到一旦完成这个任务:
列出187个路径的叶文件和目录:
spark杀死所有执行者:
20分钟后什么也没发生。所有任务都处于“已完成”状态,没有新任务开始。我在等saveastable启动。
我的本地机器是8核15gb,集群由10个节点r3.4x1组成:32 vcore、122 gib内存、320 ssd gb存储ebsstorage:200 gib
配置正在使用 maximizeResourceAllocation
是的,我只将--num executors/--executor cores改为5
有人知道为什么集群进入“空闲”状态而没有完成任务吗(它最终会崩溃而不会出错(3小时后)
编辑:我删除了所有的胶水目录连接,并将hadoop降级为使用:hadoop-aws:2.7.3
现在saveastable工作得很好,但是一旦它完成,我看到执行器被删除,集群处于空闲状态,这个步骤就没有完成。
所以我的问题还是一样。
2条答案
按热度按时间w8f9ii691#
经过多次尝试和头痛之后,我发现集群仍然在运行/处理。它实际上是在尝试写入数据,但只从主节点写入数据。
令人惊讶的是,这不会显示在ui上,它给人一种无所事事的印象。
无论我做什么(重新分区(1)、更大的集群等),写作都要花上几个小时。
这里的主要问题是saveastable,我不知道它在做什么,这会花费这么长的时间,或者写得这么慢。
因此,我在集群本地使用write.parquet(“hdfs:///tmp\u loc”),然后处理以使用aws
s3-dist-cp
从hdfs到s3文件夹。性能非常出色,我从saveastable(每120mb写17k行需要3到5个小时)变成了3分钟。
由于数据/模式可能会在某个时候发生变化,所以我只是从一个sql请求执行一个glue save。
6ie5vjzr2#
我也面临着同样的问题,这个问题是否与emr5.27的新版本有关?对我来说,这项工作也被一个执行器卡住了很长时间。它完成了所有99%的执行器,这发生在读取文件时。