Reading timestamps from a file using a schema

eqoofvh9 asked on 2021-07-14 in Spark

I'm working on a Structured Streaming job.
The data I read from files contains a timestamp (in milliseconds), a deviceId, and the value reported by that device; multiple devices report data.
I'm trying to write a job that aggregates (sums) the values sent by all devices into 1-minute tumbling windows.
The problem I'm running into is the timestamp.
When I try to parse "timestamp" as a long, the window function complains that it requires a "timestamp type". When I try to parse it as TimestampType, I get a scala.MatchError (the full exception is below), and I'm struggling to figure out why and what the right way to handle it is.

    // Create schema
    StructType readSchema = new StructType()
            .add("value", "integer")
            .add("deviceId", "long")
            .add("timestamp", new TimestampType());

    // Read data from file
    Dataset<Row> inputDataFrame = sparkSession.readStream()
            .schema(readSchema)
            .parquet(path);

    Dataset<Row> aggregations = inputDataFrame
            .groupBy(window(inputDataFrame.col("timestamp"), "1 minutes"),
                     inputDataFrame.col("deviceId"))
            .agg(sum("value"));

The exception:

    scala.MatchError: org.apache.spark.sql.types.TimestampType@3eeac696 (of class org.apache.spark.sql.types.TimestampType)
        at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeFor(RowEncoder.scala:215)
        at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeForInput(RowEncoder.scala:212)
        at org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType.<init>(objects.scala:1692)
        at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$serializerFor$3(RowEncoder.scala:175)
        at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
        at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
        at org.apache.spark.sql.catalyst.encoders.RowEncoder$.serializerFor(RowEncoder.scala:171)
        at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
        at org.apache.spark.sql.streaming.DataStreamReader.parquet(DataStreamReader.scala:450)

omhiaaxx1#

Usually, when the timestamp arrives as a long, you can cast it to the timestamp type like this:

    // Create schema and keep column 'timestamp' as long
    StructType readSchema = new StructType()
            .add("value", "integer")
            .add("deviceId", "long")
            .add("timestamp", "long");

    // Read data from file
    Dataset<Row> inputDataFrame = sparkSession.readStream()
            .schema(readSchema)
            .parquet(path);

    // Convert the epoch-millisecond column into a proper timestamp type
    Dataset<Row> df1 = inputDataFrame.withColumn("new_timestamp",
            expr("timestamp/1000").cast(DataTypes.TimestampType));

    // Note: show() only works on a batch Dataset (sparkSession.read()),
    // not on a streaming one; the output below is from a batch run.
    df1.show(false);

    +-----+--------+-------------+-----------------------+
    |value|deviceId|timestamp    |new_timestamp          |
    +-----+--------+-------------+-----------------------+
    |1    |1337    |1618836775397|2021-04-19 14:52:55.397|
    +-----+--------+-------------+-----------------------+

    df1.printSchema();

    root
     |-- value: integer (nullable = true)
     |-- deviceId: long (nullable = true)
     |-- timestamp: long (nullable = true)
     |-- new_timestamp: timestamp (nullable = true)
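
From here, the 1-minute tumbling-window aggregation from the question can run against the converted column. A minimal sketch, reusing df1 and the column names from the snippets above; it assumes static imports of window and sum from org.apache.spark.sql.functions (standard Spark SQL functions, not shown in the original answer):

    // Sum the values per device over 1-minute tumbling windows,
    // grouping on the properly typed new_timestamp column
    Dataset<Row> aggregations = df1
            .groupBy(window(df1.col("new_timestamp"), "1 minute"),
                     df1.col("deviceId"))
            .agg(sum("value"));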

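As for why the original schema failed: org.apache.spark.sql.types.TimestampType is a Scala case object that Spark's RowEncoder pattern-matches against, so a separately constructed new TimestampType() instance does not match it, which triggers the scala.MatchError. From Java, the singleton is exposed as DataTypes.TimestampType (the type-name string "timestamp" also works). A hypothetical corrected schema, which would only help if the files actually stored timestamp values rather than epoch-millisecond longs:

    // Use the DataTypes.TimestampType singleton instead of
    // constructing a new TimestampType instance
    StructType readSchema = new StructType()
            .add("value", "integer")
            .add("deviceId", "long")
            .add("timestamp", DataTypes.TimestampType);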