无法将spark连接到kafka

ubof19bj  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(393)

我在.NETCore中使用ApacheSpark
我试图连接spark流与Kafka,当我运行我的应用程序时,我得到以下错误。
日志:

Ivy Default Cache set to: C:\Users\MyUserAccount\.ivy2\cache
    The jars for the packages stored in: C:\Users\MyUserAccount\.ivy2\jars
    :: loading settings :: url = jar:file:/C:/bin/spark-2.4.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-.
.
.
            ---------------------------------------------------------------------
            |                  |            modules            ||   artifacts   |
            |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
            ---------------------------------------------------------------------
            |      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
            ---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-83ef8d1b-5e0e-420d-af99-18bdeeed2bc7
            confs: [default]
            0 artifacts copied, 6 already retrieved (0kB/15ms)
    20/08/15 15:32:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    20/08/15 15:32:07 INFO DotnetRunner: Starting DotnetBackend with dotnet.
    20/08/15 15:32:08 INFO DotnetRunner: Port number used by DotnetBackend is 57067
    20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.jars and value=file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar,file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/bin/Debug/netcoreapp3.1/microsoft-spark-2.4.x-0.10.0.jar to environment
    20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.app.name and value=org.apache.spark.deploy.dotnet.DotnetRunner to environment
    20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.submit.deployMode and value=client to environment
    20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.master and value=local to environment
    20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.repl.local.jars and value=file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar to environment
    l1
    http://localhost:9092
    subscribe
    chat-message
    l2
    [2020-08-15T11:02:08.9293042Z] [MyUserAccount] [Info] [ConfigurationService] Using port 57067 for connection.
    [2020-08-15T11:02:08.9303643Z] [MyUserAccount] [Info] [JvmBridge] JvMBridge port is 57067
    20/08/15 15:32:09 INFO SparkContext: Running Spark version 2.4.1
    20/08/15 15:32:09 INFO SparkContext: Submitted application: StructuredKafkaWordCount
    20/08/15 15:32:09 INFO SecurityManager: Changing view acls to: MyUserAccount
    20/08/15 15:32:09 INFO SecurityManager: Changing modify acls to: MyUserAccount
    20/08/15 15:32:09 INFO SecurityManager: Changing view acls groups to:
    20/08/15 15:32:09 INFO SecurityManager: Changing modify acls groups to:
    20/08/15 15:32:09 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(MyUserAccount); groups with view permissions: Set(); users  with modify permissions: Set(MyUserAccount); groups with modify permissions: Set()
    20/08/15 15:32:09 INFO Utils: Successfully started service 'sparkDriver' on port 57073.
    20/08/15 15:32:09 INFO SparkEnv: Registering MapOutputTracker
    20/08/15 15:32:09 INFO SparkEnv: Registering BlockManagerMaster
    20/08/15 15:32:09 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    20/08/15 15:32:09 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    20/08/15 15:32:09 INFO DiskBlockManager: Created local directory at C:\Users\MyUserAccount\AppData\Local\Temp\blockmgr-4276b71f-affa-46bd-8a0a-fac55f0f825f
    20/08/15 15:32:09 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
    20/08/15 15:32:09 INFO SparkEnv: Registering OutputCommitCoordinator
    20/08/15 15:32:09 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    20/08/15 15:32:09 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://MyUserAccount.asax.local:4040
    20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar at spark://MyUserAccount.asax.local:57073/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar with timestamp 1597489329693
    20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar at spark://MyUserAccount.asax.local:57073/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar with timestamp 1597489329694
    20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar at spark://MyUserAccount.asax.local:57073/jars/org.spark-project.spark_unused-1.0.0.jar with timestamp 1597489329696
    20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar at spark://MyUserAccount.asax.local:57073/jars/net.jpountz.lz4_lz4-1.3.0.jar with timestamp 1597489329697
    20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar at spark://MyUserAccount.asax.local:57073/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar with timestamp 1597489329698
    20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar at spark://MyUserAccount.asax.local:57073/jars/org.slf4j_slf4j-api-1.7.16.jar with timestamp 1597489329699
    20/08/15 15:32:09 INFO SparkContext: Added JAR file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/bin/Debug/netcoreapp3.1/microsoft-spark-2.4.x-0.10.0.jar at spark://MyUserAccount.asax.local:57073/jars/microsoft-spark-2.4.x-0.10.0.jar with timestamp 1597489329699
    20/08/15 15:32:09 INFO SparkContext: Added JAR file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/bin/Debug/netcoreapp3.1/microsoft-spark-2.4.x-0.10.0.jar at spark://MyUserAccount.asax.local:57073/jars/microsoft-spark-2.4.x-0.10.0.jar with timestamp 1597489329699
    20/08/15 15:32:09 INFO Executor: Starting executor ID driver on host localhost
    20/08/15 15:32:09 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57082.
    20/08/15 15:32:09 INFO NettyBlockTransferService: Server created on MyUserAccount.asax.local:57082
    20/08/15 15:32:09 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    20/08/15 15:32:09 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
    20/08/15 15:32:09 INFO BlockManagerMasterEndpoint: Registering block manager MyUserAccount.asax.local:57082 with 366.3 MB RAM, BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
    20/08/15 15:32:09 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
    20/08/15 15:32:09 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
    l3
    20/08/15 15:32:10 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/spark-warehouse').
    20/08/15 15:32:10 INFO SharedState: Warehouse path is 'file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/spark-warehouse'.
    20/08/15 15:32:10 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    20/08/15 15:32:10 INFO ConsumerConfig: ConsumerConfig values:
            metric.reporters = []
            metadata.max.age.ms = 300000
            partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
            reconnect.backoff.ms = 50
            sasl.kerberos.ticket.renew.window.factor = 0.8
            max.partition.fetch.bytes = 1048576
            bootstrap.servers = [http://localhost:9092]
            ssl.keystore.type = JKS
            enable.auto.commit = false
            sasl.mechanism = GSSAPI
            interceptor.classes = null
            exclude.internal.topics = true
            ssl.truststore.password = null
            client.id =
            ssl.endpoint.identification.algorithm = null
            max.poll.records = 1
            check.crcs = true
            request.timeout.ms = 40000
            heartbeat.interval.ms = 3000
            auto.commit.interval.ms = 5000
            receive.buffer.bytes = 65536
            ssl.truststore.type = JKS
            ssl.truststore.location = null
            ssl.keystore.password = null
            fetch.min.bytes = 1
            send.buffer.bytes = 131072
            value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
            group.id = spark-kafka-source-75008566-d5fc-4022-859c-d957a6590559--1597538808-driver-0
            retry.backoff.ms = 100
            sasl.kerberos.kinit.cmd = /usr/bin/kinit
            sasl.kerberos.service.name = null
            sasl.kerberos.ticket.renew.jitter = 0.05
            ssl.trustmanager.algorithm = PKIX
            ssl.key.password = null
            fetch.max.wait.ms = 500
            sasl.kerberos.min.time.before.relogin = 60000
            connections.max.idle.ms = 540000
            session.timeout.ms = 30000
            metrics.num.samples = 2
            key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
            ssl.protocol = TLS
            ssl.provider = null
            ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
            ssl.keystore.location = null
            ssl.cipher.suites = null
            security.protocol = PLAINTEXT
            ssl.keymanager.algorithm = SunX509
            metrics.sample.window.ms = 30000
            auto.offset.reset = earliest

    20/08/15 15:32:10 INFO ConsumerConfig: ConsumerConfig values:
            metric.reporters = []
            metadata.max.age.ms = 300000
            partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
            reconnect.backoff.ms = 50
            sasl.kerberos.ticket.renew.window.factor = 0.8
            max.partition.fetch.bytes = 1048576
            bootstrap.servers = [http://localhost:9092]
            ssl.keystore.type = JKS
            enable.auto.commit = false
            sasl.mechanism = GSSAPI
            interceptor.classes = null
            exclude.internal.topics = true
            ssl.truststore.password = null
            client.id = consumer-1
            ssl.endpoint.identification.algorithm = null
            max.poll.records = 1
            check.crcs = true
            request.timeout.ms = 40000
            heartbeat.interval.ms = 3000
            auto.commit.interval.ms = 5000
            receive.buffer.bytes = 65536
            ssl.truststore.type = JKS
            ssl.truststore.location = null
            ssl.keystore.password = null
            fetch.min.bytes = 1
            send.buffer.bytes = 131072
            value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
            group.id = spark-kafka-source-75008566-d5fc-4022-859c-d957a6590559--1597538808-driver-0
            retry.backoff.ms = 100
            sasl.kerberos.kinit.cmd = /usr/bin/kinit
            sasl.kerberos.service.name = null
            sasl.kerberos.ticket.renew.jitter = 0.05
            ssl.trustmanager.algorithm = PKIX
            ssl.key.password = null
            fetch.max.wait.ms = 500
            sasl.kerberos.min.time.before.relogin = 60000
            connections.max.idle.ms = 540000
            session.timeout.ms = 30000
            metrics.num.samples = 2
            key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
            ssl.protocol = TLS
            ssl.provider = null
            ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
            ssl.keystore.location = null
            ssl.cipher.suites = null
            security.protocol = PLAINTEXT
            ssl.keymanager.algorithm = SunX509
            metrics.sample.window.ms = 30000
            auto.offset.reset = earliest

    20/08/15 15:32:10 INFO AppInfoParser: Kafka version : 0.10.0.1
    20/08/15 15:32:10 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
    20/08/15 15:32:10 ERROR DotnetBackendHandler: methods:
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.slf4j.Logger org.apache.spark.sql.streaming.DataStreamReader.log()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.format(java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.load(java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.load()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logDebug(scala.Function0,java.lang.Throwable)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logDebug(scala.Function0)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logWarning(scala.Function0,java.lang.Throwable)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logWarning(scala.Function0)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public java.lang.String org.apache.spark.sql.streaming.DataStreamReader.logName()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logTrace(scala.Function0)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logTrace(scala.Function0,java.lang.Throwable)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logInfo(scala.Function0)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logInfo(scala.Function0,java.lang.Throwable)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logError(scala.Function0)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logError(scala.Function0,java.lang.Throwable)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean org.apache.spark.sql.streaming.DataStreamReader.isTraceEnabled()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.slf4j.Logger org.apache.spark.sql.streaming.DataStreamReader.org$apache$spark$internal$Logging$$log_()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean org.apache.spark.sql.streaming.DataStreamReader.initializeLogIfNecessary$default$2()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.initializeLogIfNecessary(boolean)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean org.apache.spark.sql.streaming.DataStreamReader.initializeLogIfNecessary(boolean,boolean)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.options(scala.collection.Map)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.options(java.util.Map)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,boolean)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,long)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,double)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.text(java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.schema(org.apache.spark.sql.types.StructType)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.schema(java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.json(java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.textFile(java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.parquet(java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.orc(java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.csv(java.lang.String)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public final void java.lang.Object.wait() throws java.lang.InterruptedException
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public final void java.lang.Object.wait(long,int) throws java.lang.InterruptedException
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native void java.lang.Object.wait(long) throws java.lang.InterruptedException
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean java.lang.Object.equals(java.lang.Object)
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public java.lang.String java.lang.Object.toString()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public native int java.lang.Object.hashCode()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native java.lang.Class java.lang.Object.getClass()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native void java.lang.Object.notify()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native void java.lang.Object.notifyAll()
    20/08/15 15:32:10 ERROR DotnetBackendHandler: args:
    [2020-08-15T11:02:10.9187218Z] [MyUserAccount] [Error] [JvmBridge] JVM method execution failed: Nonstatic method load failed for class 7 when called with no arguments
    [2020-08-15T11:02:10.9188973Z] [MyUserAccount] [Error] [JvmBridge] java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow
            at java.lang.ClassLoader.defineClass1(Native Method)
            at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
            at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
            at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
            at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
            at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
            at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
            at java.security.AccessController.doPrivileged(Native Method)
            at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
            at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:136)
            at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
            at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:204)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.api.dotnet.DotnetBackendHandler.handleMethodCall(DotnetBackendHandler.scala:162)
            at org.apache.spark.api.dotnet.DotnetBackendHandler.handleBackendRequest(DotnetBackendHandler.scala:102)
            at org.apache.spark.api.dotnet.DotnetBackendHandler.channelRead0(DotnetBackendHandler.scala:29)
            at org.apache.spark.api.dotnet.DotnetBackendHandler.channelRead0(DotnetBackendHandler.scala:24)
            at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
            at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
            at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
            at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
            at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
            at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
            at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
            at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.reader.SupportsScanUnsafeRow
            at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
            ... 47 more

    [2020-08-15T11:02:10.9333693Z] [MyUserAccount] [Exception] [JvmBridge] JVM method execution failed: Nonstatic method load failed for class 7 when called with no arguments
       at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
    Unhandled exception. System.Exception: JVM method execution failed: Nonstatic method load failed for class 7 when called with no arguments
       at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
       at Microsoft.Spark.Interop.Ipc.JvmBridge.CallNonStaticJavaMethod(JvmObjectReference objectId, String methodName, Object[] args)
       at Microsoft.Spark.Interop.Ipc.JvmObjectReference.Invoke(String methodName, Object[] args)
       at Microsoft.Spark.Sql.Streaming.DataStreamReader.Load()
       at Asa.Cep.TradeValue.Processing.Program.Main(String[] args) in

我的源代码:

string bootstrapServers = Helper.Instance.KafkaUri;
            string subscribeType = "subscribe";
            string topics = Helper.Instance.KafkaTopic;

            System.Console.WriteLine("l1");

            System.Console.WriteLine(bootstrapServers);
            System.Console.WriteLine(subscribeType);
            System.Console.WriteLine(topics);
            System.Console.WriteLine("l2");

            SparkSession spark = SparkSession
                .Builder()
                .AppName("StructuredKafkaWordCount")
                .GetOrCreate();
            System.Console.WriteLine("l3");

            DataFrame lines = spark
                .ReadStream()
                .Format("kafka")
                .Option("kafka.bootstrap.servers", bootstrapServers)
                .Option(subscribeType, topics)

                .Load()
                .SelectExpr("CAST(value AS STRING)");
            System.Console.WriteLine("l4");

            DataFrame words = lines
                .Select(Explode(Split(lines["value"], " "))
                    .Alias("word"));
            DataFrame wordCounts = words.GroupBy("word").Count();
            System.Console.WriteLine("l5");

            StreamingQuery query = wordCounts
                .WriteStream()
                .OutputMode("complete")
                .Format("console")
                .Start();
            System.Console.WriteLine("l6");

            query.AwaitTermination();

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题