我想在pyspark中创建一个泛型函数,将dataframe和datatype作为参数,并过滤不满足条件的列。我对python不是很在行,我被困在一个点上,从那里我无法找到我可以如何做到这一点。我有一个代码的scala表示,它做同样的事情。
//sample data
val df = Seq(("587","mumbai",Some(5000),5.05),("786","chennai",Some(40000),7.055),("432","Gujarat",Some(20000),6.75),("2","Delhi",None,10.0)).toDF("Id","City","Salary","Increase").withColumn("RefID",$"Id")
import org.apache.spark.sql.functions.col
def selectByType(colType: DataType, df: DataFrame) = {
val cols = df.schema.toList
.filter(x => x.dataType == colType)
.map(c => col(c.name))
df.select(cols:_*)
}
val res = selectByType(IntegerType, df)
res是只包含整数列的Dataframe,在本例中是salary列,我们动态删除了所有其他具有不同类型的列。
我不想在pyspark有同样的行为,但我不能做到这一点。
这就是我尝试过的
//sample data
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("middlename",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("id", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("salary", IntegerType(), True), \
StructField("raise",DoubleType(),True) \
])
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000,2.5),
("Michael","Rose","","40288","M",4000,4.7),
("Robert","","Williams","42114","M",4000,8.9),
("Maria","Anne","Jones","39192","F",4000,0.0),
("Jen","Mary","Brown","","F",-1,-1.2)
]
df = spark.createDataFrame(data=data2,schema=schema)
//getting the column list from schema of the dataframe
pschema = df.schema.fields
datatypes = [IntegerType,DoubleType] //column datatype that I want.
out = filter(lambda x: x.dataType.isin(datatypes), pschema) //gives invalid syntax error.
有人能帮我解决我做错了什么事吗。scala代码只传递一个数据类型,但根据我的用例,我想处理这样一个场景:我们可以传递多个数据类型,然后我们得到具有指定数据类型的所需列的Dataframe。
一开始,如果有人能告诉我如何使它在单个数据类型上工作,那么我可以尝试一下,看看我是否能在多个数据类型上工作。
注意:scala和pyspark的样本数据是不同的,因为我从某处复制pyspark样本数据只是为了加快操作,因为我只关心最终的输出需求。
暂无答案!
目前还没有任何答案,快来回答吧!