使用 sparklyr
在hadoop集群(不是vm)上的包中,我正在处理几种类型的表,这些表需要连接、过滤等。。。我想知道什么是使用 dplyr
命令以及中的数据管理功能 sparklyr
若要运行处理,请将其存储在缓存中,并使用中间数据对象生成保留在缓存中的下游对象。这个问题是表面的,因为它上面提出的,但我希望得到更多的信息比纯粹的效率,所以如果你想编辑我的问题,我同意。。。
我在 hive 里有几张table,我们叫他们吧 Activity2016
, Accounts2016
,和 Accounts2017
. “accounts”表还包括地址历史记录。我想从2016年的数据开始,合并姓名和当前地址的两个表,过滤一些活动和帐户详细信息,然后合并2017年帐户信息的两种不同方式,特别是统计留在自己地址的人数与更改地址的人数。我们有数百万行,所以我们用我们的星火团做这个活动。
首先,我要做的是:
sc <- spark_connect()
Activity2016 %>% filter(COL1 < Cut1 & COL1 > Cut2) %>%
select(NAME,ADDRESS1) %>%
inner_join(Accounts2016,c("NAME"="NAME","ADDRESS1"="ADDRESS1")) %>%
distinct(NAME,ADDRESS1) %>% sdf_register("JOIN2016")
tbl_cache(sc,"JOIN2016")
JOINED_2016 <- tbl(sc, "JOIN2016")
Acct2017 = tbl(sc, "HiveDB.Accounts2017")
# Now, I can run:
JOINED_2016 %>% inner_join(Acct2017,c("NAME"="NAME","ADDRESS1"="ADDRESS2")) %>%
distinct(NAME,ADDRESS1.x) %>% sdf_register("JOIN2017")
# Rinse & Repeat
tbl_cache(sc,"JOIN2017")
JOINED_2017 <- tbl(sc,"JOIN2017")
然后我继续与 JOINED_2016
以及 JOINED_2017
,使用 dplyr
动词等。。。
这里似乎存在多重低效。。。比如,1)我不应该直接把它发送到缓存,并把它作为变量调用吗?2) 难道我不能直接把它送到一个书面的Hive表吗?3) 如何将最终对象强制转换为运行basic R
命令,如 table(JOINED_2016$COL1)
或者这些是不可用的(我在尝试时会出错) %>% select(COL1) %>% table
)?
如果下游有错误,我不写,数据就会丢失。。。但我觉得有太多的选择如何写数据,我不清楚。它什么时候会变成缓存对象,而不是 RDD
,相对于一个配置单元内部/外部表,相对于一个sparkDataframe,对于r处理这些数据对象的能力,每个都有什么限制?
例如,如果我只是运行:
JOIN2016 <- Activity2016 %>% filter(COL1 < Cut1 & COL1 > Cut2) %>%
select(NAME,ADDRESS1) %>%
inner_join(Accounts2016,c("NAME"="NAME","ADDRESS1"="ADDRESS1")) %>%
distinct(NAME,ADDRESS1)
这会是一个r吗 data.frame
? (可能会使我的网关节点的ram崩溃。。。所以我不愿意尝试。这是一个企业集群)
所以总结一下:我应该为 tbl
以及 tbl_cache
我需要命令吗?
我应该用吗 dbWriteTable
我能在之后、之前或代替之前直接做吗 sdf_register
... 或者我需要使用 tbl
在我可以写任何东西到Hive之前? sdf_register
几乎毫无意义。
我应该用吗 copy_to
或者 db_copy_to
而不是 dbWriteTable
? 我不想把Hive变成一个垃圾场,所以我要小心我如何写中间数据,然后在我存储数据之后保持一致。
这是哪一个 data.frame
-我必须运行类型才能像内存中的r对象一样处理数据,还是仅限于 dplyr
命令?
抱歉,这个问题太多了,但我觉得在r-bloggers的文章中,这些问题并不是很清楚 sparklyr
教程,也不是关于软件的其他问题。
1条答案
按热度按时间7fyelxc51#
sdf_register
在处理长时间运行的查询时不是很有用。它基本上是一个非物质化视图,这意味着每次调用它时它都会运行底层查询。添加以下内容将把数据作为表写入配置单元。spark_dataframe %>% invoke("write") %>% invoke("saveAsTable", as.character("your_desired_table_name"))
它使用saveAsTable
as表,它将在中创建一个表,并在spark会话结束后保留该表。使用createOrReplaceTempView
在spark会话结束时不持久化数据。