Oracle partitioned table to data lake

kqlmhetl posted on 2021-05-27 in Spark

Every month I want to automatically move one monthly interval partition of an Oracle table into the data lake. After the data has been inserted into the data lake, I will drop the partition.
This code loads the Oracle table into the data lake as Parquet:

from pyspark.sql import SparkSession
from datetime import date

db_host = "IP"
db_port = "PORT"
db_service = "SERVICE_NAME"
schema_name = "schema"
table_name = "T_name"

# today = date.today()

# dir_name = today.strftime("%Y-%m-%d")

db_user = "oracle_user"
db_pass = "oracle_password"

spark = SparkSession.builder \
        .appName("Load " + schema_name + " " + table_name + " from Oracle into Parquet and creating Table") \
        .getOrCreate()

# Read the Oracle table over JDBC into a DataFrame

query = '(SELECT * FROM ' + schema_name + '.' + table_name + ') ' + table_name

df = spark.read \
    .format("jdbc") \
    .option("url","jdbc:oracle:thin:@" + db_host + ":" + db_port + "/" + db_service) \
    .option("dbtable",query) \
    .option("user",db_user) \
    .option("password",db_pass) \
    .option("driver","oracle.jdbc.OracleDriver") \
    .option("encoding","UTF-8") \
    .option("fetchSize", 10000) \
    .option("numPartitions",40) \
    .load()

# Save the DataFrame as Parquet and register it as a Spark SQL table

df.write \
    .format("parquet") \
    .option("path","/archive/" + schema_name + "_" + table_name + ".parquet") \
    .mode("overwrite") \
    .saveAsTable(table_name)
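
One note on the JDBC read above: with the Spark JDBC source, numPartitions only parallelizes the read when partitionColumn, lowerBound and upperBound are also set; as written, the whole table comes through a single connection. A minimal sketch of a partitioned read, where ALERT_ID is a hypothetical numeric column (not from the original post) and the bounds are placeholders you would look up first:

# Partitioned JDBC read sketch. ALERT_ID is a hypothetical numeric key column;
# the bounds only control how the ranges are split, they do not filter rows.
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@" + db_host + ":" + db_port + "/" + db_service) \
    .option("dbtable", query) \
    .option("user", db_user) \
    .option("password", db_pass) \
    .option("driver", "oracle.jdbc.OracleDriver") \
    .option("fetchsize", 10000) \
    .option("partitionColumn", "ALERT_ID") \
    .option("lowerBound", 1) \
    .option("upperBound", 100000000) \
    .option("numPartitions", 40) \
    .load()

The bounds do not have to be exact; rows below lowerBound or above upperBound still land in the first or last partition.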

I am using CA Workload Automation AE, and I created this job:


#!/bin/bash

. /ca/.profile

pgm=`basename $0`

source app3

sqlplus /"Pass"@S_name @/ca/autosys_script/archive_sms_alerts_history.sql > ${sysout}/${pgm}.log 2>&1

echo "************************************************"
echo "log file dir: " ${sysout}/${pgm}.log

cat ${sysout}/${pgm}.log

if grep -q "ORA-" ${sysout}/${pgm}.log; then
  echo "************************************************"
  echo " ERROR 06 : SMS_ALERTS_HISTORY table archive job failed   "
  echo "            Please call database administrator. "
  echo "************************************************"
  echo " "
  exit 6
else
  echo "*******SMS ALERTS HISTORY ARCHIVE SUCCESSFULLY COMPLETED********"
fi

This is the SQL:

DECLARE
    v_partname   VARCHAR2 (50);
    v_cnt        INT;
BEGIN
    SELECT COUNT (*)
      INTO v_cnt
      FROM dba_tab_partitions
     WHERE table_owner = 'SMSALERT' AND table_name = 'SMS_ALERTS_HISTORY';

    -- Rename the partition: the system-generated interval partition SYS_P### will be renamed to P202005 etc.
    IF v_cnt > 3
    THEN

        SELECT partition_name
          INTO v_partname
          FROM dba_tab_partitions
         WHERE     table_owner = 'SMSALERT'
               AND table_name = 'SMS_ALERTS_HISTORY'
               AND partition_position IN
                       (SELECT MIN (partition_position)
                          FROM dba_tab_partitions
                         WHERE     table_owner = 'SMSALERT'
                               AND table_name = 'SMS_ALERTS_HISTORY');

        -- Check the partition name here (e.g. confirm v_partname corresponds to P202005) before archiving

        EXECUTE IMMEDIATE
               'insert /*+ PARALLEL(10) */ into SMSALERT.ARC_SMS_ALERTS_HISTORY select * from SMSALERT.SMS_ALERTS_HISTORY partition('
            || v_partname
            || ')';

        EXECUTE IMMEDIATE
               'alter table SMSALERT.SMS_ALERTS_HISTORY drop partition '
            || v_partname
            || ' UPDATE INDEXES';

        EXECUTE IMMEDIATE 'ALTER TABLE SMSALERT.SMS_ALERTS_HISTORY SET INTERVAL ( NUMTOYMINTERVAL(1, ''MONTH''))';            
        COMMIT;
    ELSE
        dbms_output.put_line('PARTITION IS LESS THAN 3 - SKIPPED');
    END IF;
END;
/

EXIT;

I will create 4 CA jobs:

1. If the partition count > 3, check the partitions
2. Find the oldest partition and run the Python job
3. Verify the data that was inserted from Oracle into the data lake - this is the part I need <<<<< (a sketch follows below)
4. Drop the oldest partition in Oracle

Thanks ^_^
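
For step 3, one common approach is to compare row counts between the Oracle source and the Parquet output before the partition is dropped. A minimal PySpark sketch, reusing the connection variables from the load script above; the count query, comparison logic and exit handling are illustrative, not from the original post. On a table that is still receiving inserts the two counts can drift between the reads, so comparing at the partition level (SELECT COUNT(*) ... PARTITION (P202005)) against a matching filter on the Parquet side may be the safer check in practice:

# Hypothetical verification sketch: compare the Oracle row count with the
# row count of the Parquet file written by the load job above.
count_query = "(SELECT COUNT(*) AS CNT FROM " + schema_name + "." + table_name + ") q"

oracle_cnt = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@" + db_host + ":" + db_port + "/" + db_service) \
    .option("dbtable", count_query) \
    .option("user", db_user) \
    .option("password", db_pass) \
    .option("driver", "oracle.jdbc.OracleDriver") \
    .load() \
    .collect()[0]["CNT"]

parquet_cnt = spark.read.parquet("/archive/" + schema_name + "_" + table_name + ".parquet").count()

if int(oracle_cnt) == parquet_cnt:
    print("OK: " + str(parquet_cnt) + " rows verified in the data lake")
else:
    # A non-zero exit code lets the CA Workload Automation job fail
    # before the partition-drop step runs.
    raise SystemExit("MISMATCH: oracle=" + str(oracle_cnt) + " parquet=" + str(parquet_cnt))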
