Caused by: java.lang.RuntimeException: java.io.NotSerializableException: io.netty.channel.DefaultChannelHandlerContext
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__6723.invoke(worker.clj:192) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__6411.invoke(executor.clj:313) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.disruptor$clojure_handler$reify__6005.onEvent(disruptor.clj:40) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
    ... 6 more
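I am using Storm. The topology runs fine in local mode, but it fails with the error above when I submit it to the cluster.
Here is my code: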
public class NettySpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    /**
     * collector for spout
     */
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
        StormServer stormServer = new StormServer();
        stormServer.run();
    }

    @Override
    public void nextTuple() {
        Values tuple;
        try {
            while ((tuple = ServerHandler.queue.take()) != null) {
                collector.emit(tuple);
            }
        } catch (Exception e) {
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("value","channl"));
    }
}

public class ServerHandler extends ChannelInboundHandlerAdapter {
    private static Logger logger = LogManager.getLogger(ServerHandler.class);
    public static LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<Values>();
    public static Map<String,ChannelHandlerContext> ctxes;

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        JSONObject message = (JSONObject) msg;
        queue.put(new Values(new StreamData(message.toString().getBytes()), new HashMap<>(ctxes)));
    }
}
1 Answer
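I don't know Storm itself very well, but it looks like you are trying to serialize a ChannelHandlerContext (it is stored in the Map you emit), and that class is not serializable.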
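
One possible way around this, sketched under assumptions rather than tested against Storm: keep the contexts in a process-local registry and put only a serializable key (for example a channel id string) into the tuple. The stack trace goes through SerializableSerializer, so plain Java serialization is what rejects the context; the sketch below reproduces that with the JDK alone. FakeContext, CONTEXTS, register and buildTuple are hypothetical names invented for this illustration; FakeContext merely stands in for Netty's ChannelHandlerContext, and the List stands in for Storm's Values. It would also fit the symptom if local mode simply never serializes the tuple because everything runs in one JVM, though that is my assumption about Storm's behavior.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ChannelRegistrySketch {

    /** Hypothetical stand-in for ChannelHandlerContext; deliberately NOT Serializable. */
    static final class FakeContext {
        final String remoteAddress;

        FakeContext(String remoteAddress) {
            this.remoteAddress = remoteAddress;
        }
    }

    /** Process-local registry: contexts live here and never travel inside a tuple. */
    static final Map<String, FakeContext> CONTEXTS = new ConcurrentHashMap<>();

    /** Stores the context and returns the serializable id to emit in its place. */
    static String register(String channelId, FakeContext ctx) {
        CONTEXTS.put(channelId, ctx);
        return channelId;
    }

    /** Builds the tuple payload from the raw bytes and the channel id only. */
    static List<Object> buildTuple(byte[] payload, String channelId) {
        return new ArrayList<>(Arrays.asList(payload, channelId));
    }

    /** Returns true if Java serialization accepts the whole object graph. */
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeContext ctx = new FakeContext("10.0.0.1:9000");
        String id = register("channel-1", ctx);

        // Putting the context itself into the tuple reproduces the failure mode.
        List<Object> withContext = new ArrayList<>(Arrays.asList(new byte[] {1, 2}, ctx));
        System.out.println("tuple with context serializable: " + isSerializable(withContext));

        // Putting only the id into the tuple serializes cleanly.
        List<Object> withId = buildTuple(new byte[] {1, 2}, id);
        System.out.println("tuple with id serializable: " + isSerializable(withId));

        // Whoever needs to reply looks the context up locally by id.
        System.out.println("lookup: " + CONTEXTS.get(id).remoteAddress);
    }
}
```

Note the limitation of this design: the registry lookup only works inside the JVM that owns the channel, so the code that writes the response would have to run in the same worker as the spout, or the reply would need to be routed back to that process by some other means.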