How do I serialize an ExecutorService in Java?

Posted by 6qftjkof on 2021-06-21 in Flink

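I created a CountMinSketch to estimate the minimum frequency of some values, and I use an ExecutorService to update the sketch asynchronously. I use this class in my Flink project, where it has to be serializable, so I implemented the Serializable interface. That is not enough, though, because the ExecutorService would have to be serializable as well. How can I use an ExecutorService in a serializable way? Or is there a serializable implementation of ExecutorService?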

import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CountMinSketch implements Serializable {

    private static final long serialVersionUID = 1123747953291780413L;

    private static final int H1 = 0;
    private static final int H2 = 1;
    private static final int H3 = 2;
    private static final int H4 = 3;
    private static final int LIMIT = 100;
    private final int[][] sketch = new int[4][LIMIT];

    final NaiveHashFunction h1 = new NaiveHashFunction(11, 9);
    final NaiveHashFunction h2 = new NaiveHashFunction(17, 15);
    final NaiveHashFunction h3 = new NaiveHashFunction(31, 65);
    final NaiveHashFunction h4 = new NaiveHashFunction(61, 101);

    private ExecutorService executor = Executors.newSingleThreadExecutor();

    public CountMinSketch() {
        // initialize sketch
    }

    public Future<Boolean> updateSketch(String value) {
        return executor.submit(() -> {
            sketch[H1][h1.getHashValue(value)]++;
            sketch[H2][h2.getHashValue(value)]++;
            sketch[H3][h3.getHashValue(value)]++;
            sketch[H4][h4.getHashValue(value)]++;
            return true;
        });
    }

    public Future<Boolean> updateSketch(String value, int count) {
        return executor.submit(() -> {
            sketch[H1][h1.getHashValue(value)] = sketch[H1][h1.getHashValue(value)] + count;
            sketch[H2][h2.getHashValue(value)] = sketch[H2][h2.getHashValue(value)] + count;
            sketch[H3][h3.getHashValue(value)] = sketch[H3][h3.getHashValue(value)] + count;
            sketch[H4][h4.getHashValue(value)] = sketch[H4][h4.getHashValue(value)] + count;
            return true;
        });
    }

    public int getFrequencyFromSketch(String value) {
        int valueH1 = sketch[H1][h1.getHashValue(value)];
        int valueH2 = sketch[H2][h2.getHashValue(value)];
        int valueH3 = sketch[H3][h3.getHashValue(value)];
        int valueH4 = sketch[H4][h4.getHashValue(value)];
        return findMinimum(valueH1, valueH2, valueH3, valueH4);
    }

    private int findMinimum(final int a, final int b, final int c, final int d) {
        return Math.min(Math.min(a, b), Math.min(c, d));
    }
}

import java.io.Serializable;

public class NaiveHashFunction implements Serializable {

    private static final long serialVersionUID = -3460094846654202562L;
    private final static int LIMIT = 100;
    private long prime;
    private long odd;

    public NaiveHashFunction(final long prime, final long odd) {
        this.prime = prime;
        this.odd = odd;
    }

    public int getHashValue(final String value) {
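        int hash = value.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative and would produce a negative
        // array index, so that single value is mapped to 0 explicitly.
        hash = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);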
        return calculateHash(hash, prime, odd);
    }

    private int calculateHash(final int hash, final long prime, final long odd) {
        return (int) ((((hash % LIMIT) * prime) % LIMIT) * odd) % LIMIT;
    }
}

The Flink class:

public static class AverageAggregator implements
            AggregateFunction<Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double>, Tuple3<Double, Long, Integer>, Tuple2<String, Double>> {

        private static final long serialVersionUID = 7233937097358437044L;
        private String functionName;
        private CountMinSketch countMinSketch = new CountMinSketch();
.....
}

The error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the AggregateFunction is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
    at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:811)
    at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:730)
    at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:701)
    at org.sense.flink.examples.stream.MultiSensorMultiStationsReadingMqtt2.<init>(MultiSensorMultiStationsReadingMqtt2.java:39)
    at org.sense.flink.App.main(App.java:141)
Caused by: java.io.NotSerializableException: java.util.concurrent.Executors$FinalizableDelegatedExecutorService
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 6 more
Answer 1 (v09wglhw)

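You normally serialize only data, not functional components. I'm not sure what you are trying to do, but if you mark the executor field with the transient keyword, it should work. (Java serialization honors the transient field modifier; an @Transient annotation, such as the JPA one, has no effect on it.)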
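As a rough illustration of what the transient modifier changes, the sketch below tries to serialize two small holder classes: one with a plain ExecutorService field, as in the question, and one where the field is transient. The class and method names (TransientDemo, WithPlainField, WithTransientField, isSerializable) are made up for this example and are not part of Flink or the JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TransientDemo {

    // Mirrors the question: a Serializable class holding a plain ExecutorService field.
    static class WithPlainField implements Serializable {
        private static final long serialVersionUID = 1L;
        final int[] counters = new int[4];
        ExecutorService executor = Executors.newSingleThreadExecutor();
    }

    // Same shape, but the executor is excluded from the serialized form.
    static class WithTransientField implements Serializable {
        private static final long serialVersionUID = 1L;
        final int[] counters = new int[4];
        transient ExecutorService executor = Executors.newSingleThreadExecutor();
    }

    // Returns true if Java serialization accepts the object,
    // false if it hits a non-serializable field.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        WithPlainField plain = new WithPlainField();
        WithTransientField marked = new WithTransientField();
        try {
            System.out.println(isSerializable(plain));   // false
            System.out.println(isSerializable(marked));  // true
        } finally {
            plain.executor.shutdown();
            marked.executor.shutdown();
        }
    }
}
```

Note that a transient field is simply skipped: after deserialization it is null, so the receiving side has to create a new executor before using it.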

Answer 2 (k10s72fa)

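An ExecutorService contains state that cannot be serialized, in particular its worker threads. The state of the tasks those threads are executing can never be serialized with the standard object serialization classes.
If you don't actually need to serialize the ExecutorService, you can mark the variable that references it as transient, which stops it from being serialized by accident.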
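One way this could look in practice is sketched below, under the assumption that the deserialized copy should get a fresh executor of its own. RebuildingSketch is a hypothetical, simplified stand-in for the CountMinSketch in the question, and the helper names (newExecutor, await, roundTrip) are invented for the example. The private readObject method is the standard Java serialization hook; it restores the data fields and then recreates the executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified stand-in for CountMinSketch.
class RebuildingSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int[] counters = new int[4];

    // Not written to the stream; after deserialization it is null until readObject sets it.
    private transient ExecutorService executor = newExecutor();

    private static ExecutorService newExecutor() {
        // Daemon thread, so an idle executor does not keep the JVM alive.
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-updater");
            t.setDaemon(true);
            return t;
        });
    }

    Future<Boolean> update(int slot, int count) {
        return executor.submit(() -> {
            counters[slot] += count;
            return true;
        });
    }

    int read(int slot) {
        return counters[slot];
    }

    // Serialization hook: restore the data fields, then create a fresh executor.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        executor = newExecutor();
    }

    // Blocks until the update has run; Future.get() also makes its write visible to the caller.
    static void await(Future<?> f) {
        try {
            f.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // In-memory serialize/deserialize round trip.
    static RebuildingSketch roundTrip(RebuildingSketch s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(s);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (RebuildingSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        RebuildingSketch original = new RebuildingSketch();
        await(original.update(2, 5));
        RebuildingSketch copy = roundTrip(original);
        await(copy.update(2, 1));            // works: the copy has its own executor
        System.out.println(copy.read(2));    // 6
    }
}
```

Only the counters travel with the object; each deserialized copy owns a separate executor and thread.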
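Conceivably, you could serialize an ExecutorService's work queue. But serializing a task that is currently executing would require a custom mechanism for inspecting the state of the Callable / Runnable while it is running.
If you are trying to use serialization of the object itself as a mechanism for checkpointing a computation, you may be barking up the wrong tree. Serialization cannot capture state that lives on thread stacks.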
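Because serialization only sees field values, queued or running updates are lost unless they finish before the fields are written. The following is a minimal sketch of one possible workaround, not a Flink checkpointing mechanism: it waits for pending updates inside writeObject. It assumes a single-thread executor (tasks run in submission order) and that no other thread submits updates while serialization is in progress. DrainingSketch, drain and roundTrip are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified stand-in for CountMinSketch.
class DrainingSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int[] counters = new int[4];
    private transient ExecutorService executor = newExecutor();

    private static ExecutorService newExecutor() {
        // Daemon thread, so an idle executor does not keep the JVM alive.
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-updater");
            t.setDaemon(true);
            return t;
        });
    }

    // Fire-and-forget update, applied later on the executor thread.
    void update(int slot, int count) {
        executor.execute(() -> counters[slot] += count);
    }

    int read(int slot) {
        return counters[slot];
    }

    // Submits an empty task and waits for it. On a single-thread executor every
    // update submitted earlier has completed once this task is done.
    private void drain() {
        try {
            executor.submit(() -> { }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Serialization hook: let pending updates finish, then write the data fields.
    private void writeObject(ObjectOutputStream out) throws IOException {
        drain();
        out.defaultWriteObject();
    }

    // Serialization hook: restore the data fields, then create a fresh executor.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        executor = newExecutor();
    }

    // In-memory serialize/deserialize round trip.
    static DrainingSketch roundTrip(DrainingSketch s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(s);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DrainingSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DrainingSketch sketch = new DrainingSketch();
        for (int i = 0; i < 1000; i++) {
            sketch.update(0, 1);             // queued, not necessarily applied yet
        }
        DrainingSketch copy = roundTrip(sketch);
        System.out.println(copy.read(0));    // 1000
    }
}
```

This captures only the results of completed updates. A task that has to be interrupted midway and resumed elsewhere would still need the custom state-capturing mechanism described above.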
