我有向 Azure 服务总线主题发送消息以及从其订阅接收消息的方法,需要使用 JUnit 5 对这些方法进行模拟(mock)并编写测试用例。
以下是基于 Microsoft 文档的示例代码:
/**
 * Sample sender that publishes a fixed batch of JSON-encoded "Scientist" messages to an Azure
 * Service Bus topic.
 */
public class MyServiceBusTopicClient {
    /** Shared Gson instance used to parse the sample data and to serialize each message body. */
    static final Gson GSON = new Gson();

    /**
     * Creates a client for the {@code BasicTopic} topic, sends the sample messages, waits for all
     * sends to finish and then closes the client.
     *
     * <p>The client is closed even when sending fails; a send failure surfaces from
     * {@code join()} as an unchecked {@code CompletionException}.
     *
     * @param args unused
     * @throws Exception if the client cannot be created
     */
    public static void main(String[] args) throws Exception, ServiceBusException {
        String connectionString = "Endpoint=sb://<NameOfServiceBusNamespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<AccessKey>";
        TopicClient sendClient = new TopicClient(new ConnectionStringBuilder(connectionString, "BasicTopic"));
        try {
            // Block until every send has completed so failures are not silently dropped.
            sendMessagesAsync(sendClient).join();
        } finally {
            // Always release the client, and wait for the close to actually finish.
            sendClient.closeAsync().join();
        }
    }

    /**
     * Sends one message per sample scientist record through the given client.
     *
     * <p>Each message carries the record serialized as JSON, content type
     * {@code application/json}, label {@code Scientist}, a message id equal to the record's index
     * in the sample list, and a two-minute time-to-live.
     *
     * @param sendClient client used to send the messages
     * @return a future that completes once every send (and its acknowledgement print) has completed
     */
    static CompletableFuture<Void> sendMessagesAsync(TopicClient sendClient) {
        // NOTE(review): the literal uses single quotes and '=' separators, so it relies on Gson
        // accepting this relaxed syntax; it is kept verbatim.
        List<HashMap<String, String>> data =
            GSON.fromJson(
                "[" +
                    "{'name' = 'Einstein', 'firstName' = 'Albert'}," +
                    "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
                    "{'name' = 'Curie', 'firstName' = 'Marie'}," +
                    "{'name' = 'Hawking', 'firstName' = 'Steven'}," +
                    "{'name' = 'Newton', 'firstName' = 'Isaac'}," +
                    "{'name' = 'Bohr', 'firstName' = 'Niels'}," +
                    "{'name' = 'Faraday', 'firstName' = 'Michael'}," +
                    "{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
                    "{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
                    "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
                    "]",
                new TypeToken<List<HashMap<String, String>>>() {
                }.getType());

        List<CompletableFuture<Void>> tasks = new ArrayList<>(data.size());
        for (int i = 0; i < data.size(); i++) {
            Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
            message.setContentType("application/json");
            message.setLabel("Scientist");
            // The index in the sample list doubles as the message id.
            message.setMessageId(Integer.toString(i));
            message.setTimeToLive(Duration.ofMinutes(2));
            System.out.printf("Message sending: Id = %s\n", message.getMessageId());
            tasks.add(
                sendClient.sendAsync(message).thenRunAsync(() -> {
                    System.out.printf("\tMessage acknowledged: Id = %s\n", message.getMessageId());
                }));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0]));
    }
}
以下是用于从 Azure 服务总线订阅接收消息的方法:
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.google.gson.Gson;
import static java.nio.charset.StandardCharsets.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
/**
 * Sample receiver that registers a message handler on three subscriptions of the
 * {@code BasicTopic} Azure Service Bus topic and prints every "Scientist" JSON message it receives.
 */
public class MyServiceBusSubscriptionClient {
    /** Shared Gson instance used to deserialize message bodies. */
    static final Gson GSON = new Gson();

    /**
     * Creates one PEEKLOCK client per subscription and registers the same handler on each, all
     * sharing a single cached thread pool.
     *
     * <p>Neither the clients nor the executor are shut down here.
     *
     * @param args unused
     * @throws Exception if a client cannot be created or the handler cannot be registered
     */
    public static void main(String[] args) throws Exception, ServiceBusException {
        String connectionString = "Endpoint=sb://<NameOfServiceBusNamespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<AccessKey>";
        SubscriptionClient subscription1Client = new SubscriptionClient(new ConnectionStringBuilder(connectionString, "BasicTopic/subscriptions/Subscription1"), ReceiveMode.PEEKLOCK);
        SubscriptionClient subscription2Client = new SubscriptionClient(new ConnectionStringBuilder(connectionString, "BasicTopic/subscriptions/Subscription2"), ReceiveMode.PEEKLOCK);
        SubscriptionClient subscription3Client = new SubscriptionClient(new ConnectionStringBuilder(connectionString, "BasicTopic/subscriptions/Subscription3"), ReceiveMode.PEEKLOCK);
        ExecutorService executorService = Executors.newCachedThreadPool();
        registerMessageHandlerOnClient(subscription1Client, executorService);
        registerMessageHandlerOnClient(subscription2Client, executorService);
        registerMessageHandlerOnClient(subscription3Client, executorService);
    }

    /**
     * Registers a handler on the given subscription client that prints matching messages and then
     * completes every message it is handed.
     *
     * @param receiveClient subscription client to register the handler on
     * @param executorService executor passed to the client for running the handler
     * @throws Exception if registering the handler fails
     */
    static void registerMessageHandlerOnClient(SubscriptionClient receiveClient, ExecutorService executorService) throws Exception {
        receiveClient.registerMessageHandler(
            new IMessageHandler() {
                // Callback invoked when the message handler loop has obtained a message.
                public CompletableFuture<Void> onMessageAsync(IMessage message) {
                    // Only messages labelled "Scientist" with a JSON content type are printed.
                    if (message.getLabel() != null &&
                        message.getContentType() != null &&
                        message.getLabel().contentEquals("Scientist") &&
                        message.getContentType().contentEquals("application/json")) {
                        byte[] body = message.getBody();
                        Map<?, ?> scientist = GSON.fromJson(new String(body, UTF_8), Map.class);
                        System.out.printf(
                            "\n\t\t\t\t%s Message received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                                "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
                            receiveClient.getEntityPath(),
                            message.getMessageId(),
                            message.getSequenceNumber(),
                            message.getEnqueuedTimeUtc(),
                            message.getExpiresAtUtc(),
                            message.getContentType(),
                            scientist != null ? scientist.get("firstName") : "",
                            scientist != null ? scientist.get("name") : "");
                    }
                    // Every message is completed, including ones that did not match the filter above.
                    return receiveClient.completeAsync(message.getLockToken());
                }

                // Callback invoked when the message handler has an exception to report.
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    // Pass the text as arguments, never as the format string: a '%' in the
                    // exception message would otherwise make printf itself throw.
                    System.out.printf("%s-%s", exceptionPhase, throwable.getMessage());
                }
            },
            // 1 concurrent call, auto-complete disabled (the handler completes each message
            // itself), 1-minute auto-renew duration.
            new MessageHandlerOptions(1, false, Duration.ofMinutes(1)),
            executorService);
    }
}
我不确定应该如何对上述方法进行模拟(mock)。如果能提供相关的实现文档或示例,将会很有帮助。
有人能给我建议吗?
暂无答案!
目前还没有任何答案,快来回答吧!