How can I make java.util.function.Predicate serializable? It is not a class in my application and I never use it directly anywhere, so how can I solve this problem?
Here is the serialization stack:
App > Serialization stack:
App > - object not serializable (class: java.util.function.Predicate$$Lambda$309/931003277, value: java.util.function.Predicate$$Lambda$309/931003277@5c97826a)
App > - field (class: io.github.resilience4j.retry.RetryConfig, name: exceptionPredicate, type: interface java.util.function.Predicate)
App > - object (class io.github.resilience4j.retry.RetryConfig, io.github.resilience4j.retry.RetryConfig@20496d9f)
App > - field (class: determination.resilience.GetTaxResilience, name: retryConfig, type: class io.github.resilience4j.retry.RetryConfig)
App > - object (class determination.resilience.GetTaxResilience, determination.resilience.GetTaxResilience@af49e79)
App > - field (class: vertex.VertexTaxComplianceOperator, name: getTaxResilience, type: class determination.resilience.GetTaxResilience)
App > - object (class vertex.VertexTaxComplianceOperator, vertex.VertexTaxComplianceOperator@3e68a323)
App > - field (class: vertex.VertexTaxComplianceOperator$$anonfun$5, name: $outer, type: class vertex.VertexTaxComplianceOperator)
App > - object (class vertex.VertexTaxComplianceOperator$$anonfun$5, <function1>)
App > - field (class: org.apache.spark.sql.execution.MapElementsExec, name: func, type: class java.lang.Object)
App > - object (class org.apache.spark.sql.execution.MapElementsExec, MapElements <function1>, obj#512:
I am confused: this class belongs to another jar that I merely use, so why do I get a serialization error on this line?
App > - field (class: io.github.resilience4j.retry.RetryConfig, name: exceptionPredicate, type: interface java.util.function.Predicate)
Here is my code:
import io.github.resilience4j.core.IntervalFunction // io.github.resilience4j.retry.IntervalFunction in resilience4j 0.x
import io.github.resilience4j.retry.{Retry, RetryConfig, RetryRegistry}
import io.grpc.StatusRuntimeException
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

@Component
class GetTaxResilience extends Serializable {
  private val logger = LoggerFactory.getLogger(this.getClass)

  val retryConfig = RetryConfig.custom()
    .maxAttempts(RESILIENCE_RETRY_MAX_ATTEMPTS)
    .retryExceptions(classOf[StatusRuntimeException])
    .intervalFunction(IntervalFunction.ofRandomized(RESILIENCE_RETRY_INTERVAL_MILLISECONDS))
    .build()

  val retryRegistry = RetryRegistry.of(retryConfig)
  val retryInstance = retryRegistry.retry(GET_VERTEX_TAX_RETRY_DEFAULT_NAME, retryConfig)

  def getDefaultRetryInstance(): Retry = {
    // default retry instance configured in yaml
    logger.info("Retrieving default retry instance: {}", GET_VERTEX_TAX_RETRY_DEFAULT_NAME)
    retryInstance
  }
}
Edit:
@Autowired
var getTaxResilience: GetTaxResilience = null
withRetry(getTaxResilience.getDefaultRetryInstance())
Also, the class where the error occurs is already serializable:
public class RetryConfig implements Serializable
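(Note that implementing Serializable on the class is not sufficient: Java serialization also walks every field, and one field holding a non-serializable object sinks the whole graph. A minimal sketch of exactly this situation, where Holder is a hypothetical stand-in for RetryConfig and its exceptionPredicate field:)

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.function.Predicate

// Hypothetical stand-in for RetryConfig: Serializable itself, but holding
// a java.util.function.Predicate field, like RetryConfig's exceptionPredicate.
class Holder(val exceptionPredicate: Predicate[Throwable]) extends Serializable

object FieldDemo {
  def main(args: Array[String]): Unit = {
    // An anonymous Predicate implementation that is NOT Serializable,
    // analogous to the lambda resilience4j builds internally.
    val pred = new Predicate[Throwable] { override def test(t: Throwable): Boolean = true }
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(new Holder(pred))
      println("serialized OK")
    } catch {
      case _: NotSerializableException =>
        // Serialization fails on the field's object, not on Holder itself.
        println("NotSerializableException on the Predicate field")
    }
  }
}
```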
Edit 2:
val calculateResultDF = preparedDataSetDF.mapPartitions(iterator => {
  val resultIterator = iterator.map(row => {
    // retry stuff
  })
  resultIterator
}).toDF()
spark.sql(sqlBuilder.createStagingTableInsertIntoSQL(jobInfoCase.run_id)).toDF()
insertIntoExceptionArchiveTable(spark, jobInfoCase.run_id)
1 Answer
Spark is trying to serialize your object's retryConfig field, and the object held in that field contains a reference to something that is not serializable and cannot be made serializable. One solution is to mark retryConfig as @transient; this keeps it from being serialized (on the receiving side it will be null). The other vals that depend on the @transient field (here retryRegistry and, transitively, retryInstance) should probably be marked @transient as well. You would then define a readResolve method, which is invoked on the receiving side and returns a new GetTaxResilience with suitable values for retryConfig/retryRegistry/retryInstance.

In this case, since GetTaxResilience takes no constructor arguments, it probably makes sense to turn it into an object; then it does not need to be Serializable, and it needs no @transients or readResolve. To access the default retry instance from Scala code, you would then only need GetTaxResilience.getDefaultRetryInstance() (though the more idiomatic Scala way would be GetTaxResilience.retryInstance).
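The @transient plus readResolve pattern the answer describes can be sketched without Spark or resilience4j. Config and ResilienceHolder below are hypothetical stand-ins for RetryConfig and GetTaxResilience, used only to show the mechanics:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for RetryConfig: deliberately NOT Serializable.
class Config(val maxAttempts: Int)

class ResilienceHolder extends Serializable {
  // @transient: skipped during serialization, so the non-serializable
  // Config no longer sinks the object graph.
  @transient val config: Config = new Config(3)
  // Values derived from a @transient field should be @transient too.
  @transient val derived: Int = config.maxAttempts * 2

  // Invoked via reflection on the receiving side after deserialization.
  // Returning a fresh instance re-runs the constructor, rebuilding the
  // @transient fields (which would otherwise arrive as null/0).
  private def readResolve(): AnyRef = new ResilienceHolder
}

object RoundTrip {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(new ResilienceHolder)
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = in.readObject().asInstanceOf[ResilienceHolder]
    println(copy.config.maxAttempts) // 3: rebuilt by readResolve, not deserialized
  }
}
```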
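The suggestion to make GetTaxResilience an object can be sketched as follows; Retry here is a hypothetical stand-in for resilience4j's Retry, since the point is only the per-JVM singleton mechanics:

```scala
// Hypothetical stand-in for resilience4j's Retry, keeping the sketch
// dependency-free.
final case class Retry(name: String, maxAttempts: Int)

// As an `object`, this is a per-JVM singleton: the driver and every executor
// each build their own copy on first use, so nothing is ever serialized.
object GetTaxResilience {
  val retryInstance: Retry = Retry("getVertexTax", 3)

  def getDefaultRetryInstance(): Retry = retryInstance
}
```

Inside a Spark closure you would then call GetTaxResilience.getDefaultRetryInstance() (or simply GetTaxResilience.retryInstance); the closure captures only the reference by name, and each executor resolves its own local singleton.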