java - Strange behavior when setting up Flink metrics reporting with InfluxDB

Posted by wribegjk on 2021-06-24 in Flink

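When trying to run a Flink job on a Flink cluster (1.9) in Kubernetes and record its metrics in the InfluxDB time-series database, a very strange series of events occurs.
Suppose our job is very simple: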

// setup Kafka consumer
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers", ...);
kafkaConsumerProps.setProperty("group.id", ...);

FlinkKafkaConsumer<String> myConsumer =
      new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), kafkaConsumerProps);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);

// create direct kafka stream
DataStream<String> trafficEventStream = env.addSource(myConsumer);

trafficEventStream.map(new RichMapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value;
    }
});

env.execute("Traffic");

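Note: the job itself is not really important; it has been stripped down to the bare bones.
The cluster configuration has been set in flink-conf.yaml according to the documentation (https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html):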

metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: influxdb
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: flink-metrics
metrics.reporter.influxdb.password: qwerty

However, when the job is submitted to the cluster, the logs are polluted with the following error messages:

2020-02-05 21:45:13,135 WARN  org.apache.flink.runtime.metrics.MetricRegistryImpl Error while reporting metrics 
 org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse 'taskmanager_job_task_operator_reauthentication-latency-avg,host=flink-taskmanager-6484bdf6c5-ssmdc,job_id=45fd8a6e0dbae699a4fd810d5fecc65f,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=8fda43cec7b39138c4a5cc6f8738971f,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=12f6e13572c00ec73a98734e4c5d307a value=� 1580939113059000000': invalid boolean
unable to parse 'taskmanager_job_task_operator_KafkaConsumer_sync-time-avg,host=flink-taskmanager-6484bdf6c5-ssmdc,job_id=45fd8a6e0dbae699a4fd810d5fecc65f,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=8fda43cec7b39138c4a5cc6f8738971f,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=12f6e13572c00ec73a98734e4c5d307a value=� 1580939113059000000': invalid boolean
unable to parse 'taskmanager_job_task_operator_commit-latency-avg,host=flink-taskmanager-6484bdf6c5-ssmdc,job_id=45fd8a6e0dbae699a4fd810d5fecc65f,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=8fda43cec7b39138c4a5cc6f8738971f,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=12f6e13572c00ec73a98734e4c5d307a value=� 1580939113059000000': invalid boolean
unable to parse 'taskmanager_job_task_operator_commit-latency-max,host=flink-taskmanager-6484bdf6c5-ssmdc,job_id=45fd8a6e0dbae699a4fd810d5fecc65f,job_name=Traffic,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=Source:\ Custom\ Source,subtask_index=0,task_attempt_id=8fda43cec7b39138c4a5cc6f8738971f,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=12f6e13572c00ec73a98734e4c5d307a value=� 1580939113059000000': invalid boolean

This goes on for quite a while, followed by:

    at org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException.buildExceptionFromErrorMessage(InfluxDBException.java:147)
    at org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException.buildExceptionForErrorState(InfluxDBException.java:173)
    at org.apache.flink.metrics.influxdb.shaded.org.influxdb.impl.InfluxDBImpl.execute(InfluxDBImpl.java:796)
    at org.apache.flink.metrics.influxdb.shaded.org.influxdb.impl.InfluxDBImpl.write(InfluxDBImpl.java:455)
    at org.apache.flink.metrics.influxdb.InfluxdbReporter.report(InfluxdbReporter.java:101)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:436)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
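
One possible reading of the errors above, not confirmed anywhere in this post: every rejected point carries value=� where a number should be, and all four measurements are Kafka client averages or maxima (-avg, -max). If those gauges returned NaN or an infinity (an assumption), the value could not be written as a numeric field, which would be consistent with the "invalid boolean" parse error. The sketch below shows the kind of check that would filter such values out before they are written. FiniteFieldCheck and isWritable are hypothetical names for illustration; they are not part of Flink or influxdb-java.

```java
/** Hypothetical helper for illustration; not part of Flink or influxdb-java. */
final class FiniteFieldCheck {

    private FiniteFieldCheck() {}

    /**
     * Returns true if the gauge value is safe to send as a field value:
     * non-null, and for floating-point types neither NaN nor infinite.
     */
    static boolean isWritable(Object value) {
        if (value instanceof Double) {
            double d = (Double) value;
            return !Double.isNaN(d) && !Double.isInfinite(d);
        }
        if (value instanceof Float) {
            float f = (Float) value;
            return !Float.isNaN(f) && !Float.isInfinite(f);
        }
        // Integers, longs, strings and booleans have no NaN or infinity.
        return value != null;
    }

    public static void main(String[] args) {
        System.out.println(isWritable(Double.NaN));               // false
        System.out.println(isWritable(Double.NEGATIVE_INFINITY)); // false
        System.out.println(isWritable(0.25));                     // true
    }
}
```

Whether the reporter in this Flink version applies such a filter is not stated in the post, so this only illustrates the condition to look for when inspecting the failing gauges.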

Looking at InfluxDB, running the show series command in the flink database lists the following very strange series names:

jobmanager_Status_JVM_CPU_Load,host=flink-jobmanager
jobmanager_Status_JVM_CPU_Time,host=flink-jobmanager
jobmanager_Status_JVM_ClassLoader_ClassesLoaded,host=flink-jobmanager
jobmanager_Status_JVM_ClassLoader_ClassesUnloaded,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_Copy_Count,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_Copy_Time,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_MarkSweepCompact_Count,host=flink-jobmanager
jobmanager_Status_JVM_GarbageCollector_MarkSweepCompact_Time,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Direct_Count,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Direct_MemoryUsed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Direct_TotalCapacity,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Heap_Committed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Heap_Max,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Heap_Used,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Mapped_Count,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Mapped_MemoryUsed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_Mapped_TotalCapacity,host=flink-jobmanager
jobmanager_Status_JVM_Memory_NonHeap_Committed,host=flink-jobmanager
jobmanager_Status_JVM_Memory_NonHeap_Max,host=flink-jobmanager
jobmanager_Status_JVM_Memory_NonHeap_Used,host=flink-jobmanager
jobmanager_Status_JVM_Threads_Count,host=flink-jobmanager
jobmanager_job_downtime,host=flink-jobmanager,job_id=078a136e99f5028671744fe4da4ef814,job_name=Traffic
jobmanager_job_downtime,host=flink-jobmanager,job_id=2b4aa6d82aea381721d435fa36f56afc,job_name=Traffic
jobmanager_job_downtime,host=flink-jobmanager,job_id=45fd8a6e0dbae699a4fd810d5fecc65f,job_name=Traffic
jobmanager_job_downtime,host=flink-jobmanager,job_id=613e99628c06c38211fb63d31afe8f0f,job_name=Traffic
jobmanager_job_downtime,host=flink-jobmanager,job_id=d4ead0ce5e17397c03969a6c89790f54,job_name=Traffic
jobmanager_job_downtime,host=flink-jobmanager,job_id=e977b10e9ed0f236bb154515c708682b,job_name=Traffic
jobmanager_job_fullRestarts,host=flink-jobmanager,job_id=078a136e99f5028671744fe4da4ef814,job_name=Traffic

It gets stranger further down the list:

taskmanager_job_task_Shuffle_Netty_Output_numBuffersInRemotePerSecond,host=flink-taskmanager-6484bdf6c5-ssmdc,job_id=d4ead0ce5e17397c03969a6c89790f54,job_name=Traffic,subtask_index=0,task_attempt_id=ef8aae2b44854021f18aeb4707babd06,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=12f6e13572c00ec73a98734e4c5d307a
taskmanager_job_task_Shuffle_Netty_Output_numBuffersInRemotePerSecond,host=flink-taskmanager-6484bdf6c5-ssmdc,job_id=e977b10e9ed0f236bb154515c708682b,job_name=Traffic,subtask_index=0,task_attempt_id=2c96da26392011d06220871513bd5f8b,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=12f6e13572c00ec73a98734e4c5d307a
taskmanager_job_task_Shuffle_Netty_Output_numBytesInLocal,host=flink-taskmanager-6484bdf6c5-kzq2h,job_id=2b4aa6d82aea381721d435fa36f56afc,job_name=Traffic,subtask_index=0,task_attempt_id=886a48ce0db510645a48a5541038fe89,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=7e175a73b9a3ae1c3a8d748982530aa2
taskmanager_job_task_Shuffle_Netty_Output_numBytesInLocal,host=flink-taskmanager-6484bdf6c5-kzq2h,job_id=e977b10e9ed0f236bb154515c708682b,job_name=Traffic,subtask_index=0,task_attempt_id=fc071b097d73d8ed9a1688a3d8251cf4,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=7e175a73b9a3ae1c3a8d748982530aa2
taskmanager_job_task_Shuffle_Netty_Output_numBytesInLocal,host=flink-taskmanager-6484bdf6c5-ssmdc,job_id=078a136e99f5028671744fe4da4ef814,job_name=Traffic,subtask_index=0,task_attempt_id=e791473d96872ce886a60aa547f139e4,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=12f6e13572c00ec73a98734e4c5d307a
taskmanager_job_task_Shuffle_Netty_Output_numBytesInLocal,host=flink-taskmanager-6484bdf6c5-ssmdc,job_id=45fd8a6e0dbae699a4fd810d5fecc65f,job_name=Traffic,subtask_index=0,task_attempt_id=8fda43cec7b39138c4a5cc6f8738971f,task_attempt_num=0,task_id=cbc357ccb763df2852fee8c4fc7d55f2,task_name=Source:\ Custom\ Source\ ->\ Map,tm_id=12f6e13572c00ec73a98734e4c5d307a
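
Each line in the listings above is an InfluxDB series key: a measurement name followed by comma-separated tag=value pairs, with spaces in tag values escaped by a backslash. Because job_id and task_attempt_id are tags, the same measurement shows up once per job submission. The following minimal sketch splits such a key so the measurement and tags can be inspected separately. SeriesKey is a hypothetical helper, not part of any InfluxDB client, and it assumes tag keys contain no '=' character.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for reading "show series" output; not an InfluxDB API. */
final class SeriesKey {
    final String measurement;
    final Map<String, String> tags;

    private SeriesKey(String measurement, Map<String, String> tags) {
        this.measurement = measurement;
        this.tags = tags;
    }

    /** Splits a series key on unescaped commas; a backslash escapes the next character. */
    static SeriesKey parse(String key) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            if (c == '\\' && i + 1 < key.length()) {
                current.append(key.charAt(++i)); // keep the escaped character, drop the backslash
            } else if (c == ',') {
                parts.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        parts.add(current.toString());

        // Everything after the first part is a tag=value pair, kept in original order.
        Map<String, String> tags = new LinkedHashMap<>();
        for (String part : parts.subList(1, parts.size())) {
            int eq = part.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Not a tag=value pair: " + part);
            }
            tags.put(part.substring(0, eq), part.substring(eq + 1));
        }
        return new SeriesKey(parts.get(0), tags);
    }

    public static void main(String[] args) {
        SeriesKey k = parse("jobmanager_job_downtime,host=flink-jobmanager,"
                + "job_id=078a136e99f5028671744fe4da4ef814,job_name=Traffic");
        System.out.println(k.measurement);        // jobmanager_job_downtime
        System.out.println(k.tags.get("job_id")); // 078a136e99f5028671744fe4da4ef814
    }
}
```

Grouping the parsed keys by measurement and counting distinct job_id values would show how many submissions of the Traffic job contributed to each entry.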

Any idea what is causing this?
