postgresql 查询发现账户共享违规(超人雷达)

bis0qfac  于 2023-05-06  发布在  PostgreSQL
关注(0)|答案(1)|浏览(95)

我正在尝试实现一个商业产品的跟踪系统,以帮助检测帐户共享违规行为。为此,系统具有以下tracking表:

  • created-发出HTTP请求的日期/时间
  • user_id-发出请求的用户的ID
  • lat-发出请求的IP地址的纬度
  • lng-发出请求的IP地址的经度

我完全被困在实现PostgreSQL的过程中了(+Postgis)查询,会给予我一个在过去N小时内的条目列表,按user_id分组,其中至少有一个违规记录-请求是在距离原始M分钟内发出的,而距离比可以用直线跨越的距离更远,以S km/h的速度移动。
它应该显示所有的用户,设法使请求从这样的距离彼此,他们不能跨越在这样的速度和时间间隔,除非他们是一个超人。
例如,一组典型的参数将是:定位并分组在过去24小时内设法在10分钟间隔内从两个或更多位置使用该系统的所有用户,而在这样的距离下,不能以120 km/h的直线移动。

f45qwnt8

f45qwnt81#

tl;dr

使用lead()按用户和日期查找下一个日志条目。加入下一个日志条目,并使用ST_LengthSperoid()和age()计算时间和距离增量。使用此增量计算速度并过滤数据。

说来话长

让我们创建一些演示数据:

--DROP TABLE IF EXISTS tracking;

CREATE TABLE tracking(
    created timestamp,
    user_id integer,
    lat numeric,
    lng numeric
);

--TRUNCATE TABLE tracking;

INSERT INTO tracking VALUES
-- user 1
('2023-05-02 8:00', 1, 50.01, 8.02),
('2023-05-02 8:30', 1, 50.02, 8.03),
('2023-05-02 8:55', 1, 50.03, 8.01),
-- user 1 violation
('2023-05-02 8:05', 1, 53.00, 7.00),
-- user 2
('2023-05-02 9:00', 2, 40.01, 8.03),
('2023-05-02 9:30', 2, 40.02, 8.02),
('2023-05-02 9:55', 2, 40.03, 8.03);

我猜lead()可以做到这一点,因为它返回以下条目的时间戳,按时间戳排序:

SELECT 
    user_id,
    created as t1_created,
    lat as t1_lat,
    lng as t1_lng,
    
    LEAD(created,1) OVER(
        PARTITION BY user_id
        ORDER BY created
    ) as t2_created 
FROM 
    tracking
-- also possible to check for M and N at this stage
;

这导致(没有双关语😉)

然后,您可以连接数据:

WITH next_timestamp as (
SELECT 
    user_id,
    created as t1_created,
    lat as t1_lat,
    lng as t1_lng,
    
    LEAD(created,1) OVER(
        PARTITION BY user_id
        ORDER BY created
    ) as t2_created
FROM 
    tracking
-- also possible to check for M and N at this stage
)

SELECT 
    next_timestamp.*,
    t2.lng as t2_lng,
    t2.lat as t2_lat
FROM
    next_timestamp
JOIN
    tracking t2
ON 
    next_timestamp.t2_created = t2.created AND
    next_timestamp.user_id = t2.user_id

其返回

做点数学题

WITH next_timestamp as (
SELECT 
    user_id,
    created as t1_created,
    lat as t1_lat,
    lng as t1_lng,
    
    LEAD(created,1) OVER(
        PARTITION BY user_id
        ORDER BY created
    ) as t2_created
    
FROM 
    tracking
),

next_location AS (
    SELECT 
        next_timestamp.*,
        t2.lng as t2_lng,
        t2.lat as t2_lat
    FROM
        next_timestamp
    JOIN
        tracking t2
    ON 
        next_timestamp.t2_created = t2.created AND
        next_timestamp.user_id = t2.user_id
),

with_deltas AS (

    SELECT
        *,
        age(t2_created, t1_created) as delta_time,
        ST_LengthSpheroid(
            ST_MakeLine(
                ST_Point(t1_lng, t1_lat),
                ST_Point(t2_lng, t2_lat)
            ),
            'SPHEROID["GRS_1980",6378137,298.257222101]'
            )/1000 as delta_distance_km

    FROM
        next_location   
)

SELECT 
    *,
    EXTRACT(epoch FROM delta_time)/3600 as delta_time_hours,
    delta_distance_km / (EXTRACT(epoch FROM delta_time)/3600) as speed_kmh
FROM 
    with_deltas

它返回所需的结构:

此时,您可以添加一个过滤器,如

-- include CTEs from above

SELECT 
    *,
    EXTRACT(epoch FROM delta_time)/3600 as delta_time_hours,
    delta_distance_km / (EXTRACT(epoch FROM delta_time)/3600) as speed_kmh
FROM 
    with_deltas
WHERE
     age(now(), t1_created) < '24 hour'::interval AND -- test for M
     age(t2_created, t1_created) < '10 Minute'::interval AND --test for N
     delta_distance_km / (EXTRACT(epoch FROM delta_time)/3600) > 120 --test for S

获取所需的user_id:

-- include CTEs from above

SELECT 
    DISTINCT(user_id)
FROM 
    with_deltas
WHERE
     age(now(), t1_created) < '24 hour'::interval AND -- test for M
     age(t2_created, t1_created) < '10 Minute'::interval AND --test for N
     delta_distance_km / (EXTRACT(epoch FROM delta_time)/3600) > 120 --test for S

相关问题