scdf聚合器notserializableexception

7y4bm7vi  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(321)

我正在尝试拆分一个字符串,比如“a,b,c”,然后聚合它们。但失败了。我在scdf中创建一个流,如下所示:

:test-agg-redis > splitter | aggregator

自定义参数:
app.aggregator.aggregation=#这个[有效载荷]
app.aggregator.message store type=redis
app.splitter.delimiters=,
然后我启动一个自定义源,如:

@Bean
public Supplier<String> simpleString() {
    return () ->
        "a,b,c"
    ;
}

但聚合器失败:

2021-04-22 04:51:59.264 ERROR [aggregator-processor,604bc6c7962cf644,9c467b646ef31f5a,true] 553 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'aggregator'; defined in: 'org.springframework.cloud.fn.aggregator.AggregatorFunctionConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@f627d13']; nested exception is java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)', failedMessage=GenericMessage [payload=byte[1], headers={sequenceNumber=1, sequenceSize=3, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test-agg-redis.splitter, b3=604bc6c7962cf644-c30db5ed0c06fce8-1, nativeHeaders={}, kafka_offset=240, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7ecf45f1, correlationId=0cd0c4aa-6fe1-7b77-c673-412815ca04bb, id=32d25597-c5f7-7731-2102-591425bd550f, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1619067118736, kafka_groupId=test-agg-redis, timestamp=1619067118896}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)
    at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:58)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:102)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer.lambda$doStart$1(ReactiveStreamsConsumer.java:177)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:432)
    at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:274)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
    at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$2(FluxMessageChannel.java:83)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:439)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:526)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)'
    at org.springframework.integration.redis.store.RedisMessageStore.rethrowAsIllegalArgumentException(RedisMessageStore.java:188)
    at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:128)
    at org.springframework.integration.store.AbstractKeyValueMessageStore.doAddMessage(AbstractKeyValueMessageStore.java:145)
    at org.springframework.integration.store.AbstractKeyValueMessageStore.addMessagesToGroup(AbstractKeyValueMessageStore.java:212)
    at org.springframework.integration.store.AbstractMessageGroupStore.addMessageToGroup(AbstractMessageGroupStore.java:189)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.store(AbstractCorrelatingMessageHandler.java:780)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:495)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:474)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
    ... 27 more
Caused by: org.springframework.data.redis.serializer.SerializationException: Cannot serialize; nested exception is org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
    at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:96)
    at org.springframework.data.redis.core.AbstractOperations.rawValue(AbstractOperations.java:127)
    at org.springframework.data.redis.core.DefaultValueOperations.setIfAbsent(DefaultValueOperations.java:295)
    at org.springframework.data.redis.core.DefaultBoundValueOperations.setIfAbsent(DefaultBoundValueOperations.java:149)
    at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:121)
    ... 34 more
Caused by: org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
    at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:64)
    at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:33)
    at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:94)
    ... 38 more
Caused by: java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeArray(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
    at java.base/java.util.HashMap.internalWriteEntries(Unknown Source)
    at java.base/java.util.HashMap.writeObject(Unknown Source)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.base/java.io.ObjectOutputStream.defaultWriteObject(Unknown Source)
    at org.springframework.messaging.MessageHeaders.writeObject(MessageHeaders.java:316)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.springframework.core.serializer.DefaultSerializer.serialize(DefaultSerializer.java:46)
    at org.springframework.core.serializer.Serializer.serializeToByteArray(Serializer.java:56)
    at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:60)
    ... 40 more

为什么?我使用的是最新版本的scdf。谢谢!
注意:如果我没有设置参数'app.aggregator.message store type'aggregator不抛出exoption,但是app'log'打印:[“yq==”,“yg==”,“yw==”]似乎是解码错误。为什么

1tu0hz3e

1tu0hz3e1#

请在github中针对聚合器应用打开一个问题。
消息存储使用java序列化来存储消息;这个 Consumer 聚合前应过滤掉标头;它不可序列化。

相关问题