Scala: Caused by: java.io.NotSerializableException: java.util.function.Predicate$$Lambda

bihw5rsg  posted on 2022-11-09 in Scala

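How can I make java.util.function.Predicate serializable? It is not a class in my application and I don't use it anywhere myself, so how do I fix this?
Here is the serialization stack: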

App > Serialization stack:
App >   - object not serializable (class: java.util.function.Predicate$$Lambda$309/931003277, value: java.util.function.Predicate$$Lambda$309/931003277@5c97826a)
App >   - field (class: io.github.resilience4j.retry.RetryConfig, name: exceptionPredicate, type: interface java.util.function.Predicate)
App >   - object (class io.github.resilience4j.retry.RetryConfig, io.github.resilience4j.retry.RetryConfig@20496d9f)
App >   - field (class: determination.resilience.GetTaxResilience, name: retryConfig, type: class io.github.resilience4j.retry.RetryConfig)
App >   - object (class determination.resilience.GetTaxResilience, determination.resilience.GetTaxResilience@af49e79)
App >   - field (class: vertex.VertexTaxComplianceOperator, name: getTaxResilience, type: class determination.resilience.GetTaxResilience)
App >   - object (class vertex.VertexTaxComplianceOperator, vertex.VertexTaxComplianceOperator@3e68a323)
App >   - field (class: vertex.VertexTaxComplianceOperator$$anonfun$5, name: $outer, type: class vertex.VertexTaxComplianceOperator)
App >   - object (class vertex.VertexTaxComplianceOperator$$anonfun$5, <function1>)
App >   - field (class: org.apache.spark.sql.execution.MapElementsExec, name: func, type: class java.lang.Object)
App >   - object (class org.apache.spark.sql.execution.MapElementsExec, MapElements <function1>, obj#512:

I'm confused: this belongs to another jar that I merely use, so why do I get a serialization error on this line?

App >   - field (class: io.github.resilience4j.retry.RetryConfig, name: exceptionPredicate, type: interface java.util.function.Predicate)

Here is my code:

@Component
class GetTaxResilience extends Serializable  {
    private val logger = LoggerFactory.getLogger(this.getClass)

    val retryConfig = RetryConfig.custom()
        .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
        .retryExceptions(classOf[StatusRuntimeException])
        .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
        .build

    val retryRegistry = RetryRegistry.of(retryConfig)

    val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

    def getDefaultRetryInstance(): Retry = {
        // default retry instance configured in yaml
        logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
        retryInstance
    }
}

Edit:

@Autowired
var getTaxResilience: GetTaxResilience = null

withRetry(getTaxResilience.getDefaultRetryInstance())

Also, the class where the error occurs is already Serializable:

public class RetryConfig implements Serializable

Edit 2:

val calculateResultDF = preparedDataSetDF.mapPartitions(iterator => {

            val calculateResultDF = iterator.map(row => {
                // retry stuff
            })
            calculateResultDF
        }).toDF()

spark.sql(sqlBuilder.createStagingTableInsertIntoSQL(jobInfoCase.run_id)).toDF()

        insertIntoExceptionArchiveTable(spark, jobInfoCase.run_id)

1bqhqjot  #1

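Spark is trying to serialize the retryConfig field of your object. The object in that field holds a reference to something that is not serializable and cannot be made serializable (the exceptionPredicate lambda inside RetryConfig). Declaring a class Serializable is not enough: every non-transient field it reaches must be serializable too.
One solution is to mark retryConfig as @transient. That prevents it from being serialized (on the receiving side it will be null). Other vals that depend on the @transient field (here retryRegistry and, transitively, retryInstance) should probably be marked @transient as well.
You would then define a readResolve method, which is called on the receiving side. It returns a new GetTaxResilience, which will have suitable values for retryConfig/retryRegistry/retryInstance.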

class GetTaxResilience extends Serializable  {
    private val logger = LoggerFactory.getLogger(this.getClass)

    @transient val retryConfig = RetryConfig.custom()
        .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
        .retryExceptions(classOf[StatusRuntimeException])
        .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
        .build

    @transient val retryRegistry = RetryRegistry.of(retryConfig)

    @transient val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

    def getDefaultRetryInstance(): Retry = {
        // default retry instance configured in yaml
        logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
        retryInstance
    }

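    // Called by Java deserialization: swap the copy (whose @transient fields are null) for a fresh instance
    private def readResolve: AnyRef = new GetTaxResilience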
}
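If you want to convince yourself that this works without a cluster, here is a minimal sketch that round-trips an object through plain Java serialization, which is also what Spark's default closure serializer uses. `FakeConfig`, `Holder` and `RoundTrip` are made-up names for illustration: `FakeConfig` stands in for the non-serializable part of RetryConfig and `Holder` has the same shape as GetTaxResilience above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the non-serializable state (note: not Serializable).
class FakeConfig(val label: String)

// Same shape as GetTaxResilience: the problematic field is @transient, and
// readResolve replaces the deserialized copy with a freshly constructed one.
class Holder extends Serializable {
  @transient val config: FakeConfig = new FakeConfig("built in constructor")

  private def readResolve: AnyRef = new Holder
}

object RoundTrip {
  // Serialize to a byte array and read the object back, roughly what happens
  // when Spark ships a closure from the driver to an executor.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Holder)
    // Without readResolve, copy.config would be null at this point.
    println(copy.config.label)
  }
}
```

Keep in mind that the instance you get back is rebuilt on the receiving side, so anything the constructor does (creating a registry, for example) runs again there.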

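In this case, since GetTaxResilience takes no constructor parameters, it may make sense to turn it into an object. Then it does not have to be Serializable, and it needs neither the @transient annotations nor readResolve: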

object GetTaxResilience {
    private val logger = LoggerFactory.getLogger(this.getClass)

    val retryConfig = RetryConfig.custom()
        .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
        .retryExceptions(classOf[StatusRuntimeException])
        .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
        .build

    val retryRegistry = RetryRegistry.of(retryConfig)

    val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

    def getDefaultRetryInstance(): Retry = {
        // default retry instance configured in yaml
        logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
        retryInstance
    }
}

Then, to access the default retry instance from Scala code, all you need is:

GetTaxResilience.getDefaultRetryInstance()

(although the more idiomatic Scala way would be GetTaxResilience.retryInstance)
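The reason the object version helps is visible in your stack: the `$outer` field shows the closure dragging the whole VertexTaxComplianceOperator along, together with everything it references. A closure that only touches a singleton captures nothing. The following is a rough sketch of that difference using only the JDK; `FakePredicate`, `Registry`, `Operator` and `CaptureDemo` are hypothetical names, not part of your code or of resilience4j.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the non-serializable predicate.
class FakePredicate(val label: String)

// Singleton holder, analogous to `object GetTaxResilience`.
object Registry {
  val predicate = new FakePredicate("held by a singleton")
}

// Analogous to the operator class: not Serializable, holds the state in a field.
class Operator {
  val predicate = new FakePredicate("held by an instance")

  // Reads a member of `this`, so the closure captures the Operator instance.
  def viaInstance: String => String = s => s + predicate.label

  // Reads only the singleton, so the closure captures nothing.
  def viaObject: String => String = s => s + Registry.predicate.label
}

object CaptureDemo {
  // True if Java serialization accepts the value, false if it rejects it.
  def serializes(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val op = new Operator
    println(serializes(op.viaInstance))
    println(serializes(op.viaObject))
  }
}
```

On Scala 2.12 or later I would expect the first call to report false and the second true. With the object, each executor JVM initializes its own copy of the singleton the first time the closure touches it, so nothing from it has to travel over the wire.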
