Neo4j在取消事务时抛出ProtocolException

r6l8ljro  于 12个月前  发布在  其他
关注(0)|答案(1)|浏览(109)

使用neo4j作为API的数据库,如果用户使用相同或相似的参数重复请求(例如,他们通过滚动列表请求许多页数据,实际上只需要最后一页),我想取消前一个查询并运行最新的查询。我对此有一个可行的解决方案,取消由请求取消的CancellationTokenSource存储管理。db查询代码使用单个Driver示例和一个函数来读取数据;

//get the session
let getSession (sessionMode : AccessMode) (idriver:IDriver) =
        idriver.AsyncSession(Action<SessionConfigBuilder>(fun b -> b.WithDefaultAccessMode(sessionMode) |> ignore))

let readData
    (timezone : Timezone)
    idriver
    typeConversions
    (query:ReadQuery)
    (queryParams: QueryParams)
    : Async<Result<seq<Val list>, GraphError>> =
    asyncResult
        {
            use session = getSession AccessMode.Read idriver

            //provides cancellation
            let! token = Async.CancellationToken
            use! holder = Async.OnCancel(fun () -> session.CloseAsync() |> ignore)

            return!
                session.ExecuteReadAsync<Result<Val list seq, GraphError>>(fun tx ->
                    task {
                        try
                            let resultProcessF = asTypes (Timezone.value timezone) typeConversions
                            let! runResult = tx.RunAsync(ReadQuery.value query, queryParams)
                            let! results = runResult.ToListAsync(token)
                            let resultsSeq = results |> Seq.map resultProcessF
                            return Ok resultsSeq
                        with
                        | ex ->
                            return
                                Error (GraphError.ReadError
                                    {
                                        Error = (sprintf "Failed to run read data with query: %s" (ReadQuery.value query))
                                        ErrorDetails = ex.Message
                                    })
                    })
        }

如图所示,提供了取消功能,其中,PDMC函数使用令牌来取消ToListAsync方法,Async.OnCancel(fun () -> session.CloseAsync() |> ignore)函数在请求取消时关闭会话并回滚事务。
这通常是有效的,并且可以成功地回滚10或100个请求。然而,我经常会收到一个我很难诊断的异常;

---> System.AggregateException: One or more errors occurred. (Message 'ROLLBACK' cannot be handled by a session in the READY state.)
 ---> Neo4j.Driver.ProtocolException: Message 'ROLLBACK' cannot be handled by a session in the READY state.

这个例外似乎很简单;会话已就绪,但代码正在尝试ROLLBACK,这可能是session.CloseAsync()请求的结果。似乎没有一种机制可以在运行CloseAsync()之前检查会话状态来防止这种情况,在它周围加上一个try catch感觉有点笨拙。我使用的是默认的螺栓螺纹配置。
如果我的假设不正确,什么会导致此异常,以及如何解决此问题?

pkln4tw6

pkln4tw61#

我认为你的问题是你正在创建一个比赛,取消令牌被传递到runResult.ToListAsync(token),并使用注册来关闭会话。

  • 当会话关闭时,它会尝试回滚并关闭当前事务。
  • 当事务函数(ExecuteReadAsync等)捕获异常时,它们也会自动回滚。

我建议将令牌传递给runResult.ToListAsync(token),而不要使用Async.OnCancel(fun () -> session.CloseAsync() |> ignore)
这可能对您不起作用,因为驱动程序只在结果返回时检查取消令牌,因此需要一段时间才能响应的查询仍然会被阻止(这对驱动程序来说是一个悬而未决的问题)。如果您需要能够取消您的查询太你可以尝试使用赛车呼叫.

let racing(t: Task): Async<unit> =
    task {
        let! ct = Async.CancellationToken
        use race = Task.Delay(-1, ct)
        let! _ = Task.WhenAny(t, race)
        ct.ThrowIfCancellationRequested()
        return ()
    } |> Async.AwaitTask
    
let racingResult<'a>(t: Task<'a>): Async<'a> =
    task {
        let! ct = Async.CancellationToken
        use race = Task.Delay(-1, ct)
        let! _ = Task.WhenAny(t, race)
        ct.ThrowIfCancellationRequested()
        return t.Result
    } |> Async.AwaitTask

此外,你应该避免在事务的lambda内部捕获异常,因为这些异常被驱动程序用来知道它是否应该回滚,我认为更好的解决方案是在调用周围移动try catch。所以应该以这样的话结尾

let readData
    (timezone : Timezone)
    idriver
    typeConversions
    (query:ReadQuery)
    (queryParams: QueryParams)
    : Async<Result<seq<Val list>, GraphError>> =
    asyncResult
        {
            use session = getSession AccessMode.Read idriver

            //provides cancellation
            let! token = Async.CancellationToken

            return!
                try
                    session.ExecuteReadAsync<Result<Val list seq, GraphError>>(fun tx ->
                        task {
                                let resultProcessF = asTypes (Timezone.value timezone) typeConversions
                                let! results = racingResult (task {
                                    let! result = tx.RunAsync(ReadQuery.value query, queryParams)
                                    return! result.ToListAsync(token)
                                }, token) 
                                let resultsSeq = results |> Seq.map resultProcessF
                                return Ok resultsSeq
                        })
                with
                | ex ->
                    return
                        Error (GraphError.ReadError
                            {
                                Error = (sprintf "Failed to run read data with query: %s" (ReadQuery.value query))
                                ErrorDetails = ex.Message
                            })
        }

我不是一个F#用户,但我希望这能有所帮助。

相关问题