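The data above is semi-structured text-file data.
The first and second columns are separated by a space, the second and third by a tab, and the third and fourth by a comma.
How do I define a schema (data types) for each column, e.g. 'int' for the first column, 'timestamp' for the second, 'int' for the third and 'string' for the fourth?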
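The layout can be checked against a single record in plain Python with the standard `re` module, using the same pattern as the code below. The sample line is hypothetical, built only from the separators described above. For a simple pattern like this one, Python's `re` and Spark's Java-based `regexp_extract` should split the line the same way. Note that every captured group is a string.

```python
import re

# Same pattern as in the question: space, then tab, then comma.
myregex = r'^(\S+) (\S+)\t(\S+)\,(\S+)'

# Hypothetical sample record following the described layout.
line = "1 2013-07-25\t11599,CLOSED"

match = re.match(myregex, line)
print(match.groups())  # ('1', '2013-07-25', '11599', 'CLOSED')
```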
I tried to split each record into separate columns with the code below:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import IntegerType, StructField, StructType, StringType, TimestampType
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
schema1 = StructType([StructField("order_id", IntegerType(), True),
                      StructField("date", TimestampType(), True),
                      StructField("customer_id", IntegerType(), True),
                      StructField("status", StringType(), True)])
myregex = r'^(\S+) (\S+)\t(\S+)\,(\S+)'
lines_df = spark.read.format("text")\
    .option("path", "C:/Users/Lenovo/Desktop/week11/week 11 datasets/orders_new.csv").load()
final_df = lines_df.select(regexp_extract('value', myregex, 1).alias("order_id"),
                           regexp_extract('value', myregex, 2).alias("date"),
                           regexp_extract('value', myregex, 3).alias("customer_id"),
                           regexp_extract('value', myregex, 4).alias("status"))
final_df.show()
Output:
+--------+----------+-----------+---------------+
|order_id|      date|customer_id|         status|
+--------+----------+-----------+---------------+
|       1|2013-07-25|      11599|         CLOSED|
|       2|2013-07-25|        256|PENDING_PAYMENT|
|       3|2013-07-25|      12111|       COMPLETE|
|       4|2013-07-25|       8827|         CLOSED|
|       5|2013-07-25|      11318|       COMPLETE|
|       6|2013-07-25|       7130|       COMPLETE|
|       7|2013-07-25|       4530|       COMPLETE|
|       8|2013-07-25|       2911|     PROCESSING|
|       9|2013-07-25|       5657|PENDING_PAYMENT|
|      10|2013-07-25|       5648|PENDING_PAYMENT|
|      11|2013-07-25|        918| PAYMENT_REVIEW|
|      12|2013-07-25|       1837|         CLOSED|
+--------+----------+-----------+---------------+
final_df.printSchema()
root
 |-- order_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- status: string (nullable = true)
As printSchema shows, every column comes back as a string.
Now I want to apply my schema, but whenever I run
df = spark.createDataFrame(final_df.rdd, schema1)
df.show()   # I get an error here
How do I define the schema? Please help.
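The error most likely comes from schema verification: `createDataFrame` checks each row against `schema1` (its `verifySchema` parameter defaults to `True`), and string values such as `'1'` are not accepted for `IntegerType` or `TimestampType` fields. Because the RDD is evaluated lazily, the failure only surfaces when an action such as `show()` runs. One workaround is to convert the values in Python before applying the schema. The helper `to_typed_row` below is hypothetical, and it assumes dates look like `2013-07-25`, as in the output above.

```python
from datetime import datetime


def to_typed_row(order_id, date, customer_id, status):
    """Convert the four extracted strings into the Python types that
    schema1 expects: IntegerType -> int, TimestampType -> datetime."""
    return (
        int(order_id),
        datetime.strptime(date, "%Y-%m-%d"),  # assumed yyyy-MM-dd format
        int(customer_id),
        status,  # StringType: keep as-is
    )


print(to_typed_row("1", "2013-07-25", "11599", "CLOSED"))
```

It could then be applied as `spark.createDataFrame(final_df.rdd.map(lambda r: to_typed_row(*r)), schema1)`, since each `Row` unpacks into its four string fields in column order. Casting inside Spark avoids this Python round trip and is usually the simpler option.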
1 Answer
After regexp_extract, you need to cast each column to its target type. Try the syntax below.
Example: