websocket Web套接字执行延迟(可能)和其他一些问题

fslejnso  于 2023-10-20  发布在  其他
关注(0)|答案(1)|浏览(173)

我的应用程序与Binance相关,我使用Binance.Net(来自JKopf)作为API Package 器,并在某些时间间隔内使用Hangfire进行cron作业任务。
我必须执行三种类型的任务:

  • cron作业任务在完成的蜡烛(为所有蜡烛之前的最后一个).例如,这些在下一个小时开始时执行,例如。晚上11:00,延迟1秒,因为币安有时会延迟数据。
  • 使用网络套接字的真实的时间检查。基本上,他们的API只为最后一支蜡烛提供了一个流,因为它是唯一一个改变的,这是很明显的。
  • 当满足条件时,在10分钟后执行计划任务。看看PerformInRealTimeLogicAsyncisTriggered。当isTriggered为true时,它应该在10分钟后安排另一次检查。如果到一小时结束时还剩下不到10分钟,它应该在最后一分钟执行检查。例如,假设当前时间是10:52 pm,这意味着没有10分钟了。它应该在下一个小时(10:59 pm)之前的最后一分钟执行检查。如果第二个10分钟检查返回false,它应该在另一个10分钟后安排另一个任务。

主要问题是Web套接字逻辑(Web套接字)延迟,即。它没有按时执行,而是有一些延迟,并且 *10分钟后 * 逻辑不能正常工作。
问题:
1.在结束前1分钟执行检查的逻辑,当没有10分钟时,不起作用。
1.就像我上面说的,整个逻辑在某种程度上是延迟的。可能是await Task.Delay(1000),我不确定。
我知道这是延迟,因为它有时会忘记执行某些分钟。

Current time: 12/6/2019 12:41:00 AM
// note that there isn't 12:42:00, it forgot to execute it
Current time: 12/6/2019 12:43:00 AM

1.目前,我订阅了cron作业(DoWork())中的web套接字,我认为这就是延迟的问题。是在cron作业之外订阅并在应用程序停止之前取消订阅的正确方法。

/ Start a cron job on each hour
RecurringJob.AddOrUpdate("task{i}", () => DoWork(), "0 */1 * * *", TimeZoneInfo.Local);
await _socketClient.Unsubscribe(_subscription);

public void DoWork()
{
    ...

    // That delay is needed because Binance sometimes delays the new candles
    await Task.Delay(1000);

    var klines = await _binanceService.GetAllKlinesAsync();

    // Web sockets
    await PerformInRealTimeLogicAsync();

    ...
}

// Web sockets
private bool _isProcessing = false;

public void async Task PerformInRealTimeLogicAsync()
{
    var result = await _socketClient.SubscribeToKlineUpdatesAsync(symbol, interval, async data =>
    {
        if (!_isProcessing && Data?.Data.Close != data.Data.Close)
        {
            _isProcessing = true;
            Data = data;

            // First logic
            var result = await FirstLogicAsync();
            if (result)
            {
                _isProcessing = false;
                return;
            }

            // Second logic
            var isTriggered = await SecondLogicAsync();
            if (isTriggered)
            {
                // Wait 10 minutes and check again same condition
                double seconds = RemainingSecondsUntilNextHour();
                if (seconds <= 10 * 60)
                {
                    BackgroundJob.Schedule(() => DelayedJob(), TimeSpan.FromSeconds(seconds - (1 * 60)));
                }
                else
                {
                    BackgroundJob.Schedule(() => DelayedJob(), TimeSpan.FromMinutes(10));
                }

                _isProcessing = false;
                return;
            }

            _isProcessing = false;
        }
    });

    if (result.Success)
    {
        _subscription = result.Data;
    }
}

private double RemainingSecondsUntilNextHour()
{
    var timeOfDay = DateTime.Now.TimeOfDay;
    var nextFullHour = TimeSpan.FromHours(Math.Ceiling(timeOfDay.TotalHours));
    var delta = (nextFullHour - timeOfDay).TotalSeconds;
    return delta;
}

// Scheduled job
public void DelayedJob()
{
    if (condition)
    {
        // Conditions are met, so we stop
    }
    else
    {
        // Conditions are not met, so we schedule another job after another 10 minutes
        double seconds = RemainingSecondsUntilNextHour();
        if (seconds <= 10 * 60)
        {
            BackgroundJob.Schedule(() => DelayedJob(), TimeSpan.FromSeconds(seconds - (1 * 60)));
        }
        else
        {
            BackgroundJob.Schedule(() => DelayedJob(), TimeSpan.FromMinutes(10));
        }
    }
}

**编辑:**问题是我总是在每个时间间隔调用DoWork()方法订阅流,我从来没有取消订阅。我应该在最后一分钟取消订阅吗?

请注意,我仍然没有修复它,我只是意识到这是原因之一。

lnvxswe2

lnvxswe21#

让我们分解您的担忧并逐一解决它们:
1.在没有10分钟的情况下,在结束前1分钟执行检查的逻辑不起作用
乍一看,逻辑似乎是正确的。当剩余秒数小于或等于10分钟(600秒)时,它将在下一小时前1分钟调度作业。如果逻辑没有按预期触发,您应该验证RemainingSecondsUntilNextHour()返回的值是否正确。
1.整个逻辑延迟
await Task.Delay(1000);将引入1秒延迟。如果你观察到它偶尔会跳过几分钟,那么可能是代码的其他部分引入了延迟(例如,阻塞调用或其他密集操作)。如果后台作业队列过载,Hangfire本身可能会引入延迟。

*在CronJob内部订阅WebSockets

保持每小时订阅一个WebSocket而不取消订阅以前的订阅是一个坏主意。这将产生大量的开放连接,从而导致各种问题。在再次订阅之前,请务必取消订阅以前的任何WebSocket连接。
将WebSocket连接生命周期与作业逻辑分开管理可能会更好。这里有一个潜在的解决方案:

*订阅:与其在DoWork()内部订阅,不如考虑在应用程序启动时设置订阅。
*退订:设置一个清理作业,例如,每天运行一次,以取消订阅任何陈旧的WebSocket连接。或者,您可以跟踪最后一次从连接接收消息的时间,如果经过一定的时间没有任何新消息,则取消订阅。
*修复重复订阅

你的认识是正确的。重复订阅而不取消订阅可能会导致多个WebSocket连接同时打开。这可能会导致处理延迟,甚至可能导致某些消息被丢弃。
要解决此问题,您可以:

*每小时结束时退订WebSocket:这确保在任何给定时间只有一个WebSocket连接。
*实现检测非活动WebSocket连接的机制:例如,如果在一定时间内没有收到任何消息,则将连接视为非活动连接并取消订阅。

以下是一种可能的修订方法:
1.修改DoWork()

public async Task DoWork()
{
    ...

    // Unsubscribe previous socket
    if (_subscription != null)
    {
        await _socketClient.Unsubscribe(_subscription);
        _subscription = null;
    }

    // That delay is needed because Binance sometimes delays the new candles
    await Task.Delay(1000);

    var klines = await _binanceService.GetAllKlinesAsync();

    // Web sockets
    await PerformInRealTimeLogicAsync();

    ...
}

1.将WebSocket逻辑调整为仅在没有活动订阅的情况下进行订阅:

public async Task PerformInRealTimeLogicAsync()
{
    if (_subscription != null) return;

    var result = await _socketClient.SubscribeToKlineUpdatesAsync(symbol, interval, async data =>
    {
        ...
    });

    if (result.Success)
    {
        _subscription = result.Data;
    }
}

通过这些更改,您将在每个小时结束时取消订阅WebSocket,并且仅在没有活动订阅时订阅。这将防止多个订阅的累积。请记住,在这些更改后彻底测试您的代码,以确保其按预期工作。

相关问题