我面临的一个挑战是确保在拥有WebSocket的任务完成之前发送WebSocket消息。
我有一个Main方法,核心逻辑驻留在其中,包括启动WebSocket连接和从IP摄像机流式传输。创建任务并将其存储在具有CancellationTokenSource的字典中,以允许任务取消。取消令牌也传递给Main方法。
下面是我的Main方法的简化版本:
public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl)
{
// ... Initialize WebSocket and other operations
while (!token.IsCancellationRequested)
{
// ... Fetch and display video frames
}
if (token.IsCancellationRequested)
{
var Stop_Server = new
{
command = "Stop_Stream"
};
await SendSocket(cameraIndex, Stop_Server, token);
}
}
我尝试使用以下方法取消此任务时出现问题:
private async Task IPCameraMethod()
{
// ... some other logic
if (tokenSources.TryGetValue(cameraIndex, out var tuple))
{
tuple.Item2.Cancel();
try
{
await tuple.Item3; // Await the task's completion
}
catch (OperationCanceledException)
{
// Task was cancelled, expected behavior
}
}
// Now that the old task has been cancelled and awaited, start the new task.
var newCts = new CancellationTokenSource();
var newTask = Camera.IPCameraService.Main(SourceImageControl, selectedItemIndex, newCts.Token, url);
Debug.WriteLine("New Task Created!");
tokenSources[cameraIndex] = Tuple.Create(selectedItemIndex, newCts, newTask);
}
只要排队等候tuple.Item3;执行时,Main方法中的WebSocket断开连接,甚至在调用SendSocket函数发送“Stop_Stream”消息之前。
如何确保WebSocket消息在WebSocket因任务完成而关闭之前发送?
本质上,IPCameraMethod
应该在关闭Main
任务并因此关闭WebSocket连接之前等待该任务完成,但它似乎立即关闭。
这是我得到的输出:
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
ReceiveSocket operation was cancelled.
Exception thrown: 'System.ArgumentNullException' in OpenCvSharp.dll
Exception caught while processing: Value cannot be null.
Parameter name: buf
Details: System.ArgumentNullException: Value cannot be null.
Parameter name: buf
at OpenCvSharp.Cv2.ImDecode(Byte[] buf, ImreadModes flags)
at Iris.Services.IPCameraService.<RunYolo>d__74.MoveNext() in C:\Users\nico_\Desktop\Projects\Iris\Iris WPF\Iris Y\Iris-Latest\Iris\Services\IPCameraService.cs:line 317
这是IPCameraMethod的完整代码:
private async Task IPCameraMethod()
{
if (string.IsNullOrWhiteSpace(textBoxUrl.Text) ||
!int.TryParse(SourceComboBox.Name.Replace("comboBox", ""), out int cameraIndex) ||
comboBoxSavedCameras.SelectedIndex < 0)
{
return;
}
string url = textBoxUrl.Text;
int selectedItemIndex = comboBoxSavedCameras.SelectedIndex;
if (tokenSources.TryGetValue(cameraIndex, out var tuple))
{
if (selectedItemIndex != tuple.Item1)
{
tuple.Item2.Cancel();
try
{
await tuple.Item3; // Await the task's completion
}
catch (OperationCanceledException)
{
// Task was cancelled, expected behavior
}
tuple.Item2.Dispose();
tokenSources.Remove(cameraIndex);
}
else
{
return; // If selected item is the same, we do not need to create a new task
}
}
// Now that the old task has been cancelled and awaited, start the new task.
var newCts = new CancellationTokenSource();
var newTask = Camera.IPCameraService.Main(SourceImageControl, selectedItemIndex, newCts.Token, url);
Debug.WriteLine("New Task Created!");
tokenSources[cameraIndex] = Tuple.Create(selectedItemIndex, newCts, newTask);
}
这是Main及其调用和其他方法的完整代码:
public async Task Main(System.Windows.Controls.Image imageControl, int cameraIndex, CancellationToken token, string cameraUrl)
{
CameraURLs[cameraIndex] = cameraUrl;
await StartSocket(cameraIndex, token);
await StartStream(cameraIndex, cameraUrl, token);
var stopStreamCompletion = new TaskCompletionSource<bool>();
EventHandler(cameraUrl, cameraIndex, token);
while (!token.IsCancellationRequested) // Added a token check to allow stopping the loop externally
{
var frame = await RunYolo(cameraIndex, token);
if (frame != null)
await UpdateDisplay(frame, imageControl, token);
}
// Close WebSocket when CancellationToken is triggered
if (token.IsCancellationRequested)
{
var Stop_Server = new
{
command = "Stop_Stream"
};
await SendSocket(cameraIndex, Stop_Server, token);
Debug.WriteLine("Websocket Cleared!");
}
}
public async Task StartStream(int cameraIndex, string cameraUrl, CancellationToken token)
{
try
{
// Check for cancellation before sending the request
token.ThrowIfCancellationRequested();
var settings = new
{
command = "Start_Stream",
camera_url = cameraUrl
};
await SendSocket(cameraIndex, settings, token); // Pass the cancellation token to SendSocket
}
catch (OperationCanceledException)
{
// Handle cancellation
Debug.WriteLine("StartStream was cancelled.");
// If needed, you can add additional logic here to clean up or log.
return;
}
catch (System.Net.WebSockets.WebSocketException)
{
// Existing logic
Console.WriteLine("WebSocketException occurred while starting Stream. Retrying...");
await Task.Delay(1000); // Wait for a second before retrying
}
}
private async Task<Mat> RunYolo(int cameraIndex, CancellationToken token)
{
try
{
// Check for cancellation before doing any work
token.ThrowIfCancellationRequested();
// Run your operations
byte[] imageBytes = await ReceiveSocket(cameraIndex, token);
// Convert bytes to Mat
Mat frame = Cv2.ImDecode(imageBytes, ImreadModes.Color);
return frame;
}
catch (OperationCanceledException)
{
// Handle cancellation here, logging or other side-effects if needed
Debug.WriteLine("RunYolo operation was cancelled.");
return null;
}
catch (Exception ex)
{
// Log or handle other types of exceptions as needed
Console.WriteLine($"Exception caught while processing: {ex.Message}");
Console.WriteLine($"Details: {ex}");
return null;
}
}
public async Task<byte[]> ReceiveSocket(int cameraIndex, CancellationToken token)
{
try
{
// Check for cancellation before receiving
token.ThrowIfCancellationRequested();
var ws = CameraWebsockets[cameraIndex];
List<byte> fullMessage = new List<byte>();
WebSocketReceiveResult result;
do
{
var messageReceived = new ArraySegment<byte>(new byte[4096]);
result = await ws.ReceiveAsync(messageReceived, token);
fullMessage.AddRange(messageReceived.Skip(messageReceived.Offset).Take(result.Count));
} while (!result.EndOfMessage); // Continue reading until you get the entire message.
return fullMessage.ToArray();
}
catch (OperationCanceledException)
{
// Handle cancellation by logging or other side-effects if needed
Debug.WriteLine("ReceiveSocket operation was cancelled.");
return null;
}
catch (System.Net.WebSockets.WebSocketException)
{
await StartSocket(cameraIndex, token);
await StartStream(cameraIndex, CameraURLs[cameraIndex],token);
return await ReceiveSocket(cameraIndex,token);
}
}
public async Task SendSocket(int cameraIndex, object message, CancellationToken token)
{
try
{
token.ThrowIfCancellationRequested();
var ws = CameraWebsockets[cameraIndex];
var Start_ServerJson = JsonConvert.SerializeObject(message);
var Start_ServerBytes = Encoding.UTF8.GetBytes(Start_ServerJson);
await ws.SendAsync(new ArraySegment<byte>(Start_ServerBytes, 0, Start_ServerBytes.Length), WebSocketMessageType.Text, true, token);
}
catch (OperationCanceledException)
{
// Handle cancellation here, perhaps log it if needed
Debug.WriteLine("SendSocket operation was cancelled.");
}
catch (System.Net.WebSockets.WebSocketException)
{
// When a WebSocketException occurs, you restart the socket and stream.
// Be sure to consider the cancellation token when you're restarting as well.
await StartSocket(cameraIndex, token); // Consider passing token here too
await StartStream(cameraIndex, CameraURLs[cameraIndex], token); // And here
await SendSocket(cameraIndex, message, token); // Recursion with the same token
}
}
public async Task StartSocket(int cameraIndex, CancellationToken token)
{
while (true)
{
try
{
// Check for cancellation before trying to connect
token.ThrowIfCancellationRequested();
var ws = new ClientWebSocket();
await ws.ConnectAsync(new Uri(wsUrl), token); // Pass the cancellation token here
if (CameraWebsockets.ContainsKey(cameraIndex))
{
CameraWebsockets[cameraIndex] = ws;
}
else
{
CameraWebsockets.TryAdd(cameraIndex, ws);
}
return; // Break the loop if connected successfully
}
catch (OperationCanceledException)
{
// Handle cancellation by logging or other side-effects if needed
Debug.WriteLine("StartSocket operation was cancelled.");
return; // Exit the loop if cancelled
}
catch (System.Net.WebSockets.WebSocketException)
{
Console.WriteLine("WebSocketException occurred while starting socket. Retrying...");
await Task.Delay(1000, token); // Pass the cancellation token here too
}
}
}
private async Task UpdateDisplay(Mat frame, System.Windows.Controls.Image imageControl, CancellationToken token)
{
// Check for cancellation before updating the UI
if (token.IsCancellationRequested)
{
Debug.WriteLine("UpdateDisplay operation was cancelled.");
return;
}
await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
{
try
{
if (frame.Empty() || frame.Width == 0 || frame.Height == 0)
{
Console.WriteLine("Received an invalid frame.");
return; // Exit and don't process further
}
var bitmap = OpenCvSharp.Extensions.BitmapConverter.ToBitmap(frame);
var bitmapSource = ConvertToBitmap(bitmap);
imageControl.Source = bitmapSource;
}
catch (Exception ex)
{
Console.WriteLine($"Failed to convert frame to bitmap. Frame Size: {frame.Size()}. Channels: {frame.Channels()}. Error: {ex.Message}");
}
});
}
1条答案
按热度按时间5tmbdcev1#
您面临的问题是由于当您在IPCameraMethod中调用await tuple.Item3时,实际上等待Main任务的完成。当Main任务完成时(正常或由于取消),它也可能导致立即关闭WebSocket
public void Task Main(System.Windows.Controls.Image imageControl,int cameraIndex,CancellationToken token,string cameraUrl){ //.初始化WebSocket等操作
}