我 从 RDS 运行 一 个 查询 , 并 使用 Pyspark 将 该 查询 转换 为 DataFrame 。
这 是 我 的 代码
query= "Select * from profit"
profit=pd.read_sql(query, con=db_connection)
StructureSechma=StructType([
StructField("id",IntegerType(), True),
StructField("type",StringType(), False),
StructField("userId",IntegerType(), True),
StructField("amount",FloatType(), False),
StructField("sell",StringType(), False),
StructField("buy",StringType(), False),
StructField("createdAt",DateType(), False),
StructField("updatedAt",DateType(), False)
])
profit_df = spark.createDataFrame(profit,,schema=StructureSechma)
中 的 每 一 个
我 收到 这 期
File "<stdin>", line 1, in <module>
File "/home/ec2-user/anaconda3/lib/python3.6/site-packages/pyspark/sql/session.py", line 748, in createDataFrame
rdd, schema = self._createFromLocal(map(prepare, data), schema)
File "/home/ec2-user/anaconda3/lib/python3.6/site-packages/pyspark/sql/session.py", line 413, in _createFromLocal
data = list(data)
File "/home/ec2-user/anaconda3/lib/python3.6/site-packages/pyspark/sql/session.py", line 730, in prepare
verify_func(obj)
File "/home/ec2-user/anaconda3/lib/python3.6/site-packages/pyspark/sql/types.py", line 1391, in verify
verify_value(obj)
File "/home/ec2-user/anaconda3/lib/python3.6/site-packages/pyspark/sql/types.py", line 1370, in verify_struct
"length of fields (%d)" % (len(obj), len(verifiers))))
ValueError: Length of object (25) does not match with length of fields (8)
格式
对于 如何 解决 此 问题 有 何 建议 ?
谢谢
1条答案
按热度按时间nx7onnlm1#
你 不 需要 Pandas 。
使用 Spark 直接 查询 使用
spark.read.jdbc
的 RDS , 然后 会 自动 从 数据 库 本身 推断 出 您 的 模式 。https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html 的 最 大 值
否则 , 看看 考拉 库 , 它 有
from_pandas
函数https://koalas.readthedocs.io/en/latest/user_guide/pandas_pyspark.html 格式