Pyspark: reading data from a table and writing it to a file

vawmfj5a asked on 2023-04-12 in Apache

I am running my PySpark code on an HDInsight Spark cluster. I am trying to read data from a Postgres table and write it to a file, as shown below. pgsql_df turns out to be a DataFrameReader rather than a DataFrame, so I cannot write it to a file. Why does spark.read return a DataFrameReader? What am I missing here?

from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import functions as dbpull
from datetime import datetime
from pyspark.sql.types import Row
from pyspark.sql import DataFrame
from pyspark.sql import DataFrameReader
from pyspark.sql import DataFrameWriter
import random
import string
from pyspark.sql.functions import *
import sys
spark = SparkSession.builder.master("local").appName("db pull").getOrCreate()
pgsql_df = spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")

>>> pgsql_df
<pyspark.sql.readwriter.DataFrameReader object at 0x7fb43ce1f890>

pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)

**Error:** 
 Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameReader' object has no attribute 'write'

oxiaedzo1#

Please check the code below. You are missing a call to load() on the DataFrameReader object.

pgsql_df = spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password") \
    .load()  # this is what was missing

pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)

or 

pgsql_df = spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")

(pgsql_df
    .load()  # added here
    .write
    .format("csv").mode("overwrite").options(sep=",", header="true").save(path=output))

0dxa2lsx2#

+1 to the above. The PySpark read syntax should include the following:

df = (spark.read
      .format(...)              # the source format you are reading from
      .option("key", "value")
      .schema(...)              # optional, use when you know the schema
      .load(path))
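
As a concrete illustration (the path and columns below are made up), a CSV read with an explicit schema could look like this:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Hypothetical schema for a two-column CSV file
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
])

df = (spark.read
      .format("csv")
      .option("header", "true")
      .schema(schema)              # avoids an extra pass to infer column types
      .load("/tmp/input.csv"))     # hypothetical input path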

And the write-and-save syntax should include:

(df.write
   .mode("overwrite")           # or "append"
   .partitionBy(col_name)       # optional
   .format("parquet")           # optional; parquet is the default
   .option("key", "value")
   .save(path))
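
Applied to a DataFrame like the one above, a write could look like the sketch below (the partition column and output paths are hypothetical). Note that save() produces a directory of part files; if a single CSV file is required, coalesce(1) forces one output file, at the cost of funneling all data through a single task:

# Partitioned parquet output (hypothetical path and partition column)
(df.write
   .mode("overwrite")
   .partitionBy("name")
   .format("parquet")
   .save("/tmp/output_parquet"))

# Single-file CSV variant
(df.coalesce(1)
   .write
   .mode("overwrite")
   .options(sep=",", header="true")
   .format("csv")
   .save("/tmp/output_csv"))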

You can check this blog post for more details: https://medium.com/@yoloshe302/pyspark-tutorial-read-and-write-data-with-pyspark-7826b95f29f9
