EXCEPTION_ACCESS_VIOLATION in Flink when getting data from ValueState

Posted by jv4diomz on 2021-06-25 in Flink

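I get an inexplicable JVM crash when reading data from a shared ValueState in Flink. I am not sure whether I am doing something stupid, whether I have stumbled upon a bug in Flink (or somewhere else), or whether this is expected behavior (although I doubt that "a fatal error in the JRE" is ever expected). Can anyone explain how to resolve this?
The error message I receive is: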

#
# A fatal error has been detected by the Java Runtime Environment:
#
# EXCEPTION_ACCESS_VIOLATION (0xc0000005) at pc=0x0000000054e9a390, pid=6328, tid=0x00000000000002f8
#
# JRE version: Java(TM) SE Runtime Environment (8.0_111-b14) (build 1.8.0_111-b14)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.111-b14 mixed mode windows-amd64 compressed oops)
# Problematic frame:
# C  [zip.dll+0xa390]
#
# Failed to write core dump. Minidumps are not enabled by default on client versions of Windows
#
# An error report file with more information is saved as:
# C:\Users\bjornper\eclipse_workspace\TestEnvironment\hs_err_pid6328.log
#
# If you would like to submit a bug report, please visit:
# http://bugreport.java.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.
#

I can reproduce the crash with the following program:
Main class:

package flinkjvmcrash;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MainMinimalCrash {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        DataStream<LogRow> logRowDataStream = env.addSource(new MyDataSource());

        logRowDataStream.keyBy("sourceId").flatMap(new Aggregator());

        env.execute("My Data Flow");
    }
}

Aggregator:

package flinkjvmcrash;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class Aggregator extends RichFlatMapFunction<LogRow, LogRow> {

    // I need this class (or something similar) to process delta values between incoming LogRow objects
    public class AggregationData {
        public String release = "";
        public long timestamp = 0;
    }

    private transient ValueState<AggregationData> aggregationData;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<AggregationData> descriptorAggregationData =
                new ValueStateDescriptor<AggregationData>(
                        "aggregationData",
                        TypeInformation.of(new TypeHint<AggregationData>() {}),
                        new AggregationData());
        aggregationData = getRuntimeContext().getState(descriptorAggregationData);
    }

    @Override
    public void flatMap(LogRow value, Collector<LogRow> out) throws Exception {
        AggregationData data = aggregationData.value(); // Commenting out this row makes the bug disappear

        // This function will of course do more work, but that is not relevant to the bug/crash
    }
}

Data source:

package flinkjvmcrash;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class MyDataSource extends RichSourceFunction<LogRow> {

    @Override
    public void run(SourceContext<LogRow> ctx) throws Exception {
        // produces 8000 data objects in quick succession
        for (int i = 0; i < 8000; i++) {
            LogRow logRow = new LogRow();
            ctx.collect(logRow);
        }
    }

    @Override
    public void cancel() {
    }
}

Data object used in the stream:

package flinkjvmcrash;

public class LogRow {
    public String sourceId;
    public String release;
    public Long timestamp;

    public Integer lotsOfMoreFields;
}

The environment I am using:
Java 8
Flink 1.1.3
Required tooling installed
System info:

OS: Windows 10.0 , 64 bit Build 14393 (10.0.14393.0)

CPU:total 4 (2 cores per cpu, 2 threads per core) family 6 model 60 stepping 3, cmov, cx8, fxsr, mmx, sse, sse2, sse3, ssse3, sse4.1, sse4.2, popcnt, avx, avx2, aes, clmul, erms, lzcnt, ht, tsc, tscinvbit, bmi1, bmi2

Memory: 4k page, physical 8299612k(4301944k free), swap 11052124k(6589788k free)

vm_info: Java HotSpot(TM) 64-Bit Server VM (25.111-b14) for windows-amd64 JRE (1.8.0_111-b14), built on Sep 22 2016 19:24:05 by "java_re" with MS VC++ 10.0 (VS2010)

Edit 1:
After changing the AggregationData inner class (the type held in the ValueState) to a Tuple2<String, Long> instead, the problem no longer appears.
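One difference between the two types in Edit 1 that might matter here is that AggregationData is a non-static inner class, so its constructor takes the enclosing Aggregator instance and the class has no no-argument constructor. Whether this is related to the crash is an assumption, not something confirmed in this post. The plain-Java sketch below (no Flink dependency; the names NestedClassCheck, InnerData and StaticData are hypothetical) uses reflection to show that difference between a non-static inner class and a static nested class with the same fields.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

public class NestedClassCheck {

    // Non-static inner class, shaped like AggregationData in the question.
    public class InnerData {
        public String release = "";
        public long timestamp = 0;
    }

    // Static nested variant of the same data holder (hypothetical alternative).
    public static class StaticData {
        public String release = "";
        public long timestamp = 0;
    }

    // True if the class declares a constructor that takes no parameters at all.
    public static boolean hasNoArgConstructor(Class<?> cls) {
        for (Constructor<?> c : cls.getDeclaredConstructors()) {
            if (c.getParameterCount() == 0) {
                return true;
            }
        }
        return false;
    }

    // True if the class is a member class declared with the static modifier.
    public static boolean isStaticNested(Class<?> cls) {
        return cls.isMemberClass() && Modifier.isStatic(cls.getModifiers());
    }

    public static void main(String[] args) {
        System.out.println("InnerData  static nested: " + isStaticNested(InnerData.class)
                + ", no-arg constructor: " + hasNoArgConstructor(InnerData.class));
        System.out.println("StaticData static nested: " + isStaticNested(StaticData.class)
                + ", no-arg constructor: " + hasNoArgConstructor(StaticData.class));
    }
}
```

If this difference turns out to be relevant, declaring AggregationData as `public static class` (or moving it to its own file) would be a smaller change to try than switching to Tuple2. That is a suggestion to test, not a verified fix.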
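Edit 2:
It is now the day after I first posted the question, and the same code no longer reproduces the problem. The computer was shut down overnight, but apart from that I have no idea why the problem does not appear now (yesterday I could reproduce it even after restarting).
At the request of @rmetzer, here is the hs_err_pid6328.log file: https://gist.github.com/plankton555/e6ba1224b34035e91c5d5933f1c73549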
