如何将scalaxml与apacheflink结合使用?

k75qkfdt  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(392)

我正在尝试使用flink中的scala xml库来解析xml,但我无法让它工作。请注意,我需要在同一个处理函数中对代码使用序列化和非序列化(字符串)版本。
我已经尝试了不同的解决方案,它们总是在intellij中工作,但当我在flink集群上运行它们时就不行了。他们的回报总是不同的 java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser ; 我尝试了很多种方法,但仍然得到类似的错误。
这是我的flink工作的一个例子:

object StreamingJob {
  import org.apache.flink.streaming.api.scala._

  val l = List(
    """<ciao>ciao</ciao>""",
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set up kafka section excluded
    env.setParallelism(10)

    val stream = env.fromCollection(l)

    stream
      .uid("process")
      .map(new Processor)
      .print

    env.execute("Flink-TEST")
  }
}

这是我的处理函数的一个例子:

import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends MapFunction[String, String] {
  override def map(translatedMessage: String): String = {
    val xml = Processor.xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}
object Processor {
  val factory: SAXParserFactory = SAXParserFactory.newInstance
  val SAXParser: SAXParser = factory.newSAXParser
  val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
}

最后,这是我的pom.xml,使用maven shade插件制作我传递给flink的jar:

<!-- other sections of the pom are excluded -->
        <properties>
            <flink.version>1.7.0</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
            <scala.version>2.12.8</scala.version>
        </properties>
        <!-- other sections of the pom are excluded -->
    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Scala Library, provided by Flink as well. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-yaml</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api-scala_2.12</artifactId>
            <version>11.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-xml_2.12</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>
        <!-- other sections of the pom are excluded -->
<build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.mycompany.myproj.artifactId.default.StreamingJob</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Add src/main/scala to eclipse build path -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- Add src/test/scala to eclipse build path -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
        <!-- other sections of the pom are excluded -->

我相信这个问题在某种程度上与将用于未来的实现有关 SAXParser flink在运行时使用的。我也试过用 @transient 注解以防止从flink中持久化字段,但没有成功。
然而,我很困惑到底发生了什么,有人知道如何防止错误和哪里出错了吗?

pkbketx9

pkbketx91#

过了一段时间,我才发现它出了什么问题。
scala xml文档说:
在scala 2.11及更高版本中,将以下内容添加到build.sbt文件的librarydependencies中:

"org.scala-lang.modules" %% "scala-xml" % "1.1.1"

这在maven中可以理解为:

<dependency>
    <groupId>org.scala-lang.modules</groupId>
    <artifactId>scala-xml_2.12</artifactId>
    <version>1.1.1</version>
</dependency>

看起来不需要这个依赖,因为即使flink 1.7.2似乎使用scala 2.12.8,它仍然在其发行版中保留scala xml(因此在类路径中),我相信这可能会导致实际加载到哪个类中的问题,如果正确的话,这可能不是解决链接错误的方法。
解决连杆机构误差的方法实际上是使用Flink自己的方法 RichMapFunction[InputT, OutputT] :

class Processor extends RichMapFunction[String, String] {
  var factory: SAXParserFactory = _
  var SAXParser: SAXParser = _
  var xmlLoader: XMLLoader[Elem] = _

  override def open(parameters: Configuration): Unit = {
    factory = SAXParserFactory.newInstance
    SAXParser = factory.newSAXParser
    xmlLoader = XML.withSAXParser(SAXParser)
  }

  override def map(translatedMessage: String): String = {
    val xml = xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}

正如javadoc所说:
函数的初始化方法。
它在实际工作方法(如map或join)之前调用,因此适合于一次性设置工作。对于作为迭代一部分的函数,此方法将在每个迭代步骤的开始处调用。
不幸的是 var 在这种情况下,最好是由flink处理值/变量的初始化,以防止运行时出现链接错误。
注意事项:
我意识到这可能只发生在 DataStream[T] 而不是开着 DataSet[T] .
作业需要将并行度设置为大于1,才能导致多个任务管理器加载同一个类,如果在IDE中完成,可能会很棘手,这里已经解释过。
在注意到这个问题的原因之后,似乎同伴对象对于flink的使用并不理想。
最后一部分可能是flink的“scala api扩展”页面中的一个很好的注解,其中还解释了除非使用flink scala api扩展,否则flink通常不支持匿名模式匹配函数来解构元组:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/scala_api_extensions.html

相关问题