SqsAckSink makes Akka stream hang forever, causing the graph to restart

Posted by piztneat on 2021-06-27 in Java

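I am trying to implement a simple workflow with an SQS source, tested against Localstack. I cannot get it to work when I add SqsAckSink, and it does not work with SqsAckFlow either. If I remove SqsAckSink and just use Sink.seq(), the test passes. With SqsAckSink or SqsAckFlow in place, the test hangs forever. I also enabled debug logging in the test and see the same error repeated again and again, causing the graph to restart, but it does not make much sense to me. The error messages are posted below, after the code snippets.
The code is: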

public class DefaultWorkflow implements Workflow {

  private Function<Throwable, Supervision.Directive> errorStrategy;
  private final ActorSystem actorSystem;
  private FlowMonitor<ProvisioningResult> workflowMonitor;
  private final String queueUrl;
  private final SqsAsyncClient asyncClient;

  @Inject
  public DefaultWorkflow(ActorSystem actorSystem, String queueUrl) throws URISyntaxException {

    this.errorStrategy = exc -> (Supervision.Directive) Supervision.resume();
    this.actorSystem = actorSystem;
    this.queueUrl = queueUrl;

    asyncClient =
        SqsAsyncClient.builder()
            .region(Region.of(Localstack.getDefaultRegion()))
            .httpClient(AkkaHttpClient.builder().withActorSystem(actorSystem).build())
            .endpointOverride(new URI(queueUrl))
            .build();
    doWork();
  }

  private Flow<Message, ProvisioningResult, NotUsed> buildFlow() {
    return Flow.of(Message.class)
        .via(Flow.of(Message.class).map(m -> ProvisioningResult.builder().body(m.body()).build()));
  }

  @Override
  public Source<ProvisioningResult, FlowMonitor<ProvisioningResult>> getSource() {
    Source<Message, NotUsed> sqsSource =
        RestartSource.onFailuresWithBackoff(
            Duration.ofSeconds(1), Duration.ofSeconds(2), 0.1, this::createSQSSource);
    return sqsSource
        .via(buildFlow())
        .withAttributes(ActorAttributes.withSupervisionStrategy(errorStrategy))
        .monitorMat(Keep.right());
  }

  private Source<Message, NotUsed> createSQSSource() {
    SqsSourceSettings sqsSourceSettings = SqsSourceSettings.create().withMaxBatchSize(1);
    return SqsSource.create(queueUrl, sqsSourceSettings, asyncClient);
  }

  @Override
  public FlowMonitor<ProvisioningResult> getWorkflowMonitor() {
    return this.workflowMonitor;
  }

  private void doWork() {
    Pair<FlowMonitor<ProvisioningResult>, CompletionStage<Done>> run =
        getSource()
            .toMat(SqsAckSink.create(queueUrl, SqsAckSettings.create(), asyncClient), Keep.both())
            .run(actorSystem);
    workflowMonitor = run.first();
  }
}

The test looks like this:

  @Test
  public void getSource_givenMessage_shouldProduceResult()
      throws InterruptedException, ExecutionException, TimeoutException, URISyntaxException {
    String sqsName = "sqs2";
    String messageBody = "someMessage";
    String sqsUrl = initSQS(sqsName);
    generateSourceData(sqsUrl, messageBody);

    this.defaultWorkflow = new DefaultWorkflow(this.actorSystem, sqsUrl);

    Source<ProvisioningResult, FlowMonitor<ProvisioningResult>> source =
        defaultWorkflow.getSource();
    final CompletionStage<List<ProvisioningResult>> future =
        source.take(1).runWith(Sink.seq(), materializer);
    final List<ProvisioningResult> result = future.toCompletableFuture().join();
    assertEquals(1, result.size());
    assertEquals(result.get(0).getBody(), messageBody);
  }

  public void generateSourceData(String queueUrl, String messageBody) {
    client
        .sendMessage(
            SendMessageRequest.builder().queueUrl(queueUrl).messageBody(messageBody).build())
        .join();
  }

  private void initClient() throws URISyntaxException {
    System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
    AwsCredentials credentials = AwsBasicCredentials.create("somekey", "somevalue");
    StaticCredentialsProvider provider = StaticCredentialsProvider.create(credentials);

    client =
        SqsAsyncClient.builder()
            .region(Region.of(Localstack.getDefaultRegion()))
            .httpClient(AkkaHttpClient.builder().withActorSystem(ActorSystem.create()).build())
            .credentialsProvider(provider)
            .endpointOverride(new URI(Localstack.INSTANCE.getEndpointSQS()))
            .build();
  }

  protected String initSQS(String queueName) throws URISyntaxException {
    initClient();
    client
        .createQueue(CreateQueueRequest.builder().queueName(queueName).build())
        .join()
        .queueUrl();
    GetQueueUrlResponse response = client.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).join();
    System.out.println("Using queue " + response);
    log.info("Using queue {}", response);
    return response.queueUrl();
  }
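
Because the test blocks on join(), a stream that never completes hangs the test runner instead of failing it. A minimal sketch of a bounded wait, using only java.util.concurrent; the `BoundedWait` class and its `await` helper are hypothetical names, not part of Akka, Alpakka, or the test above:

```java
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a CompletionStage, but only up to a deadline.
final class BoundedWait {
  private BoundedWait() {}

  // Returns the stage's value, or throws TimeoutException if it is not
  // completed within the given timeout.
  static <T> T await(CompletionStage<T> stage, Duration timeout)
      throws InterruptedException, ExecutionException, TimeoutException {
    return stage.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
  }
}
```

In the test, `future.toCompletableFuture().join()` could be swapped for something like `BoundedWait.await(future, Duration.ofSeconds(10))`, which also puts the TimeoutException that the test method already declares to use. This only turns the hang into a test failure; it does not address the repeated 500 responses.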

When I enable debug logging, I see this error repeated forever:
[info] [debug] s.a.a.c.i.ExecutionInterceptorChain - Creating an interceptor chain that will apply interceptors in the following order: [software.amazon.awssdk.awscore.interceptor.HelpfulUnknownHostExceptionInterceptor@41a58812, software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor@4f626e56, software.amazon.awssdk.protocols.query.interceptor.QueryParametersToBodyInterceptor@4b5de362]
[info] [debug] s.a.a.c.i.ExecutionInterceptorChain - Interceptor 'software.amazon.awssdk.protocols.query.interceptor.QueryParametersToBodyInterceptor@4b5de362' modified the message with its modifyHttpRequest method.
[info] [debug] s.a.a.request - Sending Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=localhost, port=4566, encodedPath=, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent], queryParameters=[])
[info] [debug] s.a.a.s.Aws4Signer - AWS4 String to sign: AWS4-HMAC-SHA256
[info] 20201230T233655Z
[info] 20201230/us-east-1/sqs/aws4_request
[info] 5e95b0e6707472e57434cb0da9516ef2ef4c747760bda87e9207161f9834d6dc01
[info] [debug] s.a.request - Received error response: 500
[info] [debug] s.a.a.request - Retryable error detected, will retry in 47ms, attempt number 2
[info] [debug] s.a.a.request - Retrying Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=localhost, port=4566, encodedPath=, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent], queryParameters=[])
[info] [debug] s.a.a.s.Aws4Signer - AWS4 String to sign: AWS4-HMAC-SHA256
[info] 20201230T233655Z
[info] 20201230/us-east-1/sqs/aws4_request
[info] ebce46865d8fface363b83481672fa3a7a11f584f2ea7c1e2b56e3813afc
[info] [debug] s.a.request - Received error response: 500
[info] [debug] s.a.a.request - Retryable error detected, will retry in 51ms, attempt number 3
[info] [debug] s.a.a.request - Retrying Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=localhost, port=4566, encodedPath=, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent], queryParameters=[])
[info] [debug] s.a.a.s.Aws4Signer - AWS4 String to sign: AWS4-HMAC-SHA256
[info] 20201230T233655Z
[info] 20201230/us-east-1/sqs/aws4_request
[info] 4085d30b4fa9cce89c435ed7a640539a665bc2bfd4322fafd4095d2cb58fab1
[info] [debug] s.a.request - Received error response: 500
[info] [debug] s.a.a.request - Retryable error detected, will retry in 230ms, attempt number 4
[info] [debug] s.a.a.request - Retrying Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=localhost, port=4566, encodedPath=, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent], queryParameters=[])
[info] [debug] s.a.a.s.Aws4Signer - AWS4 String to sign: AWS4-HMAC-SHA256
[info] 20201230T233655Z
[info] 20201230/us-east-1/sqs/aws4_request
[info] 618037af1ce6008ce721fd3ca9cddde2e3f2893d50bd9ff5a241cc2061e1d67f
[info] [debug] s.a.request - Received error response: 500
[info] [warn] a.s.RestartWithBackoffSource - Restarting graph due to failure. stack_trace:
[info] java.util.concurrent.CompletionException: software.amazon.awssdk.services.sqs.model.SqsException: null (Service: Sqs, Status Code: 500, Request ID: null)
[info]   at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:60)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
[info]   at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
[info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info]   at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
[info]   at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74)
[info]   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
[info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info]   at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:133)
[info]   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
[info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info]   at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$1(MakeAsyncHttpRequestStage.java:167)
[info]   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
[info]   at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:834)
[info] Caused by: software.amazon.awssdk.services.sqs.model.SqsException: null (Service: Sqs, Status Code: 500, Request ID: null)
[info]   at software.amazon.awssdk.services.sqs.model.SqsException$BuilderImpl.build(SqsException.java:95)
[info]   at software.amazon.awssdk.services.sqs.model.SqsException$BuilderImpl.build(SqsException.java:55)
[info]   at software.amazon.awssdk.protocols.query.internal.unmarshall.AwsXmlErrorUnmarshaller.unmarshall(AwsXmlErrorUnmarshaller.java:97)
[info]   at software.amazon.awssdk.protocols.query.unmarshall.AwsXmlErrorProtocolUnmarshaller.handle(AwsXmlErrorProtocolUnmarshaller.java:102)
[info]   at software.amazon.awssdk.protocols.query.unmarshall.AwsXmlErrorProtocolUnmarshaller.handle(AwsXmlErrorProtocolUnmarshaller.java:82)
[info]   at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:88)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
[info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info]   at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
[info]   at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:129)
[info]   at akka.stream.impl.ReactiveStreamsCompliance$.tryOnComplete(ReactiveStreamsCompliance.scala:114)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary.complete(ActorGraphInterpreter.scala:390)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary.onUpstreamFinish(ActorGraphInterpreter.scala:416)
[info]   at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:523)
[info]   at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
[info]   at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
[info]   at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
[info]   at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)
[info]   at akka.actor.Actor.aroundReceive(Actor.scala:537)
[info]   at akka.actor.Actor.aroundReceive$(Actor.scala:535)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
[info]   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
[info]   at akka.actor.ActorCell.invoke(ActorCell.scala:547)
[info]   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
[info]   at akka.dispatch.Mailbox.run(Mailbox.scala:231)
[info]   at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[info]   at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
[info]   at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
[info]   at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
[info]   at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
[info]   at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorke
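
The warn entry in the log shows RestartSource re-creating the SQS source after the SDK has used up its own retries. The onFailuresWithBackoff overload used in getSource() takes no restart limit, so this cycle effectively continues for as long as the requests keep failing. Below is a rough model of the delay between restarts, assuming the behaviour described in the Akka documentation: the delay doubles from minBackoff, is capped at maxBackoff, and gets up to randomFactor of extra jitter. `RestartBackoffSketch` and `delayMillis` are hypothetical names, and this is a simplified model rather than Akka's actual implementation:

```java
// Simplified model of an exponential restart backoff with jitter.
// Assumption: delay = min(maxBackoff, minBackoff * 2^restartCount) * (1 + jitter),
// where jitter lies in [0, randomFactor).
final class RestartBackoffSketch {
  private RestartBackoffSketch() {}

  // jitterSample is a value in [0, 1) standing in for the random draw,
  // passed explicitly so the calculation is reproducible.
  static long delayMillis(
      int restartCount, long minBackoffMs, long maxBackoffMs,
      double randomFactor, double jitterSample) {
    double exponential = minBackoffMs * Math.pow(2, restartCount);
    double capped = Math.min(maxBackoffMs, exponential);
    return Math.round(capped * (1.0 + jitterSample * randomFactor));
  }
}
```

With the values from getSource() (1 second, 2 seconds, 0.1) and no jitter, this model gives 1000 ms before the first restart and 2000 ms before every later one, so a persistently failing request produces a new stack trace roughly every two seconds.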
