如何访问Azure订阅上的死信子队列?

33qvvth1  于 2023-03-09  发布在  其他
关注(0)|答案(6)|浏览(168)

当我使用以下内容时:

var deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicPath,subName);
var client = SubscriptionClient.CreateFromConnectionString(connectionString, deadLetterPath, subName);

我收到无效操作异常
无法直接在子队列上创建客户端。请在主队列上创建客户端,然后使用该客户端在相应的子队列上创建接收器
azure文档的某些部分要求使用SubscriptionClient.CreateReceiver访问子队列,但该方法并不存在。

11dmarpk

11dmarpk1#

这种方法对你有用吗?

MessagingFactory factory = MessagingFactory.CreateFromConnectionString(cnxString);
var deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicPath,subName);
var dlqReceiver = factory.CreateMessageReceiver(deadLetterPath, ReceiveMode.ReceiveAndDelete);

我还没有在这里(会议上)测试过,但是给予看
干杯

u5i3ibmn

u5i3ibmn2#

使用Azure服务总线的死信队列有一个约定命名:

  • 对于服务总线队列:队列路径/$死信队列
  • 对于服务总线订阅:主题路径/订阅名称/$死信队列

因此,您可以像访问队列一样访问死信队列。
从@SamVanhoutte回答中,您可以看到ServiceBus框架提供了格式化死信队列名称的方法:

  • 对于服务总线队列:QueueClient.FormatDeadLetterPath(queuePath)
  • 对于服务总线订阅:SubscriptionClient.FormatDeadLetterPath(topicPath, subscriptionName)

我编写了两个小方法来创建队列和订阅的消息接收器,您可以在其中设置是否需要交易函队列:

/// <summary>
/// Create a new <see cref="MessageReceiver"/> object using the specified Service Bus Queue path.
/// </summary>
/// <param name="connectionString">The connection string to access the desired service namespace.</param>
/// <param name="queuePath">The Service Bus Queue path.</param>
/// <param name="isDeadLetter">True if the desired path is the deadletter queue.</param>
public static MessageReceiver CreateMessageReceiver(string connectionString, string queuePath,
    bool isDeadLetter = false)
{
    return MessagingFactory.CreateFromConnectionString(connectionString)
        .CreateMessageReceiver(isDeadLetter
            ? QueueClient.FormatDeadLetterPath(queuePath)
            : queuePath);
}

/// <summary>
/// Create a new <see cref="MessageReceiver"/> object using the specified Service Bus Topic Subscription path.
/// </summary>
/// <param name="connectionString">The connection string to access the desired service namespace.</param>
/// <param name="topicPath">The Service Bus Topic path.</param>
/// <param name="subscriptionName">The Service Bus Topic Subscription name.</param>
/// <param name="isDeadLetter">True if the desired path is the deadletter subqueue.</param>
public static MessageReceiver CreateMessageReceiver(string connectionString, string topicPath,
    string subscriptionName, bool isDeadLetter = false)
{
    return MessagingFactory.CreateFromConnectionString(connectionString)
        .CreateMessageReceiver(isDeadLetter
            ? SubscriptionClient.FormatDeadLetterPath(topicPath, subscriptionName)
            : SubscriptionClient.FormatSubscriptionPath(topicPath, subscriptionName));
}
332nm8kg

332nm8kg3#

如果您使用的是Microsoft.Azure.ServiceBus而不是Microsoft.ServiceBus,则会略有不同。

var deadQueuePath = EntityNameHelper.FormatDeadLetterPath(your_queue_name);
var deadQueueReceiver = new MessageReceiver(connectionString, deadQueuePath);

根据Microsoft.Azure.ServiceBus命名空间中的EntityNameHelper类,对于主题,使用订阅路径而不是your_queue_name。
队列的名称或订阅的路径。

/// <summary>
/// Formats the dead letter path for either a queue, or a subscription.
/// </summary>
/// <param name="entityPath">The name of the queue, or path of the subscription.</param>
/// <returns>The path as a string of the dead letter entity.</returns>
public static string FormatDeadLetterPath(string entityPath)
{
    return EntityNameHelper.FormatSubQueuePath(entityPath, EntityNameHelper.DeadLetterQueueName);
}
2hh7jdfx

2hh7jdfx4#

使用最新的Service Bus客户端库,您可以像这样访问死信队列。请注意,在创建接收方时指定死信队列时,不同语言之间的一致性:

  • JS系统
const { ServiceBusClient } = require("@azure/service-bus");

const client = new ServiceBusClient("<connectionstring>");
const deadletterReceiver = client.createReceiver("queueName", { subQueueType: "deadLetter" });
const messages = await deadletterReceiver.receiveMessages(1);

if (messages.length > 0) {
    console.log("Received the message from DLQ - ", messages[0].body);

    // Mark message as complete, i.e. remove from DLQ
    await deadletterReceiver.completeMessage(messages[0]);
} else {
    console.log("Error: No messages were received from the DLQ.");
}

await deadletterReceiver.close();
  • java
import com.azure.messaging.servicebus.*;
import com.azure.messaging.servicebus.models.SubQueue;

ServiceBusReceiverClient deadletterReceiver = new ServiceBusClientBuilder()
    .connectionString("<connectionstring>")
    .receiver() // Use this for session or non-session enabled queue or topic/subscriptions
    .queueName("queuename")
    .subQueue(SubQueue.DEAD_LETTER_QUEUE)
    .buildClient();

IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(10);
messages.forEach(message -> {
    System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(),
        message.getBody().toString());

    //Remove message from DLQ
    deadletterReceiver.complete(message);
});

deadletterReceiver.close();
  • 巨蟒
import asyncio
from azure.servicebus import ServiceBusMessage, ServiceBusSubQueue
from azure.servicebus.aio import ServiceBusClient

servicebus_client = ServiceBusClient.from_connection_string(conn_str="<connectionstring>")
dlq_receiver = servicebus_client.get_queue_receiver(queue_name="queuename", 
               sub_queue=ServiceBusSubQueue.DEAD_LETTER)

async with dlq_receiver:
    received_msgs = await dlq_receiver.receive_messages(max_message_count=10, max_wait_time=5)
    for msg in received_msgs:
        print(msg)
        
        # remove the message from the DLQ
        await dlq_receiver.complete_message(msg)
  • .NET语言
using Azure.Messaging.ServiceBus;

await using var client = new ServiceBusClient("<connectionstring>");
ServiceBusReceiver deadletterReceiver = client.CreateReceiver(
    "queuename",
     new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter }
);

ServiceBusReceivedMessage message = await deadletterReceiver.ReceiveMessageAsync();
            
if (message != null)
{
    Console.WriteLine($"DeadLetter message = {message.Body}");
    
    // remove the message from the DLQ
    await deadletterReceiver.CompleteMessageAsync(message);
}
else
{
    // DLQ was empty on last receive attempt
    Console.WriteLine("Error: No messages were received from the DLQ."); 
}

await deadletterReceiver.CloseAsync();
5us2dqdw

5us2dqdw5#

那些想用python做的人。

从死信队列接收消息:

from azure.servicebus import ServiceBusClient
import json
connectionString = "Your Connection String to Service Bus"
serviceBusClient = ServiceBusClient.from_connection_string(connectionString)
queueName = "Your Queue Name created in the Service Bus"
queueClient = serviceBusClient.get_queue(queueName)
with queueClient.get_deadletter_receiver(prefetch=5) as queueReceiver:
    messages = queueReceiver.fetch_next(timeout=100)
    for message in messages:
        # message.body is a generator object. Use next() to get the body.
        body = next(message.body)
        message.complete()

希望对谁有帮助。

4ioopgfo

4ioopgfo6#

使用新的Azure.Messaging.ServiceBus SDK为处理器模式编写代码

private void  RegisterDeadLetteredMessageHandler(string servicebusNamespace, string topicName, string subscriptionName)
    {
      var sbClient =  new ServiceBusClient($"sb://{servicebusNamespace}.servicebus.windows.net", TokenCredential());
      var dlqProcessor = sbClient.CreateProcessor(topicName, 
        subscriptionName, new ServiceBusProcessorOptions { SubQueue = SubQueue.DeadLetter });

        dlqProcessor.ProcessMessageAsync += HandleMessageAsync;   
    
    }
    
    protected async Task HandleMessageAsync(ProcessMessageEventArgs arg)
            {
                   // Process the Message arg.Message
            }

相关问题