无法在C#异步方法中完成任务之前发送WebSocket消息

1cosmwyk  于 2023-10-20  发布在  C#
关注(0)|答案(1)|浏览(144)

我面临的一个挑战是确保在拥有WebSocket的任务完成之前发送WebSocket消息。
我有一个Main方法,核心逻辑驻留在其中,包括启动WebSocket连接和从IP摄像机流式传输。创建任务并将其存储在具有CancellationTokenSource的字典中,以允许任务取消。取消令牌也传递给Main方法。

下面是我的Main方法的简化版本:

public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl)
{
    // ... Initialize WebSocket and other operations
    while (!token.IsCancellationRequested)
    {
        // ... Fetch and display video frames
    }
    if (token.IsCancellationRequested)
    {
        var Stop_Server = new
        {
            command = "Stop_Stream"
        };
        await SendSocket(cameraIndex, Stop_Server, token);
    }
}

我尝试使用以下方法取消此任务时出现问题:

private async Task IPCameraMethod()
    {
        // ... some other logic
        if (tokenSources.TryGetValue(cameraIndex, out var tuple))
        {
            tuple.Item2.Cancel();
            try
            {
                await tuple.Item3;  // Await the task's completion
            }
            catch (OperationCanceledException)
            {
                // Task was cancelled, expected behavior
            }
        }
        // Now that the old task has been cancelled and awaited, start the new task.
        var newCts = new CancellationTokenSource();
        var newTask = Camera.IPCameraService.Main(SourceImageControl, selectedItemIndex, newCts.Token, url);

        Debug.WriteLine("New Task Created!");
        tokenSources[cameraIndex] = Tuple.Create(selectedItemIndex, newCts, newTask);
    }

只要排队等候tuple.Item3;执行时,Main方法中的WebSocket断开连接,甚至在调用SendSocket函数发送“Stop_Stream”消息之前。
如何确保WebSocket消息在WebSocket因任务完成而关闭之前发送?
本质上,IPCameraMethod应该在关闭Main任务并因此关闭WebSocket连接之前等待该任务完成,但它似乎立即关闭。
这是我得到的输出:

Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
ReceiveSocket operation was cancelled.
Exception thrown: 'System.ArgumentNullException' in OpenCvSharp.dll
Exception caught while processing: Value cannot be null.
Parameter name: buf
Details: System.ArgumentNullException: Value cannot be null.
Parameter name: buf
   at OpenCvSharp.Cv2.ImDecode(Byte[] buf, ImreadModes flags)
   at Iris.Services.IPCameraService.<RunYolo>d__74.MoveNext() in C:\Users\nico_\Desktop\Projects\Iris\Iris WPF\Iris Y\Iris-Latest\Iris\Services\IPCameraService.cs:line 317

这是IPCameraMethod的完整代码:

private async Task IPCameraMethod()
        {
            if (string.IsNullOrWhiteSpace(textBoxUrl.Text) ||
                !int.TryParse(SourceComboBox.Name.Replace("comboBox", ""), out int cameraIndex) ||
                comboBoxSavedCameras.SelectedIndex < 0)
            {
                return;
            }

            string url = textBoxUrl.Text;
            int selectedItemIndex = comboBoxSavedCameras.SelectedIndex;

            if (tokenSources.TryGetValue(cameraIndex, out var tuple))
            {
                if (selectedItemIndex != tuple.Item1)
                {
                    tuple.Item2.Cancel();
                    
                    try
                    {
                        await tuple.Item3; // Await the task's completion

                    }

                    catch (OperationCanceledException)
                    {
                        // Task was cancelled, expected behavior
                    }
                    tuple.Item2.Dispose();
                    tokenSources.Remove(cameraIndex);
                    
                }
                else
                {
                    return; // If selected item is the same, we do not need to create a new task
                }
            }

            // Now that the old task has been cancelled and awaited, start the new task.
            var newCts = new CancellationTokenSource();
            var newTask = Camera.IPCameraService.Main(SourceImageControl, selectedItemIndex, newCts.Token, url);

            Debug.WriteLine("New Task Created!");
            tokenSources[cameraIndex] = Tuple.Create(selectedItemIndex, newCts, newTask);
        }

这是Main及其调用和其他方法的完整代码:

public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl)
        {

            CameraURLs[cameraIndex] = cameraUrl;

            await StartSocket(cameraIndex, token);
            await StartStream(cameraIndex, cameraUrl, token);
            var stopStreamCompletion = new TaskCompletionSource<bool>();
            EventHandler(cameraUrl, cameraIndex, token);
            while (!token.IsCancellationRequested) // Added a token check to allow stopping the loop externally
            {
                var frame = await RunYolo(cameraIndex, token);
                if (frame != null)
                    await UpdateDisplay(frame, imageControl, token);
            }
            // Close WebSocket when CancellationToken is triggered
            if (token.IsCancellationRequested)
            {
                var Stop_Server = new
                {
                    command = "Stop_Stream"
                };
                await SendSocket(cameraIndex, Stop_Server, token);
                Debug.WriteLine("Websocket Cleared!");
            }
        }



        public async Task StartStream(int cameraIndex, string cameraUrl, CancellationToken token)
        {
            try
            {
                // Check for cancellation before sending the request
                token.ThrowIfCancellationRequested();

                var settings = new
                {
                    command = "Start_Stream",
                    camera_url = cameraUrl
                };

                await SendSocket(cameraIndex, settings, token); // Pass the cancellation token to SendSocket
            }
            catch (OperationCanceledException)
            {
                // Handle cancellation
                Debug.WriteLine("StartStream was cancelled.");
                // If needed, you can add additional logic here to clean up or log.
                return;
            }
            catch (System.Net.WebSockets.WebSocketException)
            {
                // Existing logic
                Console.WriteLine("WebSocketException occurred while starting Stream. Retrying...");
                await Task.Delay(1000); // Wait for a second before retrying
            }
        }


        private async Task<Mat> RunYolo(int cameraIndex, CancellationToken token)
        {
            try
            {
                // Check for cancellation before doing any work
                token.ThrowIfCancellationRequested();

                // Run your operations
                byte[] imageBytes = await ReceiveSocket(cameraIndex, token);

                // Convert bytes to Mat
                Mat frame = Cv2.ImDecode(imageBytes, ImreadModes.Color);

                return frame;
            }
            catch (OperationCanceledException)
            {
                // Handle cancellation here, logging or other side-effects if needed
                Debug.WriteLine("RunYolo operation was cancelled.");
                return null;
            }
            catch (Exception ex)
            {
                // Log or handle other types of exceptions as needed
                Console.WriteLine($"Exception caught while processing: {ex.Message}");
                Console.WriteLine($"Details: {ex}");
                return null;
            }
        }

        public async Task<byte[]> ReceiveSocket(int cameraIndex, CancellationToken token)
        {
            try
            {
                // Check for cancellation before receiving
                token.ThrowIfCancellationRequested();
                var ws = CameraWebsockets[cameraIndex];
                List<byte> fullMessage = new List<byte>();
                WebSocketReceiveResult result;

                do
                {
                    var messageReceived = new ArraySegment<byte>(new byte[4096]);
                    result = await ws.ReceiveAsync(messageReceived, token);
                    fullMessage.AddRange(messageReceived.Skip(messageReceived.Offset).Take(result.Count));
                } while (!result.EndOfMessage); // Continue reading until you get the entire message.

                return fullMessage.ToArray();
            }
            catch (OperationCanceledException)
            {
                // Handle cancellation by logging or other side-effects if needed
                Debug.WriteLine("ReceiveSocket operation was cancelled.");
                return null;
            }
            catch (System.Net.WebSockets.WebSocketException)
            {
                await StartSocket(cameraIndex, token);
                await StartStream(cameraIndex, CameraURLs[cameraIndex],token);

                return await ReceiveSocket(cameraIndex,token);
            }
        }

        public async Task SendSocket(int cameraIndex, object message, CancellationToken token)
        {
            try
            {
                token.ThrowIfCancellationRequested();

                var ws = CameraWebsockets[cameraIndex];
                var Start_ServerJson = JsonConvert.SerializeObject(message);
                var Start_ServerBytes = Encoding.UTF8.GetBytes(Start_ServerJson);

                await ws.SendAsync(new ArraySegment<byte>(Start_ServerBytes, 0, Start_ServerBytes.Length), WebSocketMessageType.Text, true, token);
            }
            catch (OperationCanceledException)
            {
                // Handle cancellation here, perhaps log it if needed
                Debug.WriteLine("SendSocket operation was cancelled.");
            }
            catch (System.Net.WebSockets.WebSocketException)
            {
                // When a WebSocketException occurs, you restart the socket and stream.
                // Be sure to consider the cancellation token when you're restarting as well.
                await StartSocket(cameraIndex, token);  // Consider passing token here too
                await StartStream(cameraIndex, CameraURLs[cameraIndex], token);  // And here
                await SendSocket(cameraIndex, message, token);  // Recursion with the same token
            }
        }

        public async Task StartSocket(int cameraIndex, CancellationToken token)
        {
            while (true)
            {
                try
                {
                    // Check for cancellation before trying to connect
                    token.ThrowIfCancellationRequested();

                    var ws = new ClientWebSocket();
                    await ws.ConnectAsync(new Uri(wsUrl), token); // Pass the cancellation token here

                    if (CameraWebsockets.ContainsKey(cameraIndex))
                    {
                        CameraWebsockets[cameraIndex] = ws;
                    }
                    else
                    {
                        CameraWebsockets.TryAdd(cameraIndex, ws);
                    }
                    return; // Break the loop if connected successfully
                }
                catch (OperationCanceledException)
                {
                    // Handle cancellation by logging or other side-effects if needed
                    Debug.WriteLine("StartSocket operation was cancelled.");
                    return;  // Exit the loop if cancelled
                }
                catch (System.Net.WebSockets.WebSocketException)
                {
                    Console.WriteLine("WebSocketException occurred while starting socket. Retrying...");
                    await Task.Delay(1000, token); // Pass the cancellation token here too
                }
            }
        }
        private async Task UpdateDisplay(Mat frame, System.Windows.Controls.Image imageControl, CancellationToken token)
        {
            // Check for cancellation before updating the UI
            if (token.IsCancellationRequested)
            {
                Debug.WriteLine("UpdateDisplay operation was cancelled.");
                return;
            }
            await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
            {
                try
                {
                    if (frame.Empty() || frame.Width == 0 || frame.Height == 0)
                    {
                        Console.WriteLine("Received an invalid frame.");
                        return; // Exit and don't process further
                    }
                    var bitmap = OpenCvSharp.Extensions.BitmapConverter.ToBitmap(frame);
                    var bitmapSource = ConvertToBitmap(bitmap);
                    imageControl.Source = bitmapSource;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Failed to convert frame to bitmap. Frame Size: {frame.Size()}. Channels: {frame.Channels()}. Error: {ex.Message}");
                }
                
            });
        }
5tmbdcev

5tmbdcev1#

您面临的问题是由于当您在IPCameraMethod中调用await tuple.Item3时,实际上等待Main任务的完成。当Main任务完成时(正常或由于取消),它也可能导致立即关闭WebSocket
public void Task Main(System.Windows.Controls.Image imageControl,int cameraIndex,CancellationToken token,string cameraUrl){ //.初始化WebSocket等操作

// Create a TaskCompletionSource to signal when the "Stop_Stream" message is sent
var stopStreamCompletion = new TaskCompletionSource<bool>();

while (!token.IsCancellationRequested)
{
    // ... Fetch and display video frames
}

if (token.IsCancellationRequested)
{
    var Stop_Server = new
    {
        command = "Stop_Stream"
    };

    // Send the "Stop_Stream" message and await its completion
    await SendSocket(cameraIndex, Stop_Server, token);

    // Signal that the message has been sent
    stopStreamCompletion.SetResult(true);
}

// Now, await the completion of the Main task
await stopStreamCompletion.Task;

}

相关问题