为什么使用kryo serialize框架到apachestorm会在blot get值时重写数据

cvxl0en2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(299)

在kafka和apachestorm方案中,可能大部分开发都是使用avro作为序列化框架。但是我需要处理最复杂的数据,然后我发现kryo序列化框架也成功地将其集成到我们的项目中,该项目遵循kafka和apachestorm环境。但想进一步手术时却出现了一种奇怪的状态。
我已经给Kafka发了5次短信,风暴作业也能读取这5条短信并反序列化成功。但是下一个blot得到的数据值是错误的。打印出与上一条消息相同的值。然后我在完成反序列化代码后添加了打印输出。实际上打印出来的是真的,有5条不同的信息。为什么下一个污点不能改变这些价值观?请参见下面的代码:
kryoscheme.java文件

public abstract class KryoScheme<T> implements Scheme {

private static final long serialVersionUID = 6923985190833960706L;

private static final Logger logger = LoggerFactory.getLogger(KryoScheme.class);

private Class<T> clazz;
private Serializer<T> serializer;

public KryoScheme(Class<T> clazz, Serializer<T> serializer) {
    this.clazz = clazz;
    this.serializer = serializer;
}

@Override
public List<Object> deserialize(byte[] buffer) {
    Kryo kryo = new Kryo();
    kryo.register(clazz, serializer);
    T scheme = null;
    try {
        scheme = kryo.readObject(new Input(new ByteArrayInputStream(buffer)), this.clazz);
        logger.info("{}", scheme);
    } catch (Exception e) {
        String errMsg = String.format("Kryo Scheme failed to deserialize data from Kafka to %s. Raw: %s",
                clazz.getName(), 
                new String(buffer));
        logger.error(errMsg, e);
        throw new FailedException(errMsg, e);
    }

    return new Values(scheme);
}}

打印函数.java

public class PrintFunction extends BaseFunction {

private static final Logger logger = LoggerFactory.getLogger(PrintFunction.class);

@Override
public void execute(TridentTuple tuple, TridentCollector collector) {

    List<Object> data = tuple.getValues();

    if (data != null) {
        logger.info("Scheme data size: {}", data.size());
        for (Object value : data) {
            PrintOut out = (PrintOut) value;
            logger.info("{}.{}--value: {}",
                    Thread.currentThread().getName(),
                    Thread.currentThread().getId(),
                    out.toString());

            collector.emit(new Values(out));
        }
    }

}}

stormlocaltopology.java文件

public class StormLocalTopology {

public static void main(String[] args) {

    ........

    BrokerHosts zk = new ZkHosts("xxxxxx");
    Config stormConf = new Config();
    stormConf.put(Config.TOPOLOGY_DEBUG, false);
    stormConf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1000 * 5);
    stormConf.put(Config.TOPOLOGY_WORKERS, 1);
    stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 5);
    stormConf.put(Config.TOPOLOGY_TASKS, 1);

    TridentKafkaConfig actSpoutConf = new TridentKafkaConfig(zk, topic);
    actSpoutConf.fetchSizeBytes =  5 * 1024 * 1024 ;
    actSpoutConf.bufferSizeBytes = 5 * 1024 * 1024 ;
    actSpoutConf.scheme = new SchemeAsMultiScheme(scheme);

    actSpoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

    TridentTopology topology = new TridentTopology();
    TransactionalTridentKafkaSpout actSpout = new TransactionalTridentKafkaSpout(actSpoutConf);

    topology.newStream(topic, actSpout).parallelismHint(4).shuffle()
            .each(new Fields("act"), new PrintFunction(), new Fields());

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topic+"Topology", stormConf,  topology.build());
}}

还有另一个问题,为什么kryo方案只能读取一个消息缓冲区。有没有其他方法得到多个消息缓冲区,然后可以批量发送数据到下一个blot。
另外,如果我发送一个消息,整个流程似乎成功。
那么发送2条消息是错误的。打印出来的信息如下:

56157 [Thread-18-spout0] INFO  s.s.a.s.s.c.KryoScheme - 2016-02-   05T17:20:48.122+0800,T6mdfEW@N5pEtNBW
56160 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Scheme data size: 1
56160 [Thread-18-spout0] INFO  s.s.a.s.s.c.KryoScheme - 2016-02-    05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56161 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56162 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Scheme data size: 1
56162 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
g0czyy6m

g0czyy6m1#

对不起,这是我的错误。刚刚在kryo反序列化类中发现一个bug,由于存在一个局部作用域参数,所以在多线程环境下可能会被重写。在party作用域中不更改参数,代码运行良好。
参考代码见:

public class KryoSerializer<T extends BasicEvent> extends Serializer<T> implements Serializable {

private static final long serialVersionUID = -4684340809824908270L;

// It's wrong set

//private T event; 

public KryoSerializer(T event) {
    this.event = event;
}

@Override
public void write(Kryo kryo, Output output, T event) {
    event.write(output);
}

@Override
public T read(Kryo kryo, Input input, Class<T> type) {
    T event = new T();
    event.read(input);
    return event;
}
}

相关问题