How to get the equivalent of the Postgres nth_value function in PySpark / Hive SQL?

amrnrhlw  posted 2021-05-27  in  Spark

I am working through this exercise: https://www.windowfunctions.com/questions/grouping/5
The reference solution there uses the Oracle/Postgres window function nth_value, but that function is not available in the Hive SQL dialect used by PySpark, and I would like to know how to get the same result in PySpark.

Postgres SQL code

All weights from the fourth smallest onward are assigned the fourth smallest weight
The three lightest weights are assigned 99.9

select name, weight, 
coalesce(nth_value(weight, 4) over (order by weight), 99.9) as imagined_weight
from cats 
order by weight
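To see why the first three rows fall back to 99.9: with ORDER BY weight and no explicit frame clause, the window for each row only spans from the start of the partition up to the current row, so nth_value(weight, 4) returns NULL until at least four rows have been seen. A minimal plain-Python sketch of that running-frame logic (weights hard-coded from the data below; tie/peer subtleties of RANGE frames are ignored):

```python
weights = sorted([4.2, 4.5, 5.0, 4.9, 3.8, 5.5, 6.1, 5.4, 5.7, 5.1, 6.1, 4.8])

def nth_value_running(values, n):
    # Emulate nth_value(x, n) OVER (ORDER BY x) with the default frame:
    # for each row, the frame is the rows from the start up to the current
    # row, so the nth value is None until at least n rows are in the frame.
    return [values[n - 1] if i + 1 >= n else None for i in range(len(values))]

# coalesce(..., 99.9): replace the leading Nones with the default
imagined = [v if v is not None else 99.9 for v in nth_value_running(weights, 4)]
print(imagined)
```

The first three entries come out as 99.9 and the rest as the fourth-smallest weight, matching the expected table below.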

Question: how can I get the following result with PySpark?

name    weight  imagined_weight
Tigger  3.8 99.9
Molly   4.2 99.9
Ashes   4.5 99.9
Charlie 4.8 4.8
Smudge  4.9 4.8
Felix   5.0 4.8
Puss    5.1 4.8
Millie  5.4 4.8
Alfie   5.5 4.8
Misty   5.7 4.8
Oscar   6.1 4.8
Smokey  6.1 4.8

Data

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark import SparkConf, SparkContext, SQLContext
spark = pyspark.sql.SparkSession.builder.appName('app').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
sqc = sqlContext

# spark_df = sqlContext.createDataFrame(pandas_df)

df = pd.DataFrame({
    'name': [
        'Molly', 'Ashes', 'Felix', 'Smudge', 'Tigger', 'Alfie', 'Oscar',
        'Millie', 'Misty', 'Puss', 'Smokey', 'Charlie'
    ],
    'breed': [
        'Persian', 'Persian', 'Persian', 'British Shorthair',
        'British Shorthair', 'Siamese', 'Siamese', 'Maine Coon', 'Maine Coon',
        'Maine Coon', 'Maine Coon', 'British Shorthair'
    ],
    'weight': [4.2, 4.5, 5.0, 4.9, 3.8, 5.5, 6.1, 5.4, 5.7, 5.1, 6.1, 4.8],
    'color': [
        'Black', 'Black', 'Tortoiseshell', 'Black', 'Tortoiseshell', 'Brown',
        'Black', 'Tortoiseshell', 'Brown', 'Tortoiseshell', 'Brown', 'Black'
    ],
    'age': [1, 5, 2, 4, 2, 5, 1, 5, 2, 2, 4, 4]
})

schema = StructType([
    StructField('name', StringType(), True),
    StructField('breed', StringType(), True),
    StructField('weight', DoubleType(), True),
    StructField('color', StringType(), True),
    StructField('age', IntegerType(), True),
])

sdf = sqlContext.createDataFrame(df, schema)
sdf.createOrReplaceTempView("cats")

spark.sql('select * from cats limit 2').show()

My attempt so far


# My attempt

q = """
select weight from (
  select name,weight, 
         ROW_NUMBER() over (ORDER BY weight) as row_no
  from cats group by weight,name
  ) res 
where res.row_no = 4
"""
spark.sql(q).show()

Answer 1 (by m528fe3b):

An alternative is row_number() plus a conditional window aggregate:

select
    name,
    weight,
    coalesce(
        max(case when rn = 4 then weight end) over(order by rn),
        99.9
    ) imagined_weight
from (select c.*, row_number() over(order by weight) rn from cats c) c
