FlinkKinesisConsumer does not retry on NoHttpResponseException?

Posted by xriantvc on 2021-06-21 in Flink

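(Apache Flink 1.8 on AWS EMR release label 5.28.x)
Our data source is an AWS Kinesis stream (with 450 shards, if that matters). We use FlinkKinesisConsumer to read the Kinesis stream. Occasionally (about once every two days) our application crashes with a "Target server failed to respond" error. The full stack trace is at the bottom.
Looking further into the code base, I found that ProvisionedThroughputExceededException is the only exception type that is retried (see the connector code).
1. Why doesn't the Kinesis connector retry a transient HTTP response exception?
2. Is there a way to pass a retry configuration so that these errors are retried?
As a side note, we have set the following retry configuration: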

// Up to 12 failures per 60-minute interval, with a 300-second delay between restarts
env.setRestartStrategy(RestartStrategies.failureRateRestart(12,
        org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),
        org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)));

Full stack trace of the exception:

    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Answer 1 (wj8zmpe1):

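The restart strategy configured via env.setRestartStrategy() governs restarting the whole Flink job after a failure. It does not affect the retry behavior of Flink's Kinesis connector.
The Kinesis consumer has the following configuration settings for changing its retry behavior (as of 1.11):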
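
To make the distinction concrete, here is a simplified model of what a failure-rate restart strategy decides, with the question's settings (12 failures per 60 minutes) plugged in. FailureRateModel is a hypothetical class written for illustration, not Flink's implementation, and it ignores the 300-second restart delay. Its only decision is "restart the whole job or give up"; nothing in it retries an individual getRecords call.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, simplified model of a failure-rate restart policy (not Flink's code).
 * A failure triggers a full job restart unless maxFailures restarts already
 * happened within the trailing interval; in that case the job fails for good.
 */
public class FailureRateModel {
    private final int maxFailures;
    private final long intervalMillis;
    private final Deque<Long> restartTimes = new ArrayDeque<>();

    public FailureRateModel(int maxFailures, long intervalMillis) {
        this.maxFailures = maxFailures;
        this.intervalMillis = intervalMillis;
    }

    /** Records a job failure at nowMillis; returns true if the whole job would be restarted. */
    public boolean onFailure(long nowMillis) {
        // Forget restarts that fell out of the trailing window.
        while (!restartTimes.isEmpty() && nowMillis - restartTimes.peekFirst() > intervalMillis) {
            restartTimes.pollFirst();
        }
        if (restartTimes.size() >= maxFailures) {
            return false; // failure rate exceeded: no more restarts
        }
        restartTimes.addLast(nowMillis);
        return true;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long twoDays = 2 * 24 * 60 * minute;

        // One failure every two days (as in the question) never hits the limit.
        FailureRateModel sparse = new FailureRateModel(12, 60 * minute);
        int restarted = 0;
        for (int i = 0; i < 5; i++) {
            if (sparse.onFailure(i * twoDays)) {
                restarted++;
            }
        }
        System.out.println("sparse: " + restarted + " of 5 failures restarted the job"); // 5 of 5

        // 13 failures within one hour: the 13th is refused.
        FailureRateModel burst = new FailureRateModel(12, 60 * minute);
        int allowed = 0;
        for (int i = 0; i < 13; i++) {
            if (burst.onFailure(i * minute)) {
                allowed++;
            }
        }
        System.out.println("burst: " + allowed + " of 13 failures restarted the job"); // 12 of 13
    }
}
```

Under this model, a NoHttpResponseException that escapes the connector every couple of days always costs a full job restart; avoiding that requires the connector itself to treat the error as retryable.
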

    /**The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";

    /**The maximum number of getRecords attempts if we get a recoverable exception. */
    public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";

    /**The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
    public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";

    /**The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
    public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";

    /**The power constant for exponential backoff between each getRecords attempt. */
    public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";

    /**The interval between each getRecords request to a AWS Kinesis shard in milliseconds. */
    public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
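
A minimal sketch of setting these keys, assuming plain java.util.Properties. The keys are the string values of the constants above; the region and all numeric values are illustrative assumptions, not recommendations. In a real job the same Properties object would be passed to the FlinkKinesisConsumer constructor together with the stream name and a deserialization schema. The backoffCeilingMillis helper is hypothetical: it assumes the wait before each retry is capped at min(backoff.max, backoff.base * expconst^attempt), the usual way these three settings combine in exponential backoff, so check the connector source for your version. Per the Javadoc above, maxretries only applies to exceptions the connector classifies as recoverable, so raising it does not by itself make a NoHttpResponseException retryable.

```java
import java.util.Properties;

/**
 * Sketch: build consumer Properties with the getRecords retry settings and
 * preview the backoff ceilings they imply. Values are illustrative assumptions.
 */
public class KinesisRetrySettings {

    static Properties retryProperties() {
        Properties props = new Properties();
        props.setProperty("aws.region", "us-east-1");                     // assumed region
        props.setProperty("flink.shard.getrecords.maxretries", "10");     // retries for recoverable errors
        props.setProperty("flink.shard.getrecords.backoff.base", "300");  // milliseconds
        props.setProperty("flink.shard.getrecords.backoff.max", "10000"); // milliseconds
        props.setProperty("flink.shard.getrecords.backoff.expconst", "1.5");
        return props;
    }

    /** Hypothetical: upper bound of the wait before retry `attempt`, assuming min(max, base * power^attempt). */
    static long backoffCeilingMillis(Properties props, int attempt) {
        long base = Long.parseLong(props.getProperty("flink.shard.getrecords.backoff.base"));
        long max = Long.parseLong(props.getProperty("flink.shard.getrecords.backoff.max"));
        double power = Double.parseDouble(props.getProperty("flink.shard.getrecords.backoff.expconst"));
        return (long) Math.min(max, base * Math.pow(power, attempt));
    }

    public static void main(String[] args) {
        Properties props = retryProperties();
        int retries = Integer.parseInt(props.getProperty("flink.shard.getrecords.maxretries"));
        for (int attempt = 0; attempt < retries; attempt++) {
            // Prints 300, 450, 675, ... until the 10000 ms cap is reached.
            System.out.println("attempt " + attempt + ": wait up to "
                    + backoffCeilingMillis(props, attempt) + " ms");
        }
    }
}
```
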
Answer 2 (ldfqzlk8):

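KinesisProxy does support retrying exceptions, and the retry behavior can be controlled with the settings mentioned in the previous answer. However, not every exception is retried, and the default whitelist does not cover all of the transient problems that commonly occur with the Kinesis service. Over time, we customized the proxy as follows to get a stable production setup: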

  // Override in a custom subclass of KinesisProxy: decide which client-side failures are retried
  @Override
  protected boolean isRecoverableSdkClientException(SdkClientException ex) {
    if (ex instanceof KMSThrottlingException) {
      // not handled in KinesisProxy in 1.5.x
      return true;
    } else if (ex instanceof AmazonServiceException) {
      return KinesisProxy.isRecoverableException((AmazonServiceException)ex);
    } else if (ex.getCause() instanceof SocketTimeoutException) {
      return true;
    } else if (ex.getCause() instanceof NoHttpResponseException) {
      return true;
    } else if (ex.getCause() instanceof ConnectTimeoutException) {
      return true;
    } else if (ex.getCause() instanceof java.net.UnknownHostException) {
      return true;
    } else if (ex.getCause() instanceof javax.net.ssl.SSLHandshakeException) {
      return true;
    }
    return false;
  }
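
As a variation on the override above, the per-cause checks can be expressed as an allow-list. TransientCauses is a hypothetical helper, not part of the Flink connector. Unlike the snippet above, it walks the whole cause chain (including the exception itself) instead of only getCause(), and it matches simple class names rather than using instanceof. The name matching is an assumption aimed at shaded builds, where the stack trace shows the AWS SDK relocated under org.apache.flink.kinesis.shaded; the trade-off is that subclasses of the listed types are not matched. Only JDK exception types are used in main, so the sketch runs without the AWS SDK or HttpClient on the classpath.

```java
import java.net.SocketTimeoutException;
import java.util.Set;

/**
 * Hypothetical helper: treats a failure as transient if the exception or any
 * of its causes has a simple class name on the allow-list.
 */
public final class TransientCauses {
    private static final Set<String> TRANSIENT = Set.of(
            "NoHttpResponseException",
            "ConnectTimeoutException",
            "SocketTimeoutException",
            "UnknownHostException",
            "SSLHandshakeException");

    private TransientCauses() {}

    public static boolean isTransient(Throwable ex) {
        // Bounded walk so an unusual cyclic cause chain cannot loop forever.
        Throwable current = ex;
        for (int depth = 0; current != null && depth < 10; depth++) {
            if (TRANSIENT.contains(current.getClass().getSimpleName())) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Exception direct = new RuntimeException(new SocketTimeoutException("read timed out"));
        Exception nested = new RuntimeException(new IllegalStateException(new SocketTimeoutException()));
        Exception other = new RuntimeException(new IllegalArgumentException("bad request"));
        System.out.println(isTransient(direct)); // true
        System.out.println(isTransient(nested)); // true
        System.out.println(isTransient(other));  // false
    }
}
```

Inside the override, a call such as isTransient(ex) could stand in for the individual getCause() branches, while the AmazonServiceException branch would still delegate to KinesisProxy.isRecoverableException.
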
