I am using PySpark 3.0.1.
I want to modify a column when a condition is met, and otherwise keep the original value.
df.printSchema()
root
|-- ID: decimal(4,0) (nullable = true)
|-- Provider: string (nullable = true)
|-- Principal: float (nullable = false)
|-- PRINCIPALBALANCE: float (nullable = true)
|-- STATUS: integer (nullable = true)
|-- Installment Rate: float (nullable = true)
|-- Yearly Percentage: float (nullable = true)
|-- Processing Fee Percentage: double (nullable = true)
|-- Disb Date: string (nullable = true)
|-- ZOHOID: integer (nullable = true)
|-- UPFRONTPROCESSINGFEEBALANCE: float (nullable = true)
|-- WITHHOLDINGTAXBALANCE: float (nullable = true)
|-- UPFRONTPROCESSINGFEEPERCENTAGE: float (nullable = true)
|-- UPFRONTPROCESSINGFEEWHTPERCENTAGE: float (nullable = true)
|-- PROCESSINGFEEWHTPERCENTAGE: float (nullable = true)
|-- PROCESSINGFEEVATPERCENTAGE: float (nullable = true)
|-- BUSINESSSHORTCODE: string (nullable = true)
|-- EXCTRACTIONDATE: timestamp (nullable = true)
|-- fake Fee: double (nullable = false)
|-- fake WHT: string (nullable = true)
|-- fake Fee_WHT: string (nullable = true)
|-- Agency Fee CP: string (nullable = true)
|-- Agency VAT CP: string (nullable = true)
|-- Agency WHT CP: string (nullable = true)
|-- Agency Fee_VAT_WHT CP: string (nullable = true)
df.head(1)
[Row(ID=Decimal('16'), Provider='fake', Principal=2000.01, PRINCIPALBALANCE=0.2, STATUS=4, Installment Rate=0.33333333, Yearly Percentage=600.0, Processing Fee Percentage=0.20, Disb Date=None, ZOHOID=3000, UPFRONTPROCESSINGFEEBALANCE=None, WITHHOLDINGTAXBALANCE=None, UPFRONTPROCESSINGFEEPERCENTAGE=None, UPFRONTPROCESSINGFEEWHTPERCENTAGE=None, PROCESSINGFEEWHTPERCENTAGE=None, PROCESSINGFEEVATPERCENTAGE=16.0, BUSINESSSHORTCODE='20005', EXCTRACTIONDATE=datetime.datetime(2020, 11, 25, 5, 7, 58, 6000), fake Fee=1770.7, fake WHT='312.48', fake Fee_WHT='2,083.18', Agency Fee CP='566.62', Agency VAT CP='566.62', Agency WHT CP='186.39', Agency Fee_VAT_WHT CP='5,394.41')]
I have read that I can do this with when and otherwise, but when I run it with the following code I get an error:
from pyspark.sql.functions import when
df.withColumn('Gross Loan Amount',when(((df['Disb Date'] <='2018-03-19') &(df['ID']!=457))
,(df['Principal']+df['Agency Fee CP']+df['Agency VAT CP']).otherwise(df['Gross Loan Amount'])))
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-18-abd559e65640> in <module>
1 from pyspark.sql.functions import when
2 df.withColumn('Gross Loan Amount',when(((df['Disb Date'] <='2018-03-19') &(df['ID']!=457))
----> 3 ,(df['Principal']+df['Agency Fee CP']+df['Agency VAT CP']).otherwise(df['Gross Loan Amount'])))
/usr/local/spark/python/pyspark/sql/dataframe.py in __getitem__(self, item)
1378 """
1379 if isinstance(item, basestring):
-> 1380 jc = self._jdf.apply(item)
1381 return Column(jc)
1382 elif isinstance(item, Column):
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
132 # Hide where the exception came from that shows a non-Pythonic
133 # JVM exception message.
--> 134 raise_from(converted)
135 else:
136 raise
/usr/local/spark/python/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Cannot resolve column name "Gross Loan Amount" among (ID, Provider, Principal, PRINCIPALBALANCE, STATUS, Installment Rate, Yearly Percentage, Processing Fee Percentage, Disb Date, ZOHOID, UPFRONTPROCESSINGFEEBALANCE, WITHHOLDINGTAXBALANCE, UPFRONTPROCESSINGFEEPERCENTAGE, UPFRONTPROCESSINGFEEWHTPERCENTAGE, PROCESSINGFEEWHTPERCENTAGE, PROCESSINGFEEVATPERCENTAGE, BUSINESSSHORTCODE, EXCTRACTIONDATE, fake Fee, fake WHT, fake Fee_WHT, Agency Fee CP, Agency VAT CP, Agency WHT CP, Agency Fee_VAT_WHT CP);
I created a dummy DataFrame to test whether it actually works:
df_test = spark.createDataFrame(
[
(1, 'foo','a'), # create your data here, be consistent in the types.
(2, 'bar','b'),
(2, 'fee','c'),
],
['id', 'txt','letter'] # add your columns label here
)
df_test.show()
+---+---+------+
| id|txt|letter|
+---+---+------+
| 1|foo| a|
| 2|bar| b|
| 2|fee| c|
+---+---+------+
df_test.withColumn('txt',when(df_test['id']==1,'change').otherwise(df_test['txt'])).show()
+---+------+------+
| id| txt|letter|
+---+------+------+
| 1|change| a|
| 2| bar| b|
| 2| fee| c|
+---+------+------+
What am I doing wrong, or what am I not considering?
1 Answer
Gross Loan Amount is not a column in your DataFrame, so in the first example otherwise has nothing to resolve. In your second example, txt is an existing column, so otherwise can resolve it. Are you sure you want to modify Gross Loan Amount, a column that does not exist?
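Besides the missing column, the .otherwise() in the first example is chained onto the value expression (df['Principal'] + ...) rather than onto when() itself, which would be the next thing to trip over. A minimal sketch of one way to write it, assuming a literal null is an acceptable default for the brand-new column (that fallback is an assumption, not something stated in the question):

from pyspark.sql.functions import when, lit

# Sketch only: 'Gross Loan Amount' does not exist yet, so the fallback cannot
# reference it. A literal null is used here as a placeholder default (assumption).
# Note that .otherwise() is chained onto when(), not onto the value expression.
df = df.withColumn(
    'Gross Loan Amount',
    when(
        (df['Disb Date'] <= '2018-03-19') & (df['ID'] != 457),
        df['Principal'] + df['Agency Fee CP'] + df['Agency VAT CP']
    ).otherwise(lit(None))
)

If the intent really is to overwrite an existing column, it has to be created first; once it exists, otherwise(df['Gross Loan Amount']) resolves just like otherwise(df_test['txt']) does in the second example.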