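I'm running a streaming job on a Flink 1.9.1 cluster and trying to feed a histogram of values into our Prometheus metrics collector. As the Flink docs suggest, I'm using the Dropwizard histogram implementation together with the wrapper that Flink provides, but when I submit the job to the cluster it crashes with the following traceback: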
java.lang.LinkageError: loader constraint violation: when resolving method "org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper.<init>(Lcom/codahale/metrics/Histogram;)V" the class loader (instance of org/apache/flink/util/ChildFirstClassLoader) of the current class, com/example/foo/metrics/FooMeter, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper, have different Class objects for the type com/codahale/metrics/Histogram used in the signature
at com.example.foo.metrics.FooMeter.<init>(FooMeter.scala:11)
at com.example.foo.transform.ValidFoos$.open(ValidFoos.scala:15)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
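To see which class loader defines each of the two classes named in the error, a small diagnostic along these lines could be logged from `open`. This is only a sketch: `ClassOrigin` and its methods are hypothetical names, not part of Flink or of the job, and it relies only on standard JDK reflection calls.

```scala
// Diagnostic sketch (hypothetical helper, not part of the job):
// reports which class loader defined a class and where it was loaded from.
object ClassOrigin {

  /** Class name of the defining loader; the JVM reports the bootstrap loader as null. */
  def loaderOf(cls: Class[_]): String =
    Option(cls.getClassLoader).map(_.getClass.getName).getOrElse("bootstrap")

  /** Jar or directory the class was loaded from, when the JVM exposes it. */
  def locationOf(cls: Class[_]): String =
    (for {
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString).getOrElse("unknown")

  /** One-line summary suitable for a log statement. */
  def describe(cls: Class[_]): String =
    s"${cls.getName} <- ${loaderOf(cls)} @ ${locationOf(cls)}"
}
```

Logging `ClassOrigin.describe(classOf[com.codahale.metrics.Histogram])` and `ClassOrigin.describe(classOf[org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper])` before constructing the wrapper should show whether each class comes from the job jar or from a jar on the cluster's classpath.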
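I found a similar error reported on the mailing list, but using the shadowJar plugin for Gradle didn't help.
Is there something I'm missing?
Here is the relevant code: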
import com.codahale.metrics.{Histogram, SlidingWindowReservoir}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
import org.apache.flink.metrics.{MetricGroup, Histogram => FlinkHistogram}
import org.apache.flink.util.Collector
import scala.util.Try

// Registers a Dropwizard-backed histogram on the given metric group.
class FooMeter(metricGroup: MetricGroup, name: String) {
  private val histogram: FlinkHistogram = metricGroup.histogram(
    name, new DropwizardHistogramWrapper(new Histogram(new SlidingWindowReservoir(500))))

  def record(fooValue: Long): Unit = {
    histogram.update(fooValue)
  }
}

object ValidFoos extends RichFlatMapFunction[Try[FooData], Foo] {
  @transient private var fooMeter: FooMeter = _

  // Create the meter once the runtime context is available on the task manager.
  override def open(parameters: Configuration): Unit = {
    fooMeter = new FooMeter(getRuntimeContext.getMetricGroup, "foo_values")
  }

  // The collector's element type must match the function's declared output type (Foo).
  override def flatMap(value: Try[FooData], out: Collector[Foo]): Unit = {
    Transform.validFoo(value) foreach (foo => {
      fooMeter.record(foo.value)
      out.collect(foo)
    })
  }
}
build.gradle:
plugins {
    id 'scala'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}

ext {
    flinkVersion = "1.9.1"
    scalaBinaryVersion = "2.11"
    scalaVersion = "2.11.12"
}

dependencies {
    implementation(
        "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-runtime-web_${scalaBinaryVersion}:${flinkVersion}",
        "org.apache.flink:flink-json:${flinkVersion}",
        "org.apache.flink:flink-metrics-dropwizard:${flinkVersion}",
        "org.scala-lang:scala-library:${scalaVersion}"
    )
}

shadowJar {
    relocate("org.apache.flink.dropwizard", "com.example.foo.shaded.dropwizard")
    relocate("com.codahale", "com.example.foo.shaded.codahale")
}

jar {
    zip64 = true
    archiveName = rootProject.name + '-all.jar'
    manifest {
        attributes('Main-Class': 'com.example.foo.Foo')
    }
    from {
        configurations.compileClasspath.collect {
            it.isDirectory() ? it : zipTree(it)
        }
        configurations.runtimeClasspath.collect {
            it.isDirectory() ? it : zipTree(it)
        }
    }
}
More information:
Running the code locally works.
The Flink cluster is a custom build with the following directory structure:
# find /usr/lib/flink/
/usr/lib/flink/
/usr/lib/flink/plugins
/usr/lib/flink/plugins/flink-metrics-influxdb-1.9.1.jar
/usr/lib/flink/plugins/flink-s3-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-graphite-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-prometheus-1.9.1.jar
/usr/lib/flink/plugins/flink-cep_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-python_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-queryable-state-runtime_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-sql-client_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-slf4j-1.9.1.jar
/usr/lib/flink/plugins/flink-state-processor-api_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-oss-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-statsd-1.9.1.jar
/usr/lib/flink/plugins/flink-swift-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-gelly-scala_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-azure-fs-hadoop-1.9.1.jar
/usr/lib/flink/plugins/flink-metrics-datadog-1.9.1.jar
/usr/lib/flink/plugins/flink-shaded-netty-tcnative-dynamic-2.0.25.Final-7.0.jar
/usr/lib/flink/plugins/flink-s3-fs-presto-1.9.1.jar
/usr/lib/flink/plugins/flink-cep-scala_2.11-1.9.1.jar
/usr/lib/flink/plugins/flink-gelly_2.11-1.9.1.jar
/usr/lib/flink/lib
/usr/lib/flink/lib/flink-metrics-influxdb-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-graphite-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-prometheus-1.9.1.jar
/usr/lib/flink/lib/flink-table_2.11-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-slf4j-1.9.1.jar
/usr/lib/flink/lib/log4j-1.2.17.jar
/usr/lib/flink/lib/slf4j-log4j12-1.7.15.jar
/usr/lib/flink/lib/flink-metrics-statsd-1.9.1.jar
/usr/lib/flink/lib/flink-metrics-datadog-1.9.1.jar
/usr/lib/flink/lib/flink-table-blink_2.11-1.9.1.jar
/usr/lib/flink/lib/flink-dist_2.11-1.9.1.jar
/usr/lib/flink/bin/...