Caused by: java.lang.RuntimeException: java.io.NotSerializableException: io.netty.channel.DefaultChannelHandlerContext

fumotvh3  posted on 2021-06-24  in Storm

Caused by: java.lang.RuntimeException: java.io.NotSerializableException: io.netty.channel.DefaultChannelHandlerContext
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__6723.invoke(worker.clj:192) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__6411.invoke(executor.clj:313) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.disruptor$clojure_handler$reify__6005.onEvent(disruptor.clj:40) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    ... 6 more
The topology runs fine in Storm local mode, but it fails with the error above when I submit it to a cluster.
Here is my code:

public class NettySpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    /**
     * Collector for the spout.
     */
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
        StormServer stormServer = new StormServer();
        stormServer.run();
    }

    @Override
    public void nextTuple() {
        Values tuple;
        try {
            // Blocks until the Netty handler has queued a tuple, then emits it.
            while ((tuple = ServerHandler.queue.take()) != null) {
                collector.emit(tuple);
            }
        } catch (Exception e) {
            // Exceptions are silently ignored here.
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("value","channl"));
    }
}

public class ServerHandler extends ChannelInboundHandlerAdapter {

    private static Logger logger = LogManager.getLogger(ServerHandler.class);
    public static LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<Values>();
    public static Map<String, ChannelHandlerContext> ctxes;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        JSONObject message = (JSONObject) msg;
        // The second tuple value is a copy of the map of live ChannelHandlerContext objects.
        queue.put(new Values(new StreamData(message.toString().getBytes()), new HashMap<>(ctxes)));
    }
}
klsxnrf1  #1

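I don't know Storm itself very well, but it looks like you are trying to serialize a ChannelHandlerContext (it is stored in the Map you emit), and that class is not serializable.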
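
To illustrate the point, here is a minimal sketch that uses only the JDK. The stack trace shows Storm falling back to SerializableSerializer for the map values, which relies on plain Java serialization, so the same failure can be reproduced without Storm or Netty. FakeContext is a hypothetical stand-in for ChannelHandlerContext, and REGISTRY, isJavaSerializable and the "channel-1" key are names made up for this example. It is probably also why local mode looked fine: as far as I understand, tuples are only serialized when they are transferred between workers, which typically does not happen inside a single local JVM.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ChannelRegistrySketch {

    /** Hypothetical stand-in for ChannelHandlerContext: it does not implement Serializable. */
    static final class FakeContext {
        final String remoteAddress;

        FakeContext(String remoteAddress) {
            this.remoteAddress = remoteAddress;
        }
    }

    /** Worker-local registry (assumed design): live contexts stay in the JVM that owns the connection. */
    static final Map<String, FakeContext> REGISTRY = new ConcurrentHashMap<>();

    /** Returns true if plain Java serialization accepts the value, false if it is rejected. */
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        REGISTRY.put("channel-1", new FakeContext("10.0.0.1:9000"));

        // What the question's code does: copy the live contexts into the tuple value.
        Map<String, FakeContext> contextsInTuple = new HashMap<>(REGISTRY);
        System.out.println("map of contexts serializable: " + isJavaSerializable(contextsInTuple));

        // Alternative: put only a serializable key into the tuple and look the
        // context up in REGISTRY on the worker that holds the connection.
        String channelId = "channel-1";
        System.out.println("channel id serializable: " + isJavaSerializable(channelId));
    }
}
```

In the question's code this would mean emitting something like a channel id string as the second tuple value instead of new HashMap<>(ctxes). Note that a downstream bolt running in a different worker cannot resolve that id to a live connection, so any reply to the client has to be routed back to the worker that owns the channel.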
