HDFS: Copying data from a Spark DataFrame into a Teradata table over JDBC with PySpark

m1m5dgzv  posted on 2022-12-09 in HDFS

I have written PySpark code to copy the contents of an HDFS dataset (converted to a DataFrame) into a Teradata table over JDBC:

```python
# -*- coding: utf-8 -*-
import os
import sys
import array
import logging
import logging.handlers
import smtplib
import pprint as pp
from collections import OrderedDict
from datetime import date, datetime, timedelta
from functools import reduce

import numpy as np
import pandas as pd
# import psycopg2
import teradata
import teradatasql

import dataiku
from dataiku import spark as dkuspark
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, HiveContext, SparkSession, Window, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when
from pyspark.sql.types import *

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Teradata connection details
user = dataiku.get_custom_variables()['DB_TERADATA_BA_USER']
password = dataiku.get_custom_variables()['DB_TERADATA_BA_USER_PWD']
teradata_server = dataiku.get_custom_variables()['Teradata_server']

# Connect to Teradata
tera_con = teradatasql.connect(host=teradata_server, user=user, password=password)
tera_cur = tera_con.cursor()
print("connection to teradata successful")

driver = dataiku.get_custom_variables()['DB_TERADATA_DRIVER']
auditdburl = "jdbc:teradata://"+teradata_server+"/Database=DBName"

# LOGMECH=TD2

# Read recipe inputs
PVS_OP_10052020_1 = dataiku.Dataset("310_PVS_OP_10052020_1")
PVS_OP_10052020_1_df = dkuspark.get_dataframe(sqlContext, PVS_OP_10052020_1)

# Compute recipe outputs from inputs
# TODO: Replace this part by your actual code that computes the output, as a SparkSQL dataframe
bac_NCCT_310_PVS_OP_Test_POC_test1_df = PVS_OP_10052020_1_df  # For this sample code, simply copy input to output

# Write the output DataFrame to the Teradata table over JDBC, appending rows
bac_NCCT_310_PVS_OP_Test_POC_test1_df.write.format("jdbc") \
    .option("driver", driver) \
    .option("url", auditdburl) \
    .option("dbtable", 'BAC_NCCT_310_PVS_OP_Test_POC_test3') \
    .option("user", user) \
    .option("password", password) \
    .option('TYPE', 'FASTEXPORT') \
    .mode('append') \
    .save()
```

When I run the code, the job fails with the following error: "Pyspark code failed: At line 74: <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o96.save." Can anyone help me figure out what I am doing wrong? I am new to PySpark.


deyfvvtc1#

Have you tried adding the property "mapreduce.job.queuename": "long_running" to your Spark configuration? That worked for me.
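A minimal sketch of one way to pass that property, assuming the SparkContext is created inside the recipe itself; the property name and the queue name "long_running" come from the suggestion above, and spark.yarn.queue is added here as the Spark-level equivalent, so both values must match a queue that actually exists on your cluster:

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Sketch only: set the YARN queue before (re)creating the SparkContext.
# The spark.hadoop. prefix forwards a Hadoop property (here mapreduce.job.queuename)
# to the underlying job configuration; "long_running" is just the suggested value.
conf = SparkConf() \
    .set("spark.hadoop.mapreduce.job.queuename", "long_running") \
    .set("spark.yarn.queue", "long_running")

sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
```

In a Dataiku recipe, the same key/value pair can usually be added through the recipe's Spark configuration settings instead of in code, which avoids touching the script at all.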
