如何在pysparK中为半结构化文本文件定义Schema

w8rqjzmb  于 2023-08-02  发布在  Spark
关注(0)|答案(1)|浏览(95)

以上数据是半结构化的文本文件数据。
第二列由空格分隔。第3列由制表符分隔。第四列由,分隔。
如何为每一列定义模式(数据类型),如第一列'int',第二列'timestamp',第三列'int',第四列'string'。
我已经试着通过下面的代码将这条记录分成每一行

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pysp  ark.sql.functions import regexp_extract
from pyspark.sql.types import IntegerType, StructField, StructType, 
     StringType, TimestampType
 
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")

my_conf.set("spark.master","local[*]")

spark = SparkSession.builder.config(conf=my_conf).getOrCreate()

schema1 = StructType([StructField("order_id", IntegerType(),True),
StructField("date", TimestampType(),True),
StructField("customer_id", IntegerType(),True),
StructField("status", StringType(),True)])

myregex = r'^(\S+) (\S+)\t(\S+)\,(\S+)'

lines_df = spark.read.format("text")\
           .option("path","C:/Users/Lenovo/Desktop/week11/week 11 
            datasets/orders_new.csv").load()
 
 
final_df =lines_df.select(regexp_extract('value',myregex,1).alias("order_id"),
regexp_extract('value',myregex,2).alias("date"),
regexp_extract('value',myregex,3).alias("customer_id"),
regexp_extract('value',myregex,4).alias("status"))

final_df.show()`

ans========
+--------+----------+-----------+---------------+
|order_id|      date|customer_id|         status|
+--------+----------+-----------+---------------+
|       1|2013-07-25|      11599|         CLOSED|
|       2|2013-07-25|        256|PENDING_PAYMENT|
|       3|2013-07-25|      12111|       COMPLETE|
|       4|2013-07-25|       8827|         CLOSED|
|       5|2013-07-25|      11318|       COMPLETE|
|       6|2013-07-25|       7130|       COMPLETE|
|       7|2013-07-25|       4530|       COMPLETE|
|       8|2013-07-25|       2911|     PROCESSING|
|       9|2013-07-25|       5657|PENDING_PAYMENT|
|      10|2013-07-25|       5648|PENDING_PAYMENT|
|      11|2013-07-25|        918| PAYMENT_REVIEW|
|      12|2013-07-25|       1837|         CLOSED|
+--------+----------+-----------+---------------+

final_df.printSchema()

|-- order_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- status: string (nullable = true)

as in printschema i get string datatype only...


------but now to define schema for this
whenever i do

    df=spark.createDataFrame(final_df.rdd,schema1)

    final_df.show()    -------i get error here

so how to define schema
plzz tell..

字符串

j5fpnvbx

j5fpnvbx1#

regexp_extract之后,您需要执行cast
试试下面的语法。

Example:

final_df =lines_df.select(regexp_extract('value',myregex,1).cast("int").alias("order_id"),
regexp_extract('value',myregex,2).cast("timestamp").alias("date"),
regexp_extract('value',myregex,3).cast("int").alias("customer_id"),
regexp_extract('value',myregex,4).alias("status"))

final_df.show()
df=spark.createDataFrame(final_df.rdd,schema1)
df.printSchema()
#root
# |-- order_id: integer (nullable = true)
# |-- date: timestamp (nullable = true)
# |-- customer_id: integer (nullable = true)
# |-- status: string (nullable = true)

字符串

相关问题