我想每个月自动把 Oracle 数据库中一张按月间隔分区(interval partition,间隔为一个月)的表中的分区数据导入数据湖(data lake)。数据写入数据湖之后,我会删除该分区。
下面这段代码用于把 Oracle 表导出为 Parquet 格式并写入数据湖:
from pyspark.sql import SparkSession
from datetime import date
# Copies one Oracle table into a Parquet-backed Spark table.

# --- Oracle connection settings (placeholder values) ---
db_host = "IP"
db_port = "PORT"
db_service = "SERVICE_NAME"
db_user = "oracle_user"
db_pass = "oracle_password"

# --- Source table ---
schema_name = "schema"
table_name = "T_name"

# Disabled: date-stamped directory name for the output.
# today = date.today()
# dir_name = today.strftime("%Y-%m-%d")

spark = (
    SparkSession.builder
    .appName(f"Load {schema_name} {table_name} from Oracle into Parquet and creating Table")
    .getOrCreate()
)

# The JDBC source accepts a parenthesised subquery with an alias as "dbtable";
# this one selects every row and column of the source table.
query = f"(SELECT * FROM {schema_name}.{table_name} c) {table_name}"
jdbc_url = f"jdbc:oracle:thin:@{db_host}:{db_port}/{db_service}"

# NOTE(review): no partitionColumn / lowerBound / upperBound is supplied, so
# the read is presumably not split into parallel range queries -- confirm
# against the Spark JDBC documentation whether numPartitions has the intended
# effect here.
jdbc_options = {
    "url": jdbc_url,
    "dbtable": query,
    "user": db_user,
    "password": db_pass,
    "driver": "oracle.jdbc.OracleDriver",
    "encoding": "UTF-8",
    "fetchSize": 10000,
    "numPartitions": 40,
}

# Read the Oracle table into a DataFrame over JDBC.
df = spark.read.format("jdbc").options(**jdbc_options).load()

# Write the DataFrame as Parquet under /archive and register it as a Spark SQL
# table named after the source table; an existing table is overwritten.
parquet_path = f"/archive/{schema_name}_{table_name}.parquet"
(
    df.write
    .format("parquet")
    .option("path", parquet_path)
    .mode("overwrite")
    .saveAsTable(table_name)
)
我使用的是 CA Workload Automation AE,并为此创建了下面这个作业脚本:
#!/bin/bash
# Runs the SMS_ALERTS_HISTORY archive SQL script through SQL*Plus, prints the
# resulting log, and reports the outcome through the exit status.
#
# Exit status:
#   0 - SQL*Plus returned 0 and the log contains no ORA-/SP2- error marker
#   6 - SQL*Plus returned non-zero, or the log contains an ORA-/SP2- error
#
# ${sysout} is not assigned in this script; presumably it is provided by one
# of the sourced environment files -- confirm.

. /ca/.profile
pgm=$(basename "$0")
source app3

log_file="${sysout}/${pgm}.log"

# NOTE(review): the credentials are passed on the command line, where other
# users on the host may be able to see them in the process list -- consider a
# wallet or reading the password from standard input.
sqlplus /"Pass"@S_name @/ca/autosys_script/archive_sms_alerts_history.sql > "${log_file}" 2>&1
# Keep the SQL*Plus status so failures that leave no ORA- text in the log
# (e.g. the binary cannot be started) are still reported.
sqlplus_rc=$?

echo "************************************************"
echo "log file dir:  ${log_file}"
cat "${log_file}"

# ORA- marks database errors; SP2- marks SQL*Plus errors such as a script
# file that cannot be opened.
if [ "${sqlplus_rc}" -ne 0 ] || grep -Eq "ORA-|SP2-" "${log_file}"; then
  echo "************************************************"
  echo " ERROR 06 : SMS_ALERTS_HISTORY table archive job failed "
  echo " Please call database administrator. "
  echo "************************************************"
  echo " "
  exit 6
else
  echo "*******SMS ALERTS HISTORY ARCHIVE SUCCESSFULLY COMPLETED********"
fi
这是该作业调用的 SQL 脚本(archive_sms_alerts_history.sql):
-- Archives the oldest partition of SMSALERT.SMS_ALERTS_HISTORY into
-- SMSALERT.ARC_SMS_ALERTS_HISTORY and then drops that partition, but only
-- when the table currently has more than three partitions.

-- Make DBMS_OUTPUT messages visible in the SQL*Plus output.
SET SERVEROUTPUT ON
-- Leave SQL*Plus with a failure status as soon as a statement raises an error.
WHENEVER SQLERROR EXIT FAILURE ROLLBACK

DECLARE
    -- Single source of truth for the objects this script works on, so the
    -- dictionary lookups and the dynamic statements cannot drift apart.
    c_owner           CONSTANT VARCHAR2 (128) := 'SMSALERT';
    c_table           CONSTANT VARCHAR2 (128) := 'SMS_ALERTS_HISTORY';
    c_archive_table   CONSTANT VARCHAR2 (128) := 'ARC_SMS_ALERTS_HISTORY';
    -- The oldest partition is archived only while more than this many exist.
    c_keep_partitions CONSTANT PLS_INTEGER := 3;

    -- Anchored to the dictionary column so long partition names always fit.
    v_partname        dba_tab_partitions.partition_name%TYPE;
    v_cnt             PLS_INTEGER;
BEGIN
    SELECT COUNT (*)
      INTO v_cnt
      FROM dba_tab_partitions
     WHERE table_owner = c_owner
       AND table_name = c_table;

    -- TODO (from the original notes): rename the system-generated partition
    -- (SYS_Pnnn) to a dated name such as P202005. Not implemented here.

    IF v_cnt > c_keep_partitions
    THEN
        -- Oldest partition = lowest partition_position of this owner's table.
        SELECT partition_name
          INTO v_partname
          FROM dba_tab_partitions
         WHERE table_owner = c_owner
           AND table_name = c_table
           AND partition_position =
                   (SELECT MIN (partition_position)
                      FROM dba_tab_partitions
                     WHERE table_owner = c_owner
                       AND table_name = c_table);

        -- TODO (from the original notes): verify that v_partname is the
        -- expected month (e.g. 202005) before archiving. Not implemented here.

        -- Copy the partition's rows into the archive table.
        -- NOTE(review): "select *" relies on both tables having the same
        -- columns in the same order -- confirm, or list the columns.
        EXECUTE IMMEDIATE
               'insert /*+ PARALLEL(10) */ into '
            || c_owner || '.' || c_archive_table
            || ' select * from '
            || c_owner || '.' || c_table
            || ' partition('
            || v_partname
            || ')';

        -- Runs only if the insert above succeeded; an error there leaves
        -- the block before the partition is dropped.
        EXECUTE IMMEDIATE
               'alter table '
            || c_owner || '.' || c_table
            || ' drop partition '
            || v_partname
            || ' UPDATE INDEXES';

        -- NOTE(review): presumably the interval is re-applied so existing
        -- interval partitions become range partitions and the oldest one can
        -- be dropped on a later run -- confirm.
        EXECUTE IMMEDIATE
               'ALTER TABLE '
            || c_owner || '.' || c_table
            || ' SET INTERVAL ( NUMTOYMINTERVAL(1, ''MONTH''))';

        -- The DDL statements above already commit implicitly; the explicit
        -- COMMIT is kept from the original script.
        COMMIT;
    ELSE
        -- Reached when the table has three partitions or fewer.
        dbms_output.put_line('PARTITION IS LESS THAN 3 - SKIPPED');
    END IF;
END;
/
EXIT;
我打算创建 4 个 CA 作业:
1. 检查分区数量(cnt)是否大于 3;
2. 找出最旧的分区,并执行 Python 作业;
3. 校验从 Oracle 写入数据湖的数据——这一步正是我需要帮助的地方 <<<<<
4. 删除 Oracle 中最旧的分区。谢谢 ^_^
暂无答案!
目前还没有任何答案,快来回答吧!