在研究了几天为什么我的flink应用程序不能正常工作之后,我得出了一个结论:这个问题存在于 MinMaxPriorityQueue
我正在使用。
似乎这个结构是不可序列化的。我尝试了几种序列化方法:
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], classOf[ProtobufSerializer]);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
他们都不走运。
然而我发现:序列化Guava的不可变
有没有与minmaxpriorityqueue等价的方法,或者序列化它的方法?
更新
我把tomasz翻译成scala:
class MinMaxPriorityQueueSerializer extends Serializer[MinMaxPriorityQueue[Object]] {
private[this] val log = LoggerFactory.getLogger(this.getClass)
setImmutable(false)
setAcceptsNull(false)
val OPTIMIZE_POSITIVE = true
override def read(kryo: Kryo, input: Input, aClass: Class[MinMaxPriorityQueue[Object]]): MinMaxPriorityQueue[Object] = {
log.error("Kryo READ")
val comparator: Ordering[Object] = kryo.readClassAndObject(input).asInstanceOf[Ordering[Object]]
val size = input.readInt(OPTIMIZE_POSITIVE)
val queue: MinMaxPriorityQueue[Object] = MinMaxPriorityQueue.orderedBy(comparator)
.expectedSize(size)
.create()
(0 to size).foreach(_ => queue.offer(kryo.readClassAndObject(input)))
queue
}
override def write(kryo: Kryo, output: Output, queue: MinMaxPriorityQueue[Object]): Unit = {
log.error("Kryo WRITE")
kryo.writeClassAndObject(output, queue.comparator)
val declaredSize = queue.size
output.writeInt(declaredSize, OPTIMIZE_POSITIVE)
val actualSize = queue.toArray.foldLeft(0) {
case (z, q) =>
kryo.writeClassAndObject(output, q)
z + 1
}
Preconditions.checkState(
declaredSize == actualSize,
"Declared size (%s) different than actual size (%s)", declaredSize, actualSize)
}
}
并在flink中设置kryo以使用序列化程序:
env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])
然而,它似乎从未被调用,因为我在日志的任何地方都看不到 log.error("Kryo READ")
以及 log.error("Kryo WRITE")
转换仍然返回一个空的minmaxpriorityqueue,即使我正在更新它。
更新2
我已经实现了serializertester,但是我得到了一个bufferunderflow:
object Main {
def main(args: Array[String]) {
val tester = new MinMaxPriorityQueueSerializerTester()
val inQueue: MinMaxPriorityQueue[java.lang.Double] = MinMaxPriorityQueue.create()
inQueue.add(1.0)
val outputStream = new ByteArrayOutputStream()
tester.serialize(outputStream, inQueue)
val inputStream = new ByteArrayInputStream(outputStream.toByteArray())
val outQueue: MinMaxPriorityQueue[java.lang.Double] = tester.deserialize(inputStream);
System.out.println(inQueue);
System.out.println(outQueue);
}
class MinMaxPriorityQueueSerializerTester {
val kryo = new Kryo
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy)
registerMinMaxSerializer();
// allowForClassesWithoutNoArgConstructor(); // needed to serialize Ordering
def registerMinMaxSerializer() {
kryo.addDefaultSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], new MinMaxPriorityQueueSerializer());
}
def serialize(out: OutputStream, queue: MinMaxPriorityQueue[java.lang.Double]) {
// try (Output output = new Output(out)) {
val output = new Output(out)
kryo.writeClassAndObject(output, queue)
// kryo.writeObject(output, queue)
//}
output.flush
}
def deserialize(in: InputStream): MinMaxPriorityQueue[java.lang.Double] = {
//try (Input input = new Input(in)) {
val input = new Input(in)
//kryo.readObject(input, classOf[MinMaxPriorityQueue[java.lang.Double]])
kryo.readClassAndObject(input).asInstanceOf[MinMaxPriorityQueue[java.lang.Double]]
//p}
}
}
2条答案
按热度按时间bgtovc5b1#
你可以用一个定制的kryo
Serializer
.下面是一个示例(java):
以下是如何使用它:
flvlnr442#
我最终放弃了,尝试使用一种不同的数据结构,并使其可序列化
java.io.Serializable
.这个数据结构是在这里实现的intervalheap,我只是在我的项目中使它可以序列化。
现在一切正常。