如何使用pyspark jdbc连接器将表删除到teradata?

kx7yvsdv  于 2023-05-21  发布在  Spark
关注(0)|答案(1)|浏览(236)

我可以从teradata数据库中选择,但不能放弃使用pyspark。
我还使用了jaydebeapi在同一个spark会话中删除表,这很有效。希望有人能遇到同样的问题。

drop_sql = """ (DROP TABLE <DB_NAME>.<TABLENAME>) """

conn = spark.read \
.format("jdbc") \
.option("driver","com.teradata.jdbc.TeraDriver") \
.option("url","jdbc:teradata://<IP_ADDRESS>/DATABASE=. <DB_NAME>,TMODE=ANSI,CHARSET=UTF8,TYPE=FASTLOAD,LOGMECH=LDAP") \
.option("query", drop_sql) \
.option("user", user) \
.option("password",password)\
.option("fetchsize",10000).load()

错误:
Py4JJavaError:调用o265.load时出错。:java.sql.SQLException:[Teradata数据库] [TeraJDBC 17.20.00.15] [错误3707] [SQLState 42000]语法错误,应为名称、Unicode分隔的标识符、“UDFCALLNAME”关键字、“SELECT”关键字或“(”,介于“(”和“DROP”关键字之间。

h5qlskok

h5qlskok1#

spark.read提供了更高级的语言。它不是Python的Terradata驱动程序。

  • 传递给spark.read.format('jdbc').option(query, '...')的查询只能包含SELECT语句。
  • 无论你提供什么,在它被发送到底层驱动程序执行之前,都会被spark代码反过来 Package 在外部SELECT中。例如:
spark.read.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("query", "SELECT c1, c2 FROM t1") \
    .option("partitionColumn", "partiion_id") \
    .option("lowerBound", "1") \
    .option("upperBound", "300") \
    .option("numPartitions", "3") \
    .load()

将转换为在底层DB上并行执行的3个查询。请注意真实的的会略有不同,这是出于学术目的而策划的:

SELECT t.* FROM (SELECT c1, c2 FROM t1 WHERE partiion_id BETWEEN 1 AND 100) t
SELECT t.* FROM (SELECT c1, c2 FROM t1 WHERE partiion_id BETWEEN 100 AND 200) t
SELECT t.* FROM (SELECT c1, c2 FROM t1 WHERE partiion_id BETWEEN 100 AND 300) t

因此,在您的情况下,Terradata不高兴,因为Spark正在执行以下内容:
SELECT t.* FROM (DROP TABLE <DB_NAME>.<TABLENAME>) t
你拥有的不是“pyspark jdbc连接器到teradata”。Terradata JDBC驱动程序
要在Terradata上运行Terradata specific SQL,您需要编写使用Terradata特定驱动程序的Python代码。这里有一个例子。

import teradatasql

with teradatasql.connect (host="whomooz", user="guest", password="please") as con:
    with con.cursor () as cur:
        try:
            sRequest = "DROP TABLE <DB_NAME>.<TABLENAME>"
            print (sRequest)
            cur.execute (sRequest)
        except Exception as ex:
            print ("Ignoring", str (ex).split ("\n") [0])

如果你想在Databricks/Spark-cluster上运行这段代码,那么你必须在有问题的集群上执行add the jdbc driver library。例如,作为集群库。然后在该集群上运行上面的代码。
我假设您已经这样做了,因为您得到的错误。

相关问题