Azure存储队列到Cosmos DB

b5lpy0ml  于 2023-03-31  发布在  其他
关注(0)|答案(1)|浏览(109)

我尝试从http触发器接收有效载荷,并将有效载荷与信封一起发送到服务总线,最终发送到Cosmos DB,以便执行CRUD操作。
有效负载被正确地接收并被给予一个信封,信封被正确地发送到服务总线并从服务总线接收。不幸的是,我一直无法弄清楚如何在没有全新的http触发器的情况下将消息从服务总线发送到Cosmos DB,所以我不得不尝试设置存储队列来发送消息。如果可以使用服务总线来完成此操作,请让我知道如何.否则,我不知道为什么我得到这个错误时运行我的程序.下面是所有的代码段和错误消息:
Handler.java:

import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

public class Handler {

    @FunctionName("HttpHandler")
    public HttpResponseMessage runHttp(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
            HttpRequestMessage<Optional<String>> req, ExecutionContext context) {
        try {
        
            // request to String (payload)
            String request = req.getBody().get();
            // build Envelope
            MessageEnvelope env = cf.buildEnvelope(request, context);
            // log 
            CfLog envelope = cf.log(env, context, request);
            // send envelope to Service Bus Queue
            ServiceBusSend.send(envelope);
            // send envelope to Storage Queue
            context.getLogger().info(StorageQueueSend.send(envelope));
            // Create in Cosmos
            //context.getLogger().info(CosmosOperation.create(envelope, context));
            
            context.getLogger().info("Made it 1");
            return req.createResponseBuilder(HttpStatus.OK).body("Data logged").build();
        } catch (Exception e) {
            // log the exception
            context.getLogger().severe(e.toString());
            return req.createResponseBuilder(HttpStatus.INTERNAL_SERVER_ERROR).body("An error occurred").build();
        }
    }

    @FunctionName("ReceiveFromQueue")
    public void runReceiveFromQueue(
            @ServiceBusQueueTrigger(name= "envelope", queueName = "hyperscalerapi-queue", connection = "hyperScalerAPIServiceBus")
            CfLog envelope, final ExecutionContext context) {
        try {       
            context.getLogger().info(envelope.toString());
            context.getLogger().info("Made it 2");
            // Cosmos DB upload

        } catch (Exception e) {
            context.getLogger().severe("An error occurred while processing the message: " + e.getMessage());
        }
    }
}`

StorageQueueSend.java:

import com.azure.storage.queue.QueueClientBuilder;
import com.azure.storage.queue.models.QueueStorageException;

import org.mozilla.javascript.tools.shell.Environment;

import com.azure.storage.queue.QueueClient;

public class StorageQueueSend {
    public static String send(CfLog envelope) {
        try{
            System.out.println(System.getenv("hyperScalerAPIStorageQueue"));
            QueueClient queueClient = new QueueClientBuilder()
                                            .connectionString(System.getenv("hyperScalerAPIStorageQueue"))
                                            .queueName("hyperscalerapistorage-queue")
                                            .buildClient();
            queueClient.sendMessage(envelope.toString());
            return "Envelope added to storage queue\n";
        } catch(QueueStorageException e) {
            return e.getMessage();
        }
    }
}`

CosmosOperation.java (由于上述函数不工作,因此当前已注解掉该函数):

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
import java.util.*;
import java.util.logging.*;
import com.microsoft.azure.documentdb.Document;

/**
 * Azure Functions in Java with Cosmos DB Trigger.
 */
public class CosmosOperation {
    @FunctionName("create")
    @CosmosDBOutput(name = "database", databaseName = "hyperscalerapi-cosmos-dev", collectionName = "Items",
            connectionStringSetting = "cosmosDBConnectionString")
        public static String create (
            @QueueTrigger(name = "envelope", queueName = "hyperscalerapi-queue",
            connection = "hyperScalerAPIStorageQueue") CfLog envelope,
            final ExecutionContext context) {
                return "Payload: " + envelope.getPayload() + "\nWith ID: " + 
                envelope.getExecution().getCorrelationId() + "added to Cosmos DB";
            }
}`

运行时出现错误消息(实际上从未完成运行,我必须终止):

[2023-02-27T17:12:43.295Z] Function "ReceiveFromQueue" (Id: ece2405b-1ddf-4f48-a1d9-9d35f4fe4d83) invoked by Java Worker
[2023-02-27T17:12:43.386Z] Executed 'Functions.ReceiveFromQueue' (Succeeded, Id=ece2405b-1ddf-4f48-a1d9-9d35f4fe4d83, Duration=173ms)
[2023-02-27T17:12:44.571Z] Feb 27, 2023 12:12:44 PM io.netty.channel.ChannelInitializer exceptionCaught
[2023-02-27T17:12:44.576Z] WARNING: Failed to initialize a channel. Closing: [id: 0x62de8a42]
[2023-02-27T17:12:44.578Z] java.lang.NoSuchMethodError: 'void io.netty.handler.codec.http.HttpClientCodec.<init>(int, int, int, boolean, boolean, int, boolean, boolean)'
...
[2023-02-27T17:12:44.751Z]      at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:192)
[2023-02-27T17:12:44.752Z]      at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
[2023-02-27T17:12:44.753Z]      at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:304)
[2023-02-27T17:12:44.754Z]      at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
[2023-02-27T17:12:44.757Z]      at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:172)
[2023-02-27T17:12:44.760Z]      at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:445)
[2023-02-27T17:12:44.762Z]      at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.lambda$drainLoop$9(SimpleDequePool.java:400)
[2023-02-27T17:12:44.765Z]      at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onError(FluxDoOnEach.java:186)
[2023-02-27T17:12:44.766Z]      at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
[2023-02-27T17:12:44.768Z]      at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.onError(DefaultPooledConnectionProvider.java:554)
[2023-02-27T17:12:44.770Z]      at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
[2023-02-27T17:12:44.771Z]      at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
[2023-02-27T17:12:44.776Z]      at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
[2023-02-27T17:12:44.778Z]      at reactor.core.publisher.Operators.error(Operators.java:198)
[2023-02-27T17:12:44.780Z]      at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
[2023-02-27T17:12:44.781Z]      at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
[2023-02-27T17:12:44.782Z]      at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
[2023-02-27T17:12:44.782Z]      at reactor.netty.transport.TransportConnector$MonoChannelPromise.tryFailure(TransportConnector.java:517)
[2023-02-27T17:12:44.783Z]      at reactor.netty.transport.TransportConnector$MonoChannelPromise.setFailure(TransportConnector.java:471)

这只是错误消息的一个片段。它一直在继续。

svgewumm

svgewumm1#

我不是100%确定错误消息本身,但看起来你返回的是日志消息,而不是文档的JSON字符串。参考文档中的这个示例,同样可以使用服务总线队列触发器来完成。
下面是文档中的代码示例以供参考

@FunctionName("getItem")
@CosmosDBOutput(name = "database",
  databaseName = "ToDoList",
  collectionName = "Items",
  connectionStringSetting = "AzureCosmosDBConnection")
public String cosmosDbQueryById(
    @QueueTrigger(name = "msg",
      queueName = "myqueue-items",
      connection = "AzureWebJobsStorage")
    String message,
    final ExecutionContext context)  {
     return "{ id: \"" + System.currentTimeMillis() + "\", Description: " + message + " }";
   }

如果需要按原样存储envelope对象,则应直接将其作为JSON字符串返回。

相关问题