PySpark error when trying to modify a column based on a when-otherwise condition

6yoyoihd  posted 2021-05-16  in Spark

I am using PySpark 3.0.1.
I want to modify a column when a condition is met, and otherwise keep the original value.

df.printSchema()

root
 |-- ID: decimal(4,0) (nullable = true)
 |-- Provider: string (nullable = true)
 |-- Principal: float (nullable = false)
 |-- PRINCIPALBALANCE: float (nullable = true)
 |-- STATUS: integer (nullable = true)
 |-- Installment Rate: float (nullable = true)
 |-- Yearly Percentage: float (nullable = true)
 |-- Processing Fee Percentage: double (nullable = true)
 |-- Disb Date: string (nullable = true)
 |-- ZOHOID: integer (nullable = true)
 |-- UPFRONTPROCESSINGFEEBALANCE: float (nullable = true)
 |-- WITHHOLDINGTAXBALANCE: float (nullable = true)
 |-- UPFRONTPROCESSINGFEEPERCENTAGE: float (nullable = true)
 |-- UPFRONTPROCESSINGFEEWHTPERCENTAGE: float (nullable = true)
 |-- PROCESSINGFEEWHTPERCENTAGE: float (nullable = true)
 |-- PROCESSINGFEEVATPERCENTAGE: float (nullable = true)
 |-- BUSINESSSHORTCODE: string (nullable = true)
 |-- EXCTRACTIONDATE: timestamp (nullable = true)
 |-- fake Fee: double (nullable = false)
 |-- fake WHT: string (nullable = true)
 |-- fake Fee_WHT: string (nullable = true)
 |-- Agency Fee CP: string (nullable = true)
 |-- Agency VAT CP: string (nullable = true)
 |-- Agency WHT CP: string (nullable = true)
 |-- Agency Fee_VAT_WHT CP: string (nullable = true)

df.head(1)

[Row(ID=Decimal('16'), Provider='fake', Principal=2000.01, PRINCIPALBALANCE=0.2, STATUS=4, Installment Rate=0.33333333, Yearly Percentage=600.0, Processing Fee Percentage=0.20, Disb Date=None, ZOHOID=3000, UPFRONTPROCESSINGFEEBALANCE=None, WITHHOLDINGTAXBALANCE=None, UPFRONTPROCESSINGFEEPERCENTAGE=None, UPFRONTPROCESSINGFEEWHTPERCENTAGE=None, PROCESSINGFEEWHTPERCENTAGE=None, PROCESSINGFEEVATPERCENTAGE=16.0, BUSINESSSHORTCODE='20005', EXCTRACTIONDATE=datetime.datetime(2020, 11, 25, 5, 7, 58, 6000), fake Fee=1770.7, fake WHT='312.48', fake Fee_WHT='2,083.18', Agency Fee CP='566.62', Agency VAT CP='566.62', Agency WHT CP='186.39', Agency Fee_VAT_WHT CP='5,394.41')]

I have read that I can do this with when and otherwise, but when I run the following code I get an error:

from pyspark.sql.functions import when
df.withColumn('Gross Loan Amount',when(((df['Disb Date'] <='2018-03-19') &(df['ID']!=457))
                                       ,(df['Principal']+df['Agency Fee CP']+df['Agency VAT CP']).otherwise(df['Gross Loan Amount'])))

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-18-abd559e65640> in <module>
      1 from pyspark.sql.functions import when
      2 df.withColumn('Gross Loan Amount',when(((df['Disb Date'] <='2018-03-19') &(df['ID']!=457))
----> 3                                        ,(df['Principal']+df['Agency Fee CP']+df['Agency VAT CP']).otherwise(df['Gross Loan Amount'])))

/usr/local/spark/python/pyspark/sql/dataframe.py in __getitem__(self, item)
   1378         """
   1379         if isinstance(item, basestring):
-> 1380             jc = self._jdf.apply(item)
   1381             return Column(jc)
   1382         elif isinstance(item, Column):

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/usr/local/spark/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Cannot resolve column name "Gross Loan Amount" among (ID, Provider, Principal, PRINCIPALBALANCE, STATUS, Installment Rate, Yearly Percentage, Processing Fee Percentage, Disb Date, ZOHOID, UPFRONTPROCESSINGFEEBALANCE, WITHHOLDINGTAXBALANCE, UPFRONTPROCESSINGFEEPERCENTAGE, UPFRONTPROCESSINGFEEWHTPERCENTAGE, PROCESSINGFEEWHTPERCENTAGE, PROCESSINGFEEVATPERCENTAGE, BUSINESSSHORTCODE, EXCTRACTIONDATE, fake Fee, fake WHT, fake Fee_WHT, Agency Fee CP, Agency VAT CP, Agency WHT CP, Agency Fee_VAT_WHT CP);

I created a dummy DataFrame to test whether the approach actually works:

df_test = spark.createDataFrame(
    [
        (1, 'foo','a'), # create your data here, be consistent in the types.
        (2, 'bar','b'),
        (2, 'fee','c'),
    ],
    ['id', 'txt','letter'] # add your columns label here
)

df_test.show()

+---+---+------+
| id|txt|letter|
+---+---+------+
|  1|foo|     a|
|  2|bar|     b|
|  2|fee|     c|
+---+---+------+

df_test.withColumn('txt',when(df_test['id']==1,'change').otherwise(df_test['txt'])).show()

+---+------+------+
| id|   txt|letter|
+---+------+------+
|  1|change|     a|
|  2|   bar|     b|
|  2|   fee|     c|
+---+------+------+

What am I doing wrong, or what am I not considering?

w8ntj3qf1#

`Gross Loan Amount` is not a column in your DataFrame, so in your first example `otherwise` cannot resolve it.
In your second example, `txt` is an existing column, so `otherwise` can resolve it.
Are you sure you want to modify `Gross Loan Amount`, a column that does not exist?
