Slowly changing dimensions: implementing SCD1 and SCD2 in Hive

Posted by vfhzx4xs on 2021-05-29 in Hadoop

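I am looking for an SCD1 and SCD2 implementation in Hive (1.2.1). I know the workaround that was used to load SCD1 and SCD2 tables before Hive 0.14. Here is the link describing that workaround: http://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/
Now that Hive supports ACID operations, I would like to know whether there is a better or more direct way to load these tables.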

4xrmg8kj

4xrmg8kj1#

drop table if exists harsha.emp;

drop table if exists harsha.emp_tmp1;

drop table if exists harsha.emp_tmp2;

drop table if exists harsha.init_load;

show databases;
use harsha;
show tables;

create table harsha.emp (eid int,ename string,sal int,loc string,dept int,start_date timestamp,end_date timestamp,current_status string)
comment "emp scd implementation"
row format delimited
fields terminated by ','
lines terminated by '\n'
;

create table harsha.emp_tmp1 (eid int,ename string,sal int,loc string,dept int,start_date timestamp,end_date timestamp,current_status string)
comment "emp scd implementation"
row format delimited
fields terminated by ','
lines terminated by '\n'
;

create table harsha.emp_tmp2 (eid int,ename string,sal int,loc string,dept int,start_date timestamp,end_date timestamp,current_status string)
comment "emp scd implementation"
row format delimited
fields terminated by ','
lines terminated by '\n'
;

create table harsha.init_load (eid int,ename string,sal int,loc string,dept int) 
row format delimited
fields terminated by ','
lines terminated by '\n'
;

show tables;

insert into table harsha.emp select 101 as eid,'aaaa' as ename,3400 as sal,'chicago' as loc,10 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'current' as current_status from (select '123')x;

insert into table harsha.emp select 102 as eid,'abaa' as ename,6400 as sal,'ny' as loc,10 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'current' as current_status from (select '123')x;

insert into table harsha.emp select 103 as eid,'abca' as ename,2300 as sal,'sfo' as loc,20 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'current' as current_status from (select '123')x;

insert into table harsha.emp select 104 as eid,'afga' as ename,3000 as sal,'seattle' as loc,10 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'current' as current_status from (select '123')x;

insert into table harsha.emp select 105 as eid,'ikaa' as ename,1400 as sal,'LA' as loc,30 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'current' as current_status from (select '123')x;

insert into table harsha.emp select 106 as eid,'cccc' as ename,3499 as sal,'spokane' as loc,20 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'current' as current_status from (select '123')x;

insert into table harsha.emp select 107 as eid,'toiz' as ename,4000 as sal,'WA.DC' as loc,40 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'current' as current_status from (select '123')x;

load data local inpath 'Documents/hadoop_scripts/t3.txt' into table harsha.emp;

load data local inpath 'Documents/hadoop_scripts/t4.txt' into table harsha.init_load;

insert into table harsha.emp_tmp1 select eid,ename,sal,loc,dept,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'current' as current_status 
from harsha.init_load;

insert into table harsha.emp_tmp2
select a.eid,a.ename,a.sal,a.loc,a.dept,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'updated' as current_status from emp_tmp1 a
left outer join emp b on
a.eid=b.eid and 
a.ename=b.ename and
a.sal=b.sal and 
a.loc = b.loc and 
a.dept = b.dept
where b.eid is null
union all
select a.eid,a.ename,a.sal,a.loc,a.dept,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-MM-dd HH:mm:ss')) as end_date,'current' as current_status from emp_tmp1 a
left outer join emp b on
a.eid = b.eid and
a.ename=b.ename and
a.sal=b.sal and 
a.loc=b.loc and 
a.dept=b.dept
where b.eid is not null
union all
select b.eid,b.ename,b.sal,b.loc,b.dept,b.start_date as start_date,from_unixtime(unix_timestamp()) as end_date,'expired' as current_status from emp b
inner join emp_tmp1 a on
a.eid=b.eid  
where
a.ename <> b.ename or
a.sal <> b.sal or 
a.loc <> b.loc or 
a.dept <> b.dept 
;

insert into table harsha.emp select eid,ename,sal,loc,dept,start_date,end_date,current_status from emp_tmp2;

records including expired:

select * from harsha.emp order by eid;

latest records:

select a.* from emp a inner join (select eid ,max(start_date) as start_date from emp where current_status <> 'expired' group by eid) b on a.eid=b.eid and a.start_date=b.start_date;
dy1byipe

dy1byipe2#

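I used a different approach to manage SCD data:
Never update data that already exists in the history file or table.
Make sure new rows are compared against the most recent generation. For example, the load logic adds control columns: loaded_on, checksum and, if needed, a sequence column for the case where several loads happen on the same day. Comparing new data with the most recent generation then uses both the control columns and the key columns present in the data (such as a customer key or product key).
The core of the approach is to compute a checksum over all relevant columns except the control columns, which gives every row a fingerprint. The fingerprint (checksum) column is then used to determine whether any column has changed compared with the most recent generation (the most recent generation is the latest state of the data, identified by key, load time and sequence).
You now know, for each row of the daily update, which of three cases applies: the row is new because it has no previous generation; the row has changed and needs a new row (a new generation) in the history file or table; or the row is identical to the previous generation and nothing needs to be written.
This kind of logic can be built with Apache Spark: in a single statement you can ask Spark to concatenate any number of columns of any datatype and then compute a hash value to use as the fingerprint.
On top of that you can build a Spark-based utility that accepts any data source and outputs a well-organized, clean, slowly-changing-dimension-aware history file, table, and so on. Finally: never update, only append!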
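The fingerprint idea described above can be sketched in a few lines. This is a minimal illustration in plain Python rather than Spark, and everything in it is an assumption made for the example: the function names `fingerprint` and `append_changes`, the choice of SHA-256, and the sample column names. It is not the answerer's actual utility.

```python
import hashlib

# Control columns are excluded from the fingerprint (names follow the answer;
# "seq" stands in for the optional sequence column).
CONTROL_COLS = ("loaded_on", "seq", "checksum")

def fingerprint(row):
    """Hash every non-control column, in sorted column order."""
    parts = [f"{k}={row[k]!r}" for k in sorted(row) if k not in CONTROL_COLS]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()

def append_changes(history, incoming, key, loaded_on, seq=0):
    """Return the rows to append; rows already in history are never modified."""
    # Find the most recent generation per key, ordered by (loaded_on, seq).
    latest = {}
    for row in history:
        prev = latest.get(row[key])
        if prev is None or (row["loaded_on"], row["seq"]) > (prev["loaded_on"], prev["seq"]):
            latest[row[key]] = row
    new_rows = []
    for row in incoming:
        checksum = fingerprint(row)
        prev = latest.get(row[key])
        # New key, or fingerprint differs from the latest generation: append.
        if prev is None or prev["checksum"] != checksum:
            new_rows.append({**row, "loaded_on": loaded_on, "seq": seq, "checksum": checksum})
    return new_rows

history = []
history += append_changes(history, [{"customer_key": 1, "city": "ny"}], "customer_key", "2021-01-01")
history += append_changes(history, [{"customer_key": 1, "city": "ny"},
                                    {"customer_key": 2, "city": "la"}], "customer_key", "2021-01-02")
history += append_changes(history, [{"customer_key": 1, "city": "sfo"},
                                    {"customer_key": 2, "city": "la"}], "customer_key", "2021-01-03")
print(len(history))  # 3: key 1 twice (ny, then sfo) and key 2 once
```

Unchanged rows produce no output at all, so the history only grows when a key is new or its fingerprint differs from the latest generation.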

lrl1mhuk

lrl1mhuk3#

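Since HDFS is immutable storage, one can argue that versioning the data and keeping history (SCD2) should be the default behavior when loading dimensions. You can create a view in your Hadoop SQL query engine (Hive, Impala, Drill, etc.) that uses window functions to retrieve the current state / latest values. You can find more about dimensional models on Hadoop on my blog, for example how to handle large dimension and fact tables.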
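As a rough sketch of such a view, the snippet below keeps an append-only history table and exposes the latest row per key through `ROW_NUMBER()`. It runs against an in-memory SQLite database through Python's `sqlite3` module purely so that it is self-contained (it needs an SQLite build with window-function support); the table name `emp_hist`, the view name `emp_current` and the columns are hypothetical, and the SQL may need small adjustments for a given Hive, Impala or Drill version.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Append-only history: one row per version of each employee (hypothetical schema).
CREATE TABLE emp_hist (eid INTEGER, loc TEXT, loaded_on TEXT);
INSERT INTO emp_hist VALUES
  (101, 'chicago', '2021-01-01'),
  (101, 'boston',  '2021-02-01'),
  (102, 'ny',      '2021-01-01');

-- Current state: the most recently loaded row for each key.
CREATE VIEW emp_current AS
SELECT eid, loc, loaded_on
FROM (
  SELECT eid, loc, loaded_on,
         ROW_NUMBER() OVER (PARTITION BY eid ORDER BY loaded_on DESC) AS rn
  FROM emp_hist
) ranked
WHERE rn = 1;
""")

current = conn.execute("SELECT eid, loc FROM emp_current ORDER BY eid").fetchall()
print(current)  # [(101, 'boston'), (102, 'ny')]
```

With this layout the load job only ever appends, and readers who want SCD1-style "latest value" semantics query the view instead of the table.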

qni6mghb

qni6mghb4#

Well, I solved this problem with two temporary tables:

drop table if exists administrator_tmp1;
drop table if exists administrator_tmp2;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

--review_administrator
CREATE TABLE if not exists review_administrator(
    admin_id bigint ,
    admin_name string,
    create_time string,
    email string ,
    password string,
    status_description string,
    token string ,
    expire_time string ,
    granter_user_id bigint ,
    admin_time string ,
    effect_start_date string ,
    effect_end_date string 
)
partitioned by (current_row_indicator string comment 'current, expired')
stored as parquet;

--tmp1 is used for saving origin data
CREATE TABLE if not exists administrator_tmp1(
    admin_id bigint ,
    admin_name string,
    create_time string,
    email string ,
    password string ,
    status_description string ,
    token string ,
    expire_time string ,
    granter_user_id bigint ,
    admin_time string ,
    effect_start_date string ,
    effect_end_date string 
)
partitioned by (current_row_indicator string comment 'current, expired')
stored as parquet;

--tmp2 saving the scd data
CREATE TABLE if not exists administrator_tmp2(
    admin_id bigint ,
    admin_name string,
    create_time string,
    email string ,
    password string ,
    status_description string ,
    token string ,
    expire_time string ,
    granter_user_id bigint ,
    admin_time string ,
    effect_start_date string ,
    effect_end_date string 
)
partitioned by (current_row_indicator string comment 'current, expired')
stored as parquet;

--insert origin data into tmp1
INSERT OVERWRITE TABLE administrator_tmp1 PARTITION(current_row_indicator)
SELECT 
    user_id as admin_id,
    name as admin_name,
    time as create_time,
    email as email,
    password as password,
    status as status_description,
    token as token,
    expire_time as expire_time,
    admin_id as granter_user_id,
    admin_time as admin_time,
    '{{ ds }}' as effect_start_date,
    '9999-12-31' as effect_end_date,
    'current' as current_row_indicator
FROM 
    ks_db_origin.gifshow_administrator_origin
;

--insert scd data into tmp2
--for the data unchanged
INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
SELECT
    t2.admin_id,
    t2.admin_name,
    t2.create_time,
    t2.email,
    t2.password,
    t2.status_description,
    t2.token,
    t2.expire_time,
    t2.granter_user_id,
    t2.admin_time,
    t2.effect_start_date,
    t2.effect_end_date as effect_end_date,
    t2.current_row_indicator
FROM
    administrator_tmp1 t1
INNER JOIN 
    (
        SELECT * FROM review_administrator 
        WHERE current_row_indicator = 'current'
    ) t2
ON 
    t1.admin_id = t2.admin_id
AND t1.admin_name = t2.admin_name
AND t1.create_time = t2.create_time
AND t1.email = t2.email
AND t1.password = t2.password
AND t1.status_description = t2.status_description
AND t1.token = t2.token
AND t1.expire_time = t2.expire_time
AND t1.granter_user_id = t2.granter_user_id
AND t1.admin_time = t2.admin_time
;

--for the data changed , update the effect_end_date
INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
SELECT
    t2.admin_id,
    t2.admin_name,
    t2.create_time,
    t2.email,
    t2.password,
    t2.status_description,
    t2.token,
    t2.expire_time,
    t2.granter_user_id,
    t2.admin_time,
    t2.effect_start_date as effect_start_date,
    '{{ yesterday_ds }}' as effect_end_date,
    'expired' as current_row_indicator
FROM
    administrator_tmp1 t1
INNER JOIN 
    (
        SELECT * FROM review_administrator 
        WHERE current_row_indicator = 'current'
    ) t2
ON 
    t1.admin_id = t2.admin_id
WHERE NOT 
    (
        t1.admin_name = t2.admin_name
    AND t1.create_time = t2.create_time
    AND t1.email = t2.email
    AND t1.password = t2.password
    AND t1.status_description = t2.status_description
    AND t1.token = t2.token
    AND t1.expire_time = t2.expire_time
    AND t1.granter_user_id = t2.granter_user_id
    AND t1.admin_time = t2.admin_time
    )
;

--for the changed data and the new data
INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
SELECT
    t1.admin_id,
    t1.admin_name,
    t1.create_time,
    t1.email,
    t1.password,
    t1.status_description,
    t1.token,
    t1.expire_time,
    t1.granter_user_id,
    t1.admin_time,
    t1.effect_start_date,
    t1.effect_end_date,
    t1.current_row_indicator
FROM
    administrator_tmp1 t1
LEFT OUTER JOIN 
    (
        SELECT * FROM review_administrator 
        WHERE current_row_indicator = 'current'
    ) t2
ON 
    t1.admin_id = t2.admin_id
AND t1.admin_name = t2.admin_name
AND t1.create_time = t2.create_time
AND t1.email = t2.email
AND t1.password = t2.password
AND t1.status_description = t2.status_description
AND t1.token = t2.token
AND t1.expire_time = t2.expire_time
AND t1.granter_user_id = t2.granter_user_id
AND t1.admin_time = t2.admin_time
WHERE t2.admin_id IS NULL
;

--for the data already marked by 'expired'
INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
SELECT
    t1.admin_id,
    t1.admin_name,
    t1.create_time,
    t1.email,
    t1.password,
    t1.status_description,
    t1.token,
    t1.expire_time,
    t1.granter_user_id,
    t1.admin_time,
    t1.effect_start_date,
    t1.effect_end_date,
    t1.current_row_indicator
FROM
    review_administrator t1
WHERE t1.current_row_indicator = 'expired'
;

--populate the dim table
INSERT OVERWRITE TABLE review_administrator PARTITION(current_row_indicator)
SELECT
    t1.admin_id,
    t1.admin_name,
    t1.create_time,
    t1.email,
    t1.password,
    t1.status_description,
    t1.token,
    t1.expire_time,
    t1.granter_user_id,
    t1.admin_time,
    t1.effect_start_date,
    t1.effect_end_date,
    t1.current_row_indicator
FROM
    administrator_tmp2 t1
;

--drop the two temp table
drop table administrator_tmp1;
drop table administrator_tmp2;

-- --example data
-- --2017-01-01
-- insert into table review_administrator PARTITION(current_row_indicator)
-- SELECT '1','a','2016-12-31','a@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-01','9999-12-31','current' 
-- FROM default.sample_07 limit 1;

-- --2017-01-02
-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '1','a','2016-12-31','a01@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-02','9999-12-31','current' 
-- FROM default.sample_07 limit 1;

-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '2','b','2016-12-31','a@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-02','9999-12-31','current' 
-- FROM default.sample_07 limit 1;

-- --2017-01-03
-- --id 1 is changed
-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '1','a','2016-12-31','a03@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-03','9999-12-31','current' 
-- FROM default.sample_07 limit 1;
-- --id 2 is not changed at all
-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '2','b','2016-12-31','a@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-03','9999-12-31','current' 
-- FROM default.sample_07 limit 1;
-- --id 3 is a new record
-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '3','c','2016-12-31','c@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-03','9999-12-31','current' 
-- FROM default.sample_07 limit 1;

-- --now dim table will show you the right SCD.
smdnsysy

smdnsysy5#

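Below is a detailed implementation of slowly changing dimension type 2 in Hive using the exclusive join approach.
It assumes the source sends a complete data file, i.e. old, updated and new records.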

Steps-

1. Load the most recent file data into the STG table.
2. Select all expired records from the HIST table:
   select * from HIST_TAB where exp_dt != '2099-12-31'
3. Select all unchanged records from STG and HIST using an inner join on the current HIST rows and a filter on hist.column = stg.column:
   select hist.* from (select * from HIST_TAB where exp_dt = '2099-12-31') hist inner join STG_TAB stg on hist.key = stg.key where hist.column = stg.column
4. Select all new and updated records from STG_TAB using an exclusive left join with HIST_TAB, and set their effective and expiry dates:
   select stg.*, eff_dt (yyyy-MM-dd), exp_dt (2099-12-31) from STG_TAB stg left join (select * from HIST_TAB where exp_dt = '2099-12-31') hist on hist.key = stg.key where hist.key is null or hist.column != stg.column
5. Select all old records that were updated from the HIST table using an exclusive left join with the STG table, and set their expiry date:
   select hist.*, exp_dt (yyyy-MM-dd) from (select * from HIST_TAB where exp_dt = '2099-12-31') hist left join STG_TAB stg on hist.key = stg.key where stg.key is null or hist.column != stg.column
6. Union all the queries from steps 2-5 and insert overwrite the result into the HIST table.
A more detailed SCD type 2 implementation can be found here:
https://github.com/sahilbhange/slowly-changing-dimension
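To make the row classification in the steps above easier to follow, here is a small in-memory sketch in Python. It is only an illustration under stated assumptions: a single tracked attribute named `column`, unique keys in the staging data, at most one open row per key in the history, and the load date used as the expiry date of superseded rows. The function name `rebuild_history` is hypothetical and does not come from the linked repository.

```python
OPEN_END = "2099-12-31"  # open-ended expiry date used in the answer

def rebuild_history(hist, stg, load_date, key="key", col="column"):
    """Return the new full contents of the history table (steps 2 to 6)."""
    stg_by_key = {r[key]: r for r in stg}
    open_by_key = {r[key]: r for r in hist if r["exp_dt"] == OPEN_END}

    # Step 2: rows that were already expired are carried over untouched.
    out = [r for r in hist if r["exp_dt"] != OPEN_END]

    for k, h in open_by_key.items():
        s = stg_by_key.get(k)
        if s is not None and s[col] == h[col]:
            out.append(h)                            # step 3: unchanged
        else:
            out.append({**h, "exp_dt": load_date})   # step 5: expire old version

    for k, s in stg_by_key.items():
        h = open_by_key.get(k)
        if h is None or h[col] != s[col]:
            # Step 4: new or updated record becomes the open version.
            out.append({key: k, col: s[col], "eff_dt": load_date, "exp_dt": OPEN_END})

    return out  # step 6: this result overwrites the history table

hist = [
    {"key": 1, "column": "a", "eff_dt": "2021-01-01", "exp_dt": OPEN_END},
    {"key": 2, "column": "b", "eff_dt": "2021-01-01", "exp_dt": OPEN_END},
]
stg = [{"key": 1, "column": "a"}, {"key": 2, "column": "b2"}, {"key": 3, "column": "c"}]
new_hist = rebuild_history(hist, stg, "2021-02-01")
print(len(new_hist))  # 4: key 1 unchanged, key 2 expired plus new version, key 3 new
```

Because the result replaces the whole history table, the cost of each load grows with the size of the history, which is the trade-off this approach makes to avoid row-level updates.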
