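I'm trying to receive a payload from an HTTP trigger, wrap it in an envelope, send it to Service Bus, and ultimately write it to Cosmos DB so that I can perform CRUD operations.
The payload is received correctly and wrapped in an envelope, and the envelope is sent to and received from Service Bus correctly. Unfortunately, I haven't been able to figure out how to get the message from Service Bus into Cosmos DB without a brand-new HTTP trigger, so I tried setting up a Storage Queue to send the message instead. If this can be done with Service Bus, please let me know how. Otherwise, I don't know why I get this error when I run my program. Here are all the code snippets and the error message: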
Handler.java:
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

public class Handler {
    @FunctionName("HttpHandler")
    public HttpResponseMessage runHttp(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
            HttpRequestMessage<Optional<String>> req, ExecutionContext context) {
        try {
            // request to String (payload)
            String request = req.getBody().get();
            // build Envelope
            MessageEnvelope env = cf.buildEnvelope(request, context);
            // log
            CfLog envelope = cf.log(env, context, request);
            // send envelope to Service Bus Queue
            ServiceBusSend.send(envelope);
            // send envelope to Storage Queue
            context.getLogger().info(StorageQueueSend.send(envelope));
            // Create in Cosmos
            //context.getLogger().info(CosmosOperation.create(envelope, context));
            context.getLogger().info("Made it 1");
            return req.createResponseBuilder(HttpStatus.OK).body("Data logged").build();
        } catch (Exception e) {
            // log the exception
            context.getLogger().severe(e.toString());
            return req.createResponseBuilder(HttpStatus.INTERNAL_SERVER_ERROR).body("An error occurred").build();
        }
    }

    @FunctionName("ReceiveFromQueue")
    public void runReceiveFromQueue(
            @ServiceBusQueueTrigger(name = "envelope", queueName = "hyperscalerapi-queue", connection = "hyperScalerAPIServiceBus")
            CfLog envelope, final ExecutionContext context) {
        try {
            context.getLogger().info(envelope.toString());
            context.getLogger().info("Made it 2");
            // Cosmos DB upload
        } catch (Exception e) {
            context.getLogger().severe("An error occurred while processing the message: " + e.getMessage());
        }
    }
}
StorageQueueSend.java:
import com.azure.storage.queue.QueueClientBuilder;
import com.azure.storage.queue.models.QueueStorageException;
import org.mozilla.javascript.tools.shell.Environment;
import com.azure.storage.queue.QueueClient;

public class StorageQueueSend {
    public static String send(CfLog envelope) {
        try {
            System.out.println(System.getenv("hyperScalerAPIStorageQueue"));
            QueueClient queueClient = new QueueClientBuilder()
                    .connectionString(System.getenv("hyperScalerAPIStorageQueue"))
                    .queueName("hyperscalerapistorage-queue")
                    .buildClient();
            queueClient.sendMessage(envelope.toString());
            return "Envelope added to storage queue\n";
        } catch (QueueStorageException e) {
            return e.getMessage();
        }
    }
}
CosmosOperation.java (the call to this function is currently commented out because the function above isn't working):
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
import java.util.*;
import java.util.logging.*;
import com.microsoft.azure.documentdb.Document;

/**
 * Azure Functions in Java with Cosmos DB Trigger.
 */
public class CosmosOperation {
    @FunctionName("create")
    @CosmosDBOutput(name = "database", databaseName = "hyperscalerapi-cosmos-dev", collectionName = "Items",
            connectionStringSetting = "cosmosDBConnectionString")
    public static String create(
            @QueueTrigger(name = "envelope", queueName = "hyperscalerapi-queue",
                    connection = "hyperScalerAPIStorageQueue") CfLog envelope,
            final ExecutionContext context) {
        return "Payload: " + envelope.getPayload() + "\nWith ID: " +
                envelope.getExecution().getCorrelationId() + "added to Cosmos DB";
    }
}
Error message at runtime (it never actually finishes running; I have to kill it):
[2023-02-27T17:12:43.295Z] Function "ReceiveFromQueue" (Id: ece2405b-1ddf-4f48-a1d9-9d35f4fe4d83) invoked by Java Worker
[2023-02-27T17:12:43.386Z] Executed 'Functions.ReceiveFromQueue' (Succeeded, Id=ece2405b-1ddf-4f48-a1d9-9d35f4fe4d83, Duration=173ms)
[2023-02-27T17:12:44.571Z] Feb 27, 2023 12:12:44 PM io.netty.channel.ChannelInitializer exceptionCaught
[2023-02-27T17:12:44.576Z] WARNING: Failed to initialize a channel. Closing: [id: 0x62de8a42]
[2023-02-27T17:12:44.578Z] java.lang.NoSuchMethodError: 'void io.netty.handler.codec.http.HttpClientCodec.<init>(int, int, int, boolean, boolean, int, boolean, boolean)'
...
[2023-02-27T17:12:44.751Z] at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:192)
[2023-02-27T17:12:44.752Z] at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
[2023-02-27T17:12:44.753Z] at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:304)
[2023-02-27T17:12:44.754Z] at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
[2023-02-27T17:12:44.757Z] at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:172)
[2023-02-27T17:12:44.760Z] at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:445)
[2023-02-27T17:12:44.762Z] at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.lambda$drainLoop$9(SimpleDequePool.java:400)
[2023-02-27T17:12:44.765Z] at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onError(FluxDoOnEach.java:186)
[2023-02-27T17:12:44.766Z] at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
[2023-02-27T17:12:44.768Z] at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.onError(DefaultPooledConnectionProvider.java:554)
[2023-02-27T17:12:44.770Z] at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
[2023-02-27T17:12:44.771Z] at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
[2023-02-27T17:12:44.776Z] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
[2023-02-27T17:12:44.778Z] at reactor.core.publisher.Operators.error(Operators.java:198)
[2023-02-27T17:12:44.780Z] at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
[2023-02-27T17:12:44.781Z] at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
[2023-02-27T17:12:44.782Z] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
[2023-02-27T17:12:44.782Z] at reactor.netty.transport.TransportConnector$MonoChannelPromise.tryFailure(TransportConnector.java:517)
[2023-02-27T17:12:44.783Z] at reactor.netty.transport.TransportConnector$MonoChannelPromise.setFailure(TransportConnector.java:471)
This is only a fragment of the error message. It goes on and on.
1 Answer
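I'm not 100% sure about the error message itself, but it looks like you're returning a log message rather than the JSON string of the document. See the example in the documentation; the same thing can be done with a Service Bus queue trigger.
The code sample in the documentation is a useful reference.
If you need to store the envelope object as-is, return it directly as a JSON string.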
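To make the last point concrete: the value returned from a function annotated with `@CosmosDBOutput` is what gets written as the document, so it should be a JSON object rather than a sentence like "Payload: ... added to Cosmos DB". Below is a minimal sketch of building that string in plain Java. The class name `EnvelopeDocument`, the helper names, the `payload` field, and the use of the correlation ID as the document `id` are all assumptions for illustration, not part of the question's code. In a real project a JSON library would normally replace the hand-written escaping.

```java
public class EnvelopeDocument {

    /** Escapes a value so it can sit safely inside a JSON string literal. */
    public static String escape(String value) {
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // remaining control characters become \\uXXXX escapes
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /**
     * Builds the JSON document to return from the function.
     * Assumption: the correlation ID serves as the document "id" and the
     * payload is stored as a plain string field named "payload".
     */
    public static String toJson(String correlationId, String payload) {
        return "{\"id\":\"" + escape(correlationId) + "\","
             + "\"payload\":\"" + escape(payload) + "\"}";
    }

    public static void main(String[] args) {
        // Hypothetical values standing in for the envelope's fields.
        System.out.println(toJson("abc-123", "{\"name\":\"test\"}"));
    }
}
```

In the question's setup, `ReceiveFromQueue` could presumably carry the `@CosmosDBOutput` annotation itself (with the same attributes as on `create`), return `String` instead of `void`, and end with something like `return EnvelopeDocument.toJson(envelope.getExecution().getCorrelationId(), envelope.getPayload());`, assuming both getters return strings. That would make the Storage Queue hop unnecessary.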