我正在通过kafka从mongodb获得一个pyspark中具有timestamp值的集合。在mongodb中,模式如下:
"Timestamp": {
"$date": "2020-02-28T11:24:28.810Z"
}
在pyspark中,我使用以下模式:
StructType([...
StructField("Timestamp",StructType([StructField("$date",TimestampType(),True)]), True), \
...
我使用from_json()解析json字符串:
data_stream_clean = data_stream_after \
.select(from_json(col("json_string"), self.schema) \
.alias("detail")) \
.select("detail.*") \
.withColumn("Timestamp", col("Timestamp").getField("$date"))
然后我创建一个tempview来访问这些列,它显示:
+---+--------------------+
| Id| Timestamp|
+---+--------------------+
|231|52129-10-04 10:00...
这是2020-02-28t11:24:28.810z的错误转换。我无法将其转换为显示以下错误的df:
ValueError: year 52129 is out of range
我还使用了unix\u timestamp(),它显示了正确的转换,即1582889068810,但使用int数据类型。不过,我想在时间戳我的数据。
1条答案
按热度按时间afdcj2ne1#
我试着以josn的形式阅读您的示例输入,它在scala中运行良好。你能告诉我你是怎么加载Dataframe或者场景的吗?
代码
结果-
就连我也试着把它读成
string
然后把柱子铸成Timestamp
,但结果是一样的-结果:
如果您正在使用加载数据
DataFrameReader
然后您可以使用下面的选项更改格式-来自\u json
从json加载数据对我来说很好