Pyspark:检查一个列是否包含来自另一个列的值

atmip9wb  于 2023-04-29  发布在  Spark
关注(0)|答案(2)|浏览(158)

我有一个这样的数据集:
| 触发剂A|触发剂B|触发剂C|触发器D|
| --------------|--------------|--------------|--------------|
| 一种|零|零|零|
| 零|零|B|零|
| 零|c|零|零|
另一个像这样的数据集
| 集管|问题|
| --------------|--------------|
| 1|一种|
| 二|B|
| 三|x|
| 四|e|
我想创建一个新列“check”,如果问题在触发器A、B、c或D中,则返回1,否则返回0
在这个例子中,这是结果:
| 集管|问题|核对|
| --------------|--------------|--------------|
| 1|一种|1|
| 二|B|1|
| 三|x|0|
| 四|e|0|
谢谢!

6rqinv9w

6rqinv9w1#

如果你提到每一行最多可以有一个触发器值,你可以把第一个 Dataframe 转换成一个列,然后和第二个 Dataframe 连接。
这里有一个例子

trigger_issue_sdf = data1_sdf.select(
    func.coalesce(*data1_sdf.columns).alias("issue"), func.lit(1).alias("check")
).dropDuplicates()

data2_sdf.join(trigger_issue_sdf, ["issue"], "left").fillna(0, subset=["check"]).show()

# +-----+------+-----+
# |issue|header|check|
# +-----+------+-----+
# |    b|     2|    1|
# |    a|     1|    1|
# |    x|     3|    0|
# |    e|     4|    0|
# +-----+------+-----+
wtlkbnrh

wtlkbnrh2#

示例表:

from pyspark.sql import types as T
from pyspark.sql import functions as F

triggers = spark_session.createDataFrame(
    data=[("a", None, None, None), (None, None, "b", None), (None, "c", None, None)],
    schema=T.StructType(
        [
            T.StructField("A", T.StringType()),
            T.StructField("B", T.StringType()),
            T.StructField("C", T.StringType()),
            T.StructField("D", T.StringType()),
        ]
    ),
)
triggers.show()

issues = spark_session.createDataFrame(
    data=[("1", "a"), ("2", "b"), ("3", "x"), ("4", "e")],
    schema=T.StructType(
        [
            T.StructField("header", T.StringType()),
            T.StructField("issue", T.StringType()),
        ]
    ),
)
issues.show()

输出:

+----+----+----+----+
|   A|   B|   C|   D|
+----+----+----+----+
|   a|null|null|null|
|null|null|   b|null|
|null|   c|null|null|
+----+----+----+----+

+------+-----+
|header|issue|
+------+-----+
|     1|    a|
|     2|    b|
|     3|    x|
|     4|    e|
+------+-----+

首先,union从triggers表发出值:

distinct_trigger_issues = (
    (
        triggers.select("A")
        .unionAll(triggers.select("B"))
        .unionAll(triggers.select("C"))
        .unionAll(triggers.select("D"))
        .distinct()
    )
    .withColumnRenamed("A", "issue")
    .withColumn("check", F.lit(1))
)
distinct_trigger_issues.show()

输出:

+-----+-----+
|issue|check|
+-----+-----+
| null|    1|
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+

接下来,左连接issues表和distinct_trigger_issues。在结果表中,当问题在distinct_trigger_issues中时,check列值等于1,否则为null。最后,用0替换null。

issues_checked = issues.join(distinct_trigger_issues, how="left", on=["issue"]).fillna(
    value=0, subset=["check"]
)
issues_checked.show()

输出:

+-----+------+-----+
|issue|header|check|
+-----+------+-----+
|    x|     3|    0|
|    e|     4|    0|
|    b|     2|    1|
|    a|     1|    1|
+-----+------+-----+

相关问题