Is there a PySpark function that gives me 2 decimal places on multiple DataFrame columns?

Posted by x7rlezfr on 2023-03-17 in Spark

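I'm new to programming and also new to PySpark and Python (by new I mean I'm a student who is still learning it).
I keep getting an error in my code and I can't figure out why. What I'm trying to do is get output rounded to 2 decimal places. Here is a sample of what I want my output to look like: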

+------+--------+------+------+
|col_ID| f.name |bal   | avg. |
+------+--------+------+------+
|1234  | Henry  |350.45|400.32|
|3456  | Sam    |75.12 | 50.60|
+------+--------+------+------+

But here is my code and the error I get.
My code:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col #import col function for column manipulation
#import pyspark.sql.functions as func
spark=SparkSession.builder.getOrCreate()
    
df = spark.read.csv("/user/cloudera/Default2_Data.csv", header = True, inferSchema = True) \
.withColumn("income",round(df["income"],2)) \
.withColumn("balance",func.col("balance").cast('Float'))
#df.select(col("income").alias("income")),
#col("balance").alias("balance"),
#func.round(df["income"],2).alias("income1")
    
df.show(15)

Output:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o707.withColumn.
: org.apache.spark.sql.AnalysisException: Resolved attribute(s) income#1101 missing from student#1144,income#1146,default#1143,RecordID#1142,balance#1145 in operator !Project [RecordID#1142, default#1143, student#1144, balance#1145, round(income#1101, 2) AS income#1152]. Attribute(s) with the same name appear in the operation: income. Please check if the right attribute(s) are used.;;
!Project [RecordID#1142, default#1143, student#1144, balance#1145, round(income#1101, 2) AS income#1152]
+- Relation[RecordID#1142,default#1143,student#1144,balance#1145,income#1146] csv

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:326)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3406)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:1334)
    at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2252)
    at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2219)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-102-13a967925c21> in <module>
      1 df = spark.read.csv("/user/cloudera/Default2_Data.csv", header = True, inferSchema = True) \
----> 2 .withColumn("income",round(df["income"],2)) \
      3 .withColumn("balance",func.col("balance").cast('Float'))
      4 #df.select(col("income").alias("income")),
      5 #col("balance").alias("balance"),

/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/dataframe.py in withColumn(self, colName, col)
   1987         """
   1988         assert isinstance(col, Column), "col should be Column"
-> 1989         return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
   1990 
   1991     @ignore_unicode_prefix

/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'Resolved attribute(s) income#1101 missing from student#1144,income#1146,default#1143,RecordID#1142,balance#1145 in operator !Project [RecordID#1142, default#1143, student#1144, balance#1145, round(income#1101, 2) AS income#1152]. Attribute(s) with the same name appear in the operation: income. Please check if the right attribute(s) are used.;;\n!Project [RecordID#1142, default#1143, student#1144, balance#1145, round(income#1101, 2) AS income#1152]\n+- Relation[RecordID#1142,default#1143,student#1144,balance#1145,income#1146] csv\n'

owfi6suc #1

Change this line:

withColumn("income",round(df["income"],2))

to:

withColumn("income",round(col('income'),2))

eimct9ow #2

There is a function that does this, KIND OF: it formats a float as a string, and it is called format_number().
Docs: pyspark.sql.functions.format_number

from pyspark.sql.functions import format_number

# ...

sdf.select('first_name',
           'last_name',
           format_number('salary', 2).alias('sal'))\
    .where('salary BETWEEN 2000 AND 3000')\
    .orderBy('salary', ascending=False).show(10)

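Note: if the alias you choose is the same as a column name in the DataFrame, you may get unexpected output.
For example, this works as expected:

format_number('salary', 2).alias('sal')

But this:

format_number('salary', 2).alias('salary')

returns an empty set.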
