我使用的是在dataproc上运行的spark2.4,每15分钟运行一次批处理作业,从一个bq表中获取一些数据,聚合它(sum)并通过pyspark.sql将它存储在另一个bq表中(overwrite)。
如果我在spark中查询表,看起来数据落后了大约一个小时。或者更确切地说,它在大约一个小时前就切断了。如果我在spark中查询的表上使用完全相同的查询,而是在bqweb控制台中,那么所有数据都在那里并且是最新的。我做错什么了吗?或者这是连接器的预期行为?
下面是我使用的基本代码:
orders_by_hour_query = """
SELECT
_id as app_id,
from_utc_timestamp(DATE_TRUNC('HOUR', created_at), 'America/Los_Angeles') as ts_hour,
SUM(total_price_usd) as gmv,
COUNT(order_id) as orders
FROM `orders`
WHERE DATE(from_utc_timestamp(created_at, 'America/Los_Angeles')) BETWEEN "2020-11-23" AND "2020-11-27"
GROUP BY 1, 2
ORDER BY 1, 2 ASC
"""
orders_df = spark.read.format("bigquery").load(bq_dataset+".orders")
orders_df.createOrReplaceTempView("orders")
orders_by_hour_df = spark.sql(orders_by_hour_query)
编辑:似乎每小时的截止时间几乎是任意的。例如,当前是“2020-11-25 06:31 utc”,但通过spark连接器从bq查询的最大时间戳是:“2020-11-25 05:56:39 utc。”
有关该表的详细信息:
Table size 2.65 GB
Long-term storage size 1.05 GB
Number of rows 4,120,280
Created Jun 3, 2020, 4:56:11 PM
Table expiration Never
Last modified Nov 24, 2020, 10:07:54 PM
Data location US
Table type Partitioned
Partitioned by Day
Partitioned on field created_at
Partition filter Not required
Streaming buffer statistics
Estimated size 1.01 MB
Estimated rows 1,393
Earliest entry time Nov 24, 2020, 9:57:00 PM
提前谢谢!
1条答案
按热度按时间j2qf4p5b1#
看起来丢失的数据可能在流缓冲区中,尚未到达bq存储。
这意味着您可以直接从bq查询它,但不能使用bq spark连接器,因为它通过存储api工作(https://cloud.google.com/bigquery/docs/reference/storage)
作为一种解决方法,您可以尝试以下方法。因为只有一小时的数据,如果数据足够小,你也可以直接使用bqapi,把pandasDataframe转换成sparkDataframe。