In Kafka and Apache Storm setups, most developers probably use Avro as the serialization framework. However, I need to handle more complex data, so I integrated the Kryo serialization framework into our project, which runs in a Kafka and Apache Storm environment. The integration itself worked, but a strange problem appeared as soon as I went one step further.
I sent 5 messages to Kafka, and the Storm job reads all 5 of them and deserializes them successfully. But the next bolt receives the wrong values: it prints the same value as the previous message. I then added a log statement right after the deserialization code, and that output is actually correct, showing 5 different messages. Why doesn't the next bolt see the updated values? See the code below:
KryoScheme.java
public abstract class KryoScheme<T> implements Scheme {

    private static final long serialVersionUID = 6923985190833960706L;
    private static final Logger logger = LoggerFactory.getLogger(KryoScheme.class);

    private Class<T> clazz;
    private Serializer<T> serializer;

    public KryoScheme(Class<T> clazz, Serializer<T> serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    @Override
    public List<Object> deserialize(byte[] buffer) {
        Kryo kryo = new Kryo();
        kryo.register(clazz, serializer);
        T scheme = null;
        try {
            scheme = kryo.readObject(new Input(new ByteArrayInputStream(buffer)), this.clazz);
            logger.info("{}", scheme);
        } catch (Exception e) {
            String errMsg = String.format("Kryo Scheme failed to deserialize data from Kafka to %s. Raw: %s",
                    clazz.getName(),
                    new String(buffer));
            logger.error(errMsg, e);
            throw new FailedException(errMsg, e);
        }
        return new Values(scheme);
    }
}
PrintFunction.java
public class PrintFunction extends BaseFunction {

    private static final Logger logger = LoggerFactory.getLogger(PrintFunction.class);

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        List<Object> data = tuple.getValues();
        if (data != null) {
            logger.info("Scheme data size: {}", data.size());
            for (Object value : data) {
                PrintOut out = (PrintOut) value;
                logger.info("{}.{}--value: {}",
                        Thread.currentThread().getName(),
                        Thread.currentThread().getId(),
                        out.toString());
                collector.emit(new Values(out));
            }
        }
    }
}
StormLocalTopology.java
public class StormLocalTopology {

    public static void main(String[] args) {
        // ........
        BrokerHosts zk = new ZkHosts("xxxxxx");

        Config stormConf = new Config();
        stormConf.put(Config.TOPOLOGY_DEBUG, false);
        stormConf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1000 * 5);
        stormConf.put(Config.TOPOLOGY_WORKERS, 1);
        stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 5);
        stormConf.put(Config.TOPOLOGY_TASKS, 1);

        TridentKafkaConfig actSpoutConf = new TridentKafkaConfig(zk, topic);
        actSpoutConf.fetchSizeBytes = 5 * 1024 * 1024;
        actSpoutConf.bufferSizeBytes = 5 * 1024 * 1024;
        actSpoutConf.scheme = new SchemeAsMultiScheme(scheme);
        actSpoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

        TridentTopology topology = new TridentTopology();
        TransactionalTridentKafkaSpout actSpout = new TransactionalTridentKafkaSpout(actSpoutConf);
        topology.newStream(topic, actSpout).parallelismHint(4).shuffle()
                .each(new Fields("act"), new PrintFunction(), new Fields());

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topic + "Topology", stormConf, topology.build());
    }
}
There is another question: why does the Kryo scheme only ever read a single message buffer? Is there a way to get multiple message buffers at once so that the data can be sent to the next bolt in batches? A sketch of one possible approach follows.
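For context: the Kafka spout calls the Scheme's deserialize() once per Kafka message, so the Scheme itself never sees more than one buffer at a time. Within Trident, one way to hand a whole batch downstream is to collect each Trident batch's tuples with partitionAggregate. The following is only a sketch, assuming Storm 0.9.x package names; the CollectBatch class and the "actBatch" field are made up for illustration.

import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Collects every tuple of one Trident batch into a list and emits the list as a single tuple.
public class CollectBatch extends BaseAggregator<List<Object>> {

    private static final long serialVersionUID = 1L;

    @Override
    public List<Object> init(Object batchId, TridentCollector collector) {
        return new ArrayList<Object>();   // fresh state for each Trident batch
    }

    @Override
    public void aggregate(List<Object> state, TridentTuple tuple, TridentCollector collector) {
        state.add(tuple.getValue(0));     // one deserialized message per tuple
    }

    @Override
    public void complete(List<Object> state, TridentCollector collector) {
        collector.emit(new Values(state)); // hand the whole batch downstream at once
    }
}

// Hypothetical wiring (the downstream function would then receive one List per batch
// instead of individual PrintOut objects):
// topology.newStream(topic, actSpout).parallelismHint(4).shuffle()
//         .partitionAggregate(new Fields("act"), new CollectBatch(), new Fields("actBatch"))
//         .each(new Fields("actBatch"), new SomeBatchFunction(), new Fields());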
Also, if I send only one message, the whole flow seems to work.
But sending 2 messages goes wrong. The printed log is as follows:
56157 [Thread-18-spout0] INFO s.s.a.s.s.c.KryoScheme - 2016-02-05T17:20:48.122+0800,T6mdfEW@N5pEtNBW
56160 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Scheme data size: 1
56160 [Thread-18-spout0] INFO s.s.a.s.s.c.KryoScheme - 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56161 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56162 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Scheme data size: 1
56162 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
1 Answer
Sorry, this was my mistake. I just found a bug in my Kryo deserialization class: a parameter was held in a broader scope than the deserialize call itself, so in a multi-threaded environment it could be overwritten. After keeping that parameter in local scope, the code works fine.
See the code for reference:
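Only as an illustration of the kind of bug and fix described above (the PrintOutSerializer name and the timestamp/token fields of PrintOut are assumptions, not the poster's actual code): a custom Kryo Serializer that reuses a single instance field across read() calls hands every tuple the same object, which the next message then overwrites; building the result in a local variable inside read() gives each tuple its own object.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Sketch only: PrintOutSerializer and the timestamp/token fields are assumed for illustration.
public class PrintOutSerializer extends Serializer<PrintOut> {

    // BUGGY PATTERN: a field reused across read() calls would make every emitted tuple
    // point at the same object, which the next message then overwrites.
    // private PrintOut shared = new PrintOut();

    @Override
    public void write(Kryo kryo, Output output, PrintOut object) {
        output.writeString(object.getTimestamp());
        output.writeString(object.getToken());
    }

    @Override
    public PrintOut read(Kryo kryo, Input input, Class<PrintOut> type) {
        // FIX: build the result in a local variable so each call returns its own object.
        PrintOut out = new PrintOut();
        out.setTimestamp(input.readString());
        out.setToken(input.readString());
        return out;
    }
}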