scala - How to use Scala XML with Apache Flink?

Reposted. Author: 行者123. Updated: 2023-12-04 02:28:49

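I'm trying to use the Scala XML library in Flink to parse XML, but I can't get it to work. Note that I need to use both the parsed and the unparsed (string) versions of the message in my code, within the same processing function.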

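I've tried different solutions. They always work in IntelliJ, but not when I run them on a Flink cluster. They always fail with some variant of java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser; I've tried several approaches, but I still get errors similar to this one.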

Here is an example of my Flink job:

object StreamingJob {
  import org.apache.flink.streaming.api.scala._

  val l = List(
    """<ciao>ciao</ciao>""",
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set up kafka section excluded
    env.setParallelism(10)

    val stream = env.fromCollection(l)

    stream
      .uid("process")
      .map(new Processor)
      .print

    env.execute("Flink-TEST")
  }
}

Here is an example of my processing function:

import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends MapFunction[String, String] {
  override def map(translatedMessage: String): String = {
    val xml = Processor.xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}

object Processor {
  val factory: SAXParserFactory = SAXParserFactory.newInstance
  val SAXParser: SAXParser = factory.newSAXParser
  val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
}

Finally, here is my pom.xml, which uses the maven-shade plugin to build the jar I submit to Flink:

<!-- other sections of the pom are excluded -->
<properties>
  <flink.version>1.7.0</flink.version>
  <scala.binary.version>2.12</scala.binary.version>
  <scala.version>2.12.8</scala.version>
</properties>
<!-- other sections of the pom are excluded -->
<dependencies>
  <!-- Apache Flink dependencies -->
  <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <!-- Scala Library, provided by Flink as well. -->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.7.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.11.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.11.1</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.0</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-yaml</artifactId>
    <version>2.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api-scala_2.12</artifactId>
    <version>11.0</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang.modules</groupId>
    <artifactId>scala-xml_2.12</artifactId>
    <version>1.1.1</version>
  </dependency>
</dependencies>
<!-- other sections of the pom are excluded -->
<build>
  <plugins>
    <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <!-- Run shade goal on package phase -->
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <artifactSet>
              <excludes>
                <exclude>org.apache.flink:force-shading</exclude>
                <exclude>com.google.code.findbugs:jsr305</exclude>
                <exclude>org.slf4j:*</exclude>
                <exclude>log4j:*</exclude>
              </excludes>
            </artifactSet>
            <filters>
              <filter>
                <!-- Do not copy the signatures in the META-INF folder.
                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.mycompany.myproj.artifactId.default.StreamingJob</mainClass>
              </transformer>
              <transformer
                  implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>reference.conf</resource>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <!-- Java Compiler -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>

    <!-- Scala Compiler -->
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>build-helper-maven-plugin</artifactId>
      <version>1.7</version>
      <executions>
        <!-- Add src/main/scala to eclipse build path -->
        <execution>
          <id>add-source</id>
          <phase>generate-sources</phase>
          <goals>
            <goal>add-source</goal>
          </goals>
          <configuration>
            <sources>
              <source>src/main/scala</source>
            </sources>
          </configuration>
        </execution>
        <!-- Add src/test/scala to eclipse build path -->
        <execution>
          <id>add-test-source</id>
          <phase>generate-test-sources</phase>
          <goals>
            <goal>add-test-source</goal>
          </goals>
          <configuration>
            <sources>
              <source>src/test/scala</source>
            </sources>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
<!-- other sections of the pom are excluded -->

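I believe the problem is somehow related to which SAXParser implementation Flink uses at runtime. I also tried the @transient annotation to prevent Flink from persisting the fields, but without success.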

However, I'm quite confused about what is actually happening. Does anyone know what is going wrong and how to prevent the error?
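One way to test the SAXParser hypothesis is to log which `SAXParserFactory` implementation the JAXP lookup selects, and which class loaders are involved, both in the IDE and inside the running job (for example from the map function). The following is only a diagnostic sketch using standard JDK calls; the object and method names are hypothetical.

```scala
import javax.xml.parsers.SAXParserFactory

object SaxParserDiagnostics {
  // Concrete SAXParserFactory implementation chosen by the JAXP lookup
  def factoryClassName(): String =
    SAXParserFactory.newInstance().getClass.getName

  // Class loader that loaded that implementation. JDK-internal classes
  // (e.g. com.sun.org.apache.xerces...) usually come from the bootstrap
  // loader, which Java reports as null.
  def factoryLoader(): String =
    Option(SAXParserFactory.newInstance().getClass.getClassLoader)
      .map(_.toString)
      .getOrElse("bootstrap")

  // Context class loader of the current thread, which JAXP consults
  // when searching for a provider
  def contextLoader(): String =
    Option(Thread.currentThread().getContextClassLoader)
      .map(_.toString)
      .getOrElse("none")

  def main(args: Array[String]): Unit = {
    println(s"SAXParserFactory impl: ${factoryClassName()}")
    println(s"Loaded by:             ${factoryLoader()}")
    println(s"Context class loader:  ${contextLoader()}")
  }
}
```

If the IDE run and the cluster run report different implementations or loaders, that would support the idea that the error comes from class loading rather than from the XML itself.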

Best answer

After a while, I figured out what was wrong.

The Scala XML documentation says:

In Scala 2.11 and later, add the following to your build.sbt file's libraryDependencies:


"org.scala-lang.modules" %% "scala-xml" % "1.1.1"

which in Maven translates to:

<dependency>
  <groupId>org.scala-lang.modules</groupId>
  <artifactId>scala-xml_2.12</artifactId>
  <version>1.1.1</version>
</dependency>

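Well, it seems this dependency is not needed: even though Flink 1.7.2 appears to use Scala 2.12.8, it still keeps Scala XML in its distribution (and therefore on the classpath). If that is right, I believe it can cause problems with which class actually gets loaded, but it is probably not the fix for the linkage error.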
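To see where `scala.xml.XML` is actually loaded from on the cluster (the user jar or a jar from Flink's own distribution), one could log the code-source location of the class. This is a sketch that relies only on JDK reflection; `ClassOrigin` and `locationOf` are hypothetical names, and it does not require scala-xml to be present at compile time.

```scala
import scala.util.Try

object ClassOrigin {
  // Location (usually a jar path) the named class was loaded from, if the
  // class is visible to this loader and the JVM exposes a code source.
  // Classes from the bootstrap loader normally have no code source.
  def locationOf(className: String): Option[String] =
    for {
      // initialize = false: only look the class up, do not run its static init
      cls    <- Try(Class.forName(className, false, getClass.getClassLoader)).toOption
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    val where = locationOf("scala.xml.XML")
      .getOrElse("not found, or loaded without a code source")
    println(s"scala.xml.XML loaded from: $where")
  }
}
```

Logged from inside a running task, the printed path would show whether the copy of scala-xml in use comes from the shaded job jar or from somewhere else on the cluster's classpath.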

The actual fix for the linkage error is to use Flink's own RichMapFunction[InputT, OutputT]:

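import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends RichMapFunction[String, String] {
  // Left uninitialized here; Flink calls open() on each parallel instance
  // before the first map() call, so the parser is built at that point
  var factory: SAXParserFactory = _
  var saxParser: SAXParser = _
  var xmlLoader: XMLLoader[Elem] = _

  override def open(parameters: Configuration): Unit = {
    factory = SAXParserFactory.newInstance
    saxParser = factory.newSAXParser
    xmlLoader = XML.withSAXParser(saxParser)
  }

  override def map(translatedMessage: String): String = {
    // Parsed form (Elem) and the original string are both available here
    val xml = xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}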

As the JavaDoc says:

Initialization method for the function.

It is called before the actual working methods (like map or join) and thus suitable for one time setup work. For functions that are part of an iteration, this method will be invoked at the beginning of each iteration superstep.



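Unfortunately, a var is the better choice in this case, because the initialization of the values/variables has to be handled by Flink (in open()) to prevent the linkage error at runtime.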
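The pattern in the RichMapFunction above (one parser per function instance, created in open() rather than in a JVM-wide companion object) can be tried outside Flink with a small stand-in. In this sketch, `OpenThenMap` is a hypothetical trait that only mimics the open()-then-map() call order; it is not Flink's API. It also parses with the JDK's plain SAX API instead of Scala XML, so it runs without extra dependencies.

```scala
import java.io.StringReader
import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.xml.sax.{Attributes, InputSource}
import org.xml.sax.helpers.DefaultHandler

// Hypothetical stand-in for the RichMapFunction lifecycle:
// open() once per instance, then map() for each record. NOT Flink's API.
trait OpenThenMap[In, Out] {
  def open(): Unit
  def map(value: In): Out
}

// Returns the name of the root element of each XML string
class RootElementName extends OpenThenMap[String, String] {
  // One parser per instance, created in open(). JAXP does not require
  // SAXParser to be thread-safe, so a single instance shared by several
  // parallel subtasks in the same JVM is risky for that reason as well.
  private var parser: SAXParser = _

  override def open(): Unit = {
    parser = SAXParserFactory.newInstance().newSAXParser()
  }

  override def map(value: String): String = {
    var root: String = null
    val handler = new DefaultHandler {
      override def startElement(uri: String, localName: String,
                                qName: String, attributes: Attributes): Unit =
        if (root == null) root = qName // remember only the first element
    }
    parser.parse(new InputSource(new StringReader(value)), handler)
    root
  }
}

object RootElementNameDemo {
  def main(args: Array[String]): Unit = {
    val f = new RootElementName
    f.open()
    println(f.map("<ciao>ciao</ciao>")) // prints ciao
  }
}
```

The design choice mirrors the answer: nothing XML-related is created when the class or its companion is loaded, only when the framework (here simulated) calls open() on a concrete instance.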

Some notes:
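  • I realized this probably only happens with DataStream[T] and not with DataSet[T].
  • The job needs a parallelism greater than 1 so that multiple task managers load the same class, which can be tricky to reproduce in an IDE, as explained here.
  • Now that I know the cause of this problem, it seems companion objects may not be well suited for use in Flink.
  • This last point might make a good note on Flink's "Scala API Extensions" page, which also explains that Flink generally does not support anonymous pattern-matching functions for deconstructing tuples unless the Flink Scala API extensions are used: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/scala_api_extensions.html
  • A similar question, "scala - How to use Scala XML with Apache Flink?", can be found on Stack Overflow: https://stackoverflow.com/questions/54462742/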
