Querying Teradata from PySpark

bybem2ql, posted on 2021-05-27 in Hadoop

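I'm trying to run a query against Teradata from PySpark. With the function below I can pull an entire table, but I get an error as soon as I pass it a query instead of a table name. Could you take a look and tell me what I'm doing wrong?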

def from_rdbms(spark,user, password, driver, jdbc_url, p_type, p_table, p_query, p_partition, p_numpartitions, p_lower=1, p_upper=1, p_fetchsize=1000):
    df_ret = None
    dbProperties = {
            "user": user,
            "password": password,
            "driver": driver
            }
    jdbcUrl = jdbc_url
    dbProperties["fetchsize"] = str(p_fetchsize)
    dbPropertiesExtended = dbProperties
    if p_type == "Table":
        query = p_table
    else:
        query =  p_query 
    if p_partition == '':
        df_ret = spark.read.jdbc(url=jdbcUrl , table=query , properties=dbProperties)
    else:
        dbPropertiesExtended["partition"] = str(p_partition)
        dbPropertiesExtended["lower"] = str(p_lower)
        dbPropertiesExtended["upper"] = str(p_upper)
        dbPropertiesExtended["numpartitions"] = str(p_numpartitions)
        df_ret = spark.read.jdbc(url=jdbcUrl, table=query , properties=dbPropertiesExtended)
    return df_ret

Calling the function:

query1="select count(*) as c from "+db_name+"."+table_name + " t1"

count_td = from_rdbms(spark,user_name, password,driver="com.teradata.jdbc.TeraDriver" , jdbc_url= source_url, p_type="Query", p_table="", p_query=query1, p_partition="", p_numpartitions="", p_lower=1, p_upper=1, p_fetchsize=1000)

The error I get is:

java.sql.SQLException: [Teradata Database] [TeraJDBC 16.20.00.08] [Error 3707] [SQLState 42000] Syntax error, expected something like a name or a Unicode delimited identifier or an 'UDFCALLNAME' keyword or '(' between the 'FROM' keyword and the 'select' keyword.
        at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:309)
        at com.teradata.jdbc.jdbc_4.statemachine.ReceiveInitSubState.action(ReceiveInitSubState.java:103)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:311)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:200)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:137)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementController.run(StatementController.java:128)
        at com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:389)
        at com.teradata.jdbc.jdbc_4.TDStatement.prepareRequest(TDStatement.java:576)
        at com.teradata.jdbc.jdbc_4.TDPreparedStatement.<init>(TDPreparedStatement.java:131)
        at com.teradata.jdbc.jdk6.JDK6_SQL_PreparedStatement.<init>(JDK6_SQL_PreparedStatement.java:30)
        at com.teradata.jdbc.jdk6.JDK6_SQL_Connection.constructPreparedStatement(JDK6_SQL_Connection.java:82)
        at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1337)
        at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1381)
        at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1367)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:60)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:114)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
        at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:193)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
x8diyxa7 #1

It worked once I wrote the query as a parenthesized subquery with an alias:

query1="(select count(*) as c from "+db_name+"."+table_name + ") as t1"
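
This is consistent with the error message: Spark places whatever is passed as `table` after `FROM` when it resolves the schema (the stack trace fails in `JDBCRDD.resolveTable`), so a bare `select ...` ends up directly after the `FROM` keyword. Below is a minimal sketch of a helper that does the wrapping before calling `spark.read.jdbc`. The names `as_jdbc_table` and `read_query` are hypothetical, not part of Spark. The sketch also assumes that partitioned reads should go through the `column`, `lowerBound`, `upperBound` and `numPartitions` arguments of `DataFrameReader.jdbc` rather than through the `partition`, `lower`, `upper` and `numpartitions` property keys used in the question.

```python
def as_jdbc_table(query, alias="t1"):
    """Wrap a SELECT statement so it can be used where a table name is expected.

    Hypothetical helper: the alias name is arbitrary, but a derived table
    needs one.
    """
    stripped = query.strip().rstrip(";")
    # Heuristic: text that already starts with "(" is assumed to be a
    # wrapped, aliased subquery and is returned unchanged.
    if stripped.startswith("("):
        return stripped
    return "({}) AS {}".format(stripped, alias)


def read_query(spark, jdbc_url, query, properties,
               partition_column=None, lower=None, upper=None,
               num_partitions=None):
    """Read the result of `query` over JDBC, optionally partitioned."""
    table = as_jdbc_table(query)
    if partition_column is None:
        # Single-partition read.
        return spark.read.jdbc(url=jdbc_url, table=table,
                               properties=properties)
    # Partitioned read: Spark splits [lower, upper] on partition_column
    # into num_partitions ranges.
    return spark.read.jdbc(url=jdbc_url, table=table,
                           column=partition_column,
                           lowerBound=lower, upperBound=upper,
                           numPartitions=num_partitions,
                           properties=properties)
```

With the variables from the question, the call would look something like `read_query(spark, source_url, "select count(*) as c from " + db_name + "." + table_name, {"user": user_name, "password": password, "driver": "com.teradata.jdbc.TeraDriver"})`.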
