pyspark: get the string between / within each string

ca1c2owp · published 2023-03-11 in Spark

I have a column whose values need to be split at every occurrence of \. I need to take the word that follows each \ and turn it into a new column.

How can I do this in PySpark (Databricks)? Any help would be appreciated.


z31licg0 1#

Based on your input, I believe this is your DataFrame:

+------+------+------+---------------------+
|FieldA|FieldB|FieldC|Values               |
+------+------+------+---------------------+
|1     |a     |hello |\abc\def\ghi\jk-l\mno|
|2     |b     |you   |\I\like\to\Code      |
|3     |b     |there |\Th-at\works         |
+------+------+------+---------------------+
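If you want to run the steps below end to end, here is a minimal sketch that recreates this sample DataFrame (values taken from the table above; on Databricks the spark session is already defined):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # already available on Databricks

data = [(1, 'a', 'hello', r'\abc\def\ghi\jk-l\mno'),
        (2, 'b', 'you', r'\I\like\to\Code'),
        (3, 'b', 'there', r'\Th-at\works')]
df = spark.createDataFrame(data, ['FieldA', 'FieldB', 'FieldC', 'Values'])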

1. Replace all the unwanted characters in the Values column with |

from pyspark.sql.functions import regexp_replace, split

# Replace every backslash with "|". The extra rules guard against values where
# "\a", "\t", "\s" or "\n" were already interpreted as escape/whitespace
# characters, restoring the letter that would otherwise be lost.
df = df \
    .withColumn("Values", regexp_replace("Values", "\\\\", "|")) \
    .withColumn("Values", regexp_replace("Values", "\\a", "|a")) \
    .withColumn("Values", regexp_replace("Values", "\\t", "|t")) \
    .withColumn("Values", regexp_replace("Values", "\\s", "|s")) \
    .withColumn("Values", regexp_replace("Values", "\\n", "|n"))

2. Now split the Values column using | as the delimiter

df = df.withColumn("Values", split("Values", "\|"))

3. Collect the records as a list of dictionaries

records = df.rdd.map(lambda row: row.asDict()).collect()
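Each record is now a plain dictionary; with the sample data above, the first one should look roughly like this (note the empty string produced by the leading |):

records[0]
# {'FieldA': 1, 'FieldB': 'a', 'FieldC': 'hello',
#  'Values': ['', 'abc', 'def', 'ghi', 'jk-l', 'mno']}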

4. To build rows with a variable number of columns, the data has to be passed as key-value pairs

output_records = []
for record in records:
    # Drop the empty string created by the leading "|"
    words = [value for value in record["Values"] if len(value) > 0]

    # Add one key per word: col_1, col_2, ...
    for i, word in enumerate(words):
        record[f"col_{i+1}"] = word

    del record["Values"]
    output_records.append(record)

output_records now looks like this:

[
{'FieldA': 1, 'FieldB': 'a', 'FieldC': 'hello', 'col_1': 'abc', 'col_2': 'def', 'col_3': 'ghi', 'col_4': 'jk-l', 'col_5': 'mno'},
{'FieldA': 2, 'FieldB': 'b', 'FieldC': 'you', 'col_1': 'I', 'col_2': 'like', 'col_3': 'to', 'col_4': 'Code'},
{'FieldA': 3, 'FieldB': 'b', 'FieldC': 'there', 'col_1': 'Th-at', 'col_2': 'works'}
]

5. Finally, create a Spark DataFrame from output_records

spark.createDataFrame(output_records).show()

Output:

+------+------+------+-----+-----+-----+-----+-----+
|FieldA|FieldB|FieldC|col_1|col_2|col_3|col_4|col_5|
+------+------+------+-----+-----+-----+-----+-----+
|     1|     a| hello|  abc|  def|  ghi| jk-l|  mno|
|     2|     b|   you|    I| like|   to| Code| null|
|     3|     b| there|Th-at|works| null| null| null|
+------+------+------+-----+-----+-----+-----+-----+

cig3rfwq 2#

Here are my two cents:

import pyspark.sql.functions as F

data = [(1, 'a', 'hello', r'\abc\def\ghi\jk-l\mno'),
        (2, 'b', 'you', r'\I\like\to\Code'),
        (3, 'b', 'there', r'\Th-at\works')]

df = spark.createDataFrame(data, ['FieldA', 'FieldB', 'FieldC', 'Values'])

# Split the column, remove the blanks, create dynamic columns based on the array

df = df.withColumn('split_col', F.split(F.col('Values'), r'\\'))
df = df.withColumn('split_array', F.array_remove(df['split_col'], ''))
df = df.withColumn('cnt', F.size('split_array'))

max_size = df.agg(F.max('cnt')).first()[0]

textcols = [F.col('split_array')[i].alias(f'col{i+1}') for i in range(max_size)]

df.select([F.col('FieldA'), F.col('FieldB'), F.col('FieldC')] + textcols).show()

Check the sample output below:
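On the sample data above, the final select should produce something like:

+------+------+------+-----+-----+-----+-----+-----+
|FieldA|FieldB|FieldC| col1| col2| col3| col4| col5|
+------+------+------+-----+-----+-----+-----+-----+
|     1|     a| hello|  abc|  def|  ghi| jk-l|  mno|
|     2|     b|   you|    I| like|   to| Code| null|
|     3|     b| there|Th-at|works| null| null| null|
+------+------+------+-----+-----+-----+-----+-----+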
