尝试使用 Apache Flink 消费 Twitter 流,并通过 flatMap 将每条 tweet 转换为 case 类;但一旦开始转换 tweet,作业就会失败并抛出以下异常:
java.lang.NoClassDefFoundError: scala/Product$class
版本: <flink.version>1.7.0</flink.version>
<scala.binary.version>2.11</scala.binary.version> <scala.version>2.11.12</scala.version>
已经尝试过更新scala版本,但是没有成功。下面是源代码。
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.twitter.TwitterSource
import org.apache.flink.util.Collector
import org.vj.distributed.Models.Tweet
/**
 * Flink streaming job that reads raw tweets from the Twitter connector, keeps the
 * English, geo-tagged ones, converts each of them into a [[Tweet]] and prints it.
 */
object TwitterStreaming {

  /**
   * Job entry point.
   *
   * @param args command-line arguments parsed with `ParameterTool.fromArgs`. They are
   *             registered as global job parameters, provide the optional
   *             `parallelism` setting (default 1) and are handed to `TwitterSource`
   *             as its properties.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)
    env.setParallelism(params.getInt("parallelism", 1))

    val streamSource: DataStream[String] = env.addSource(new TwitterSource(params.getProperties))
    streamSource.flatMap(new ProcessTweetAndPrepareForWrite).print()

    env.execute("TwitterStreaming with Apache Flink")
  }

  /**
   * Parses one raw JSON message from the source and emits a [[Tweet]] when the
   * message passes [[isValidTweet]]; every other message produces no output.
   */
  private class ProcessTweetAndPrepareForWrite extends FlatMapFunction[String, Tweet] {
    lazy val jsonParser = new ObjectMapper()

    /**
     * @param value raw JSON text of a single message
     * @param out   collector receiving at most one [[Tweet]] per message
     */
    override def flatMap(value: String, out: Collector[Tweet]): Unit = {
      // Explicit converters replace the deprecated implicit JavaConversions.
      import scala.collection.JavaConverters._

      val root = jsonParser.readValue(value, classOf[JsonNode])
      if (isValidTweet(root)) {
        val user = root.get("user")
        val location = user.get("location").asText()
        val followersCount = user.get("followers_count").asInt()
        val friendsCount = if (user.has("friends_count")) user.get("friends_count").asInt() else 0
        val geoEnabled = user.get("geo_enabled").asBoolean()
        val text = root.get("text").asText()

        // First element is used as latitude and last element as longitude.
        // headOption/lastOption skip the message instead of throwing when the
        // coordinates node has no elements.
        val coordinates = root.get("geo").get("coordinates").asScala
        for {
          latitudeNode <- coordinates.headOption
          longitudeNode <- coordinates.lastOption
        } out.collect(
          Tweet(location, followersCount, friendsCount, geoEnabled,
            latitudeNode.asText(), longitudeNode.asText(), text))
      }
    }

    /**
     * Returns true when the message has `user`, `text` and `geo` fields, the user
     * language is "en", the user has followers, geo is enabled, and neither the
     * user location nor the geo coordinates render as the text "null".
     */
    private def isValidTweet(node: JsonNode): Boolean =
      node.has("user") && node.has("text") && node.has("geo") && {
        val user = node.get("user")
        val geo = node.get("geo")
        user.has("lang") && user.get("lang").asText == "en" &&
          user.has("followers_count") && user.get("followers_count").asInt() > 0 &&
          user.has("geo_enabled") && user.get("geo_enabled").asBoolean() &&
          // NOTE(review): presumably relies on a JSON null node rendering as "null" via asText - confirm.
          user.has("location") && !user.get("location").asText.equalsIgnoreCase("null") &&
          geo.has("coordinates") && !geo.get("coordinates").asText.equalsIgnoreCase("null")
      }
  }
}
2018-12-24 22:49:41,799 WARN  com.twitter.hbc.httpclient.ClientBase - FlinkTwitterSource Uncaught exception
java.lang.NoClassDefFoundError: scala/Product$class
	at org.vj.distributed.Models$Tweet.<init>(Models.scala:5)
	at org.vj.distributed.TwitterStreaming$ProcessTweetAndPrepareForWrite.flatMap(TwitterStreaming.scala:45)
	at org.vj.distributed.TwitterStreaming$ProcessTweetAndPrepareForWrite.flatMap(TwitterStreaming.scala:23)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.connectors.twitter.TwitterSource$1.process(TwitterSource.java:147)
	at com.twitter.hbc.httpclient.Connection.processResponse(Connection.java:51)
	at com.twitter.hbc.httpclient.ClientBase.processConnectionData(ClientBase.java:244)
	at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:144)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 17 more
<!-- Build-wide version properties referenced by the dependency declarations. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Version applied to the Flink artifacts that reference ${flink.version}. -->
<flink.version>1.7.0</flink.version>
<!-- Scala binary suffix appended to Flink artifact ids, and the scala-library version. -->
<!-- NOTE(review): a NoClassDefFoundError for scala/Product$class usually indicates classes -->
<!-- compiled with Scala 2.11 running against a Scala 2.12 scala-library; confirm that the -->
<!-- Flink distribution or IDE runtime matches this binary version. -->
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
</properties>
<!-- Project dependencies. All Flink artifacts take their Scala suffix and version from -->
<!-- ${scala.binary.version} and ${flink.version} so they cannot drift apart. -->
<dependencies>
<!-- Flink core Scala APIs: scope "provided", so they are not bundled with the job. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- scala-library is also "provided". -->
<!-- NOTE(review): the runtime must therefore supply a scala-library of the same binary version; verify. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<!-- Connectors use the default (compile) scope. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging backend, needed only at runtime. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Shade plugin: builds the shaded jar of the job and sets its main class. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Artifacts left out of the shaded jar. -->
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<!-- Strip signature files from every included artifact. -->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!-- Write the job entry point into the jar manifest. -->
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.vj.distributed.TwitterStreaming</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Java compiler plugin: source and target level 1.8. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Scala compiler plugin: runs the compile and testCompile goals. -->
<!-- NOTE(review): no scalaVersion is configured here; presumably the plugin derives the -->
<!-- compiler version from the scala-library dependency. Confirm it matches the runtime. -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Eclipse plugin settings: Scala IDE and JDT natures, Scala builder, classpath containers. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<projectnatures>
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
</projectnatures>
<buildcommands>
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<classpathContainers>
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
</classpathContainers>
<!-- Scala library and compiler artifacts are excluded from this plugin's configuration. -->
<excludes>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
</excludes>
<!-- Both Scala and Java files are listed as sources. -->
<sourceIncludes>
<sourceInclude>**/*.scala</sourceInclude>
<sourceInclude>**/*.java</sourceInclude>
</sourceIncludes>
</configuration>
</plugin>
<!-- Build helper plugin: registers the Scala main and test directories as source roots. -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala as a test source root during generate-test-sources -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
暂无答案!
目前还没有任何答案,快来回答吧!