在dataproc上使用spark bigquery连接器,数据似乎延迟了一个小时

xe55xuns  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(522)

我使用的是在dataproc上运行的spark2.4,每15分钟运行一次批处理作业,从一个bq表中获取一些数据,聚合它(sum)并通过pyspark.sql将它存储在另一个bq表中(overwrite)。
如果我在spark中查询表,看起来数据落后了大约一个小时。或者更确切地说,它在大约一个小时前就切断了。如果我在spark中查询的表上使用完全相同的查询,而是在bqweb控制台中,那么所有数据都在那里并且是最新的。我做错什么了吗?或者这是连接器的预期行为?
下面是我使用的基本代码:

orders_by_hour_query = """
SELECT

_id as app_id,
from_utc_timestamp(DATE_TRUNC('HOUR', created_at), 'America/Los_Angeles') as ts_hour,
SUM(total_price_usd) as gmv,
COUNT(order_id) as orders

FROM `orders`

WHERE DATE(from_utc_timestamp(created_at, 'America/Los_Angeles')) BETWEEN "2020-11-23" AND "2020-11-27"

GROUP BY 1, 2

ORDER BY 1, 2 ASC
"""

orders_df = spark.read.format("bigquery").load(bq_dataset+".orders")
orders_df.createOrReplaceTempView("orders")
orders_by_hour_df = spark.sql(orders_by_hour_query)

编辑:似乎每小时的截止时间几乎是任意的。例如,当前是“2020-11-25 06:31 utc”,但通过spark连接器从bq查询的最大时间戳是:“2020-11-25 05:56:39 utc。”
有关该表的详细信息:

Table size  2.65 GB
Long-term storage size  1.05 GB
Number of rows  4,120,280
Created Jun 3, 2020, 4:56:11 PM
Table expiration    Never
Last modified   Nov 24, 2020, 10:07:54 PM
Data location   US
Table type  Partitioned
Partitioned by  Day
Partitioned on field    created_at
Partition filter    Not required

Streaming buffer statistics

Estimated size  1.01 MB
Estimated rows  1,393
Earliest entry time Nov 24, 2020, 9:57:00 PM

提前谢谢!

j2qf4p5b

j2qf4p5b1#

看起来丢失的数据可能在流缓冲区中,尚未到达bq存储。
这意味着您可以直接从bq查询它,但不能使用bq spark连接器,因为它通过存储api工作(https://cloud.google.com/bigquery/docs/reference/storage)
作为一种解决方法,您可以尝试以下方法。因为只有一小时的数据,如果数据足够小,你也可以直接使用bqapi,把pandasDataframe转换成sparkDataframe。

`def bq2df(QUERY):
    bq = bigquery.Client()
    query_job = bq.query(QUERY)
    query_job.result()

    df = spark.read.format('bigquery') \
        .option('dataset', query_job.destination.dataset_id) \
        .load(query_job.destination.table_id) \
        .persist(StorageLevel.MEMORY_AND_DISK)

    return df

相关问题