我尝试使用pyspark在teradata中运行一个查询,我可以使用这个函数拉取整个表,但是当我尝试运行查询时遇到了一个错误。你们能检查并告诉我哪里出错了吗。
def from_rdbms(spark,user, password, driver, jdbc_url, p_type, p_table, p_query, p_partition, p_numpartitions, p_lower=1, p_upper=1, p_fetchsize=1000):
df_ret = None
dbProperties = {
"user": user,
"password": password,
"driver": driver
}
jdbcUrl = jdbc_url
dbProperties["fetchsize"] = str(p_fetchsize)
dbPropertiesExtended = dbProperties
if p_type == "Table":
query = p_table
else:
query = p_query
if p_partition == '':
df_ret = spark.read.jdbc(url=jdbcUrl , table=query , properties=dbProperties)
else:
dbPropertiesExtended["partition"] = str(p_partition)
dbPropertiesExtended["lower"] = str(p_lower)
dbPropertiesExtended["upper"] = str(p_upper)
dbPropertiesExtended["numpartitions"] = str(p_numpartitions)
df_ret = spark.read.jdbc(url=jdbcUrl, table=query , properties=dbPropertiesExtended)
return df_ret
运行功能
query1="select count(*) as c from "+db_name+"."+table_name + " t1"
count_td = from_rdbms(spark,user_name, password,driver="com.teradata.jdbc.TeraDriver" , jdbc_url= source_url, p_type="Query", p_table="", p_query=query1, p_partition="", p_numpartitions="", p_lower=1, p_upper=1, p_fetchsize=1000)
我得到的错误是:
java.sql.SQLException: [Teradata Database] [TeraJDBC 16.20.00.08] [Error 3707] [SQLState 42000] Syntax error, expected something like a name or a Unicode delimited identifier or an 'UDFCALLNAME' keyword or '(' between the 'FROM' keyword and the 'select' keyword.
at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:309)
at com.teradata.jdbc.jdbc_4.statemachine.ReceiveInitSubState.action(ReceiveInitSubState.java:103)
at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:311)
at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:200)
at com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:137)
at com.teradata.jdbc.jdbc_4.statemachine.StatementController.run(StatementController.java:128)
at com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:389)
at com.teradata.jdbc.jdbc_4.TDStatement.prepareRequest(TDStatement.java:576)
at com.teradata.jdbc.jdbc_4.TDPreparedStatement.<init>(TDPreparedStatement.java:131)
at com.teradata.jdbc.jdk6.JDK6_SQL_PreparedStatement.<init>(JDK6_SQL_PreparedStatement.java:30)
at com.teradata.jdbc.jdk6.JDK6_SQL_Connection.constructPreparedStatement(JDK6_SQL_Connection.java:82)
at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1337)
at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1381)
at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1367)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:60)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:114)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:193)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
1条答案
按热度按时间x8diyxa71#
当我把查询写成