我正在尝试使用flink中的scala xml库来解析xml,但我无法让它工作。请注意,我需要在同一个处理函数中对代码使用序列化和非序列化(字符串)版本。
我已经尝试了不同的解决方案,它们总是在intellij中工作,但当我在flink集群上运行它们时就不行了。他们的回报总是不同的 java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser
; 我尝试了很多种方法,但仍然得到类似的错误。
这是我的flink工作的一个例子:
object StreamingJob {
import org.apache.flink.streaming.api.scala._
val l = List(
"""<ciao>ciao</ciao>""",
)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// set up kafka section excluded
env.setParallelism(10)
val stream = env.fromCollection(l)
stream
.uid("process")
.map(new Processor)
.print
env.execute("Flink-TEST")
}
}
这是我的处理函数的一个例子:
import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader
class Processor extends MapFunction[String, String] {
override def map(translatedMessage: String): String = {
val xml = Processor.xmlLoader.loadString(translatedMessage)
xml.toString
}
}
object Processor {
val factory: SAXParserFactory = SAXParserFactory.newInstance
val SAXParser: SAXParser = factory.newSAXParser
val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
}
最后,这是我的pom.xml,使用maven shade插件制作我传递给flink的jar:
<!-- other sections of the pom are excluded -->
<properties>
<flink.version>1.7.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>
</properties>
<!-- other sections of the pom are excluded -->
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api-scala_2.12</artifactId>
<version>11.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-xml_2.12</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<!-- other sections of the pom are excluded -->
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.mycompany.myproj.artifactId.default.StreamingJob</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- other sections of the pom are excluded -->
我相信这个问题在某种程度上与将用于未来的实现有关 SAXParser
flink在运行时使用的。我也试过用 @transient
注解以防止从flink中持久化字段,但没有成功。
然而,我很困惑到底发生了什么,有人知道如何防止错误和哪里出错了吗?
1条答案
按热度按时间pkbketx91#
过了一段时间,我才发现它出了什么问题。
scala xml文档说:
在scala 2.11及更高版本中,将以下内容添加到build.sbt文件的librarydependencies中:
这在maven中可以理解为:
看起来不需要这个依赖,因为即使flink 1.7.2似乎使用scala 2.12.8,它仍然在其发行版中保留scala xml(因此在类路径中),我相信这可能会导致实际加载到哪个类中的问题,如果正确的话,这可能不是解决链接错误的方法。
解决连杆机构误差的方法实际上是使用Flink自己的方法
RichMapFunction[InputT, OutputT]
:正如javadoc所说:
函数的初始化方法。
它在实际工作方法(如map或join)之前调用,因此适合于一次性设置工作。对于作为迭代一部分的函数,此方法将在每个迭代步骤的开始处调用。
不幸的是
var
在这种情况下,最好是由flink处理值/变量的初始化,以防止运行时出现链接错误。注意事项:
我意识到这可能只发生在
DataStream[T]
而不是开着DataSet[T]
.作业需要将并行度设置为大于1,才能导致多个任务管理器加载同一个类,如果在IDE中完成,可能会很棘手,这里已经解释过。
在注意到这个问题的原因之后,似乎同伴对象对于flink的使用并不理想。
最后一部分可能是flink的“scala api扩展”页面中的一个很好的注解,其中还解释了除非使用flink scala api扩展,否则flink通常不支持匿名模式匹配函数来解构元组:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/scala_api_extensions.html