我在.NETCore中使用ApacheSpark
我试图连接spark流与Kafka,当我运行我的应用程序时,我得到以下错误。
日志:
Ivy Default Cache set to: C:\Users\MyUserAccount\.ivy2\cache
The jars for the packages stored in: C:\Users\MyUserAccount\.ivy2\jars
:: loading settings :: url = jar:file:/C:/bin/spark-2.4.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-.
.
.
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 6 | 0 | 0 | 0 || 6 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-83ef8d1b-5e0e-420d-af99-18bdeeed2bc7
confs: [default]
0 artifacts copied, 6 already retrieved (0kB/15ms)
20/08/15 15:32:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/08/15 15:32:07 INFO DotnetRunner: Starting DotnetBackend with dotnet.
20/08/15 15:32:08 INFO DotnetRunner: Port number used by DotnetBackend is 57067
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.jars and value=file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar,file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/bin/Debug/netcoreapp3.1/microsoft-spark-2.4.x-0.10.0.jar to environment
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.app.name and value=org.apache.spark.deploy.dotnet.DotnetRunner to environment
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.submit.deployMode and value=client to environment
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.master and value=local to environment
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.repl.local.jars and value=file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar to environment
l1
http://localhost:9092
subscribe
chat-message
l2
[2020-08-15T11:02:08.9293042Z] [MyUserAccount] [Info] [ConfigurationService] Using port 57067 for connection.
[2020-08-15T11:02:08.9303643Z] [MyUserAccount] [Info] [JvmBridge] JvMBridge port is 57067
20/08/15 15:32:09 INFO SparkContext: Running Spark version 2.4.1
20/08/15 15:32:09 INFO SparkContext: Submitted application: StructuredKafkaWordCount
20/08/15 15:32:09 INFO SecurityManager: Changing view acls to: MyUserAccount
20/08/15 15:32:09 INFO SecurityManager: Changing modify acls to: MyUserAccount
20/08/15 15:32:09 INFO SecurityManager: Changing view acls groups to:
20/08/15 15:32:09 INFO SecurityManager: Changing modify acls groups to:
20/08/15 15:32:09 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(MyUserAccount); groups with view permissions: Set(); users with modify permissions: Set(MyUserAccount); groups with modify permissions: Set()
20/08/15 15:32:09 INFO Utils: Successfully started service 'sparkDriver' on port 57073.
20/08/15 15:32:09 INFO SparkEnv: Registering MapOutputTracker
20/08/15 15:32:09 INFO SparkEnv: Registering BlockManagerMaster
20/08/15 15:32:09 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/08/15 15:32:09 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/08/15 15:32:09 INFO DiskBlockManager: Created local directory at C:\Users\MyUserAccount\AppData\Local\Temp\blockmgr-4276b71f-affa-46bd-8a0a-fac55f0f825f
20/08/15 15:32:09 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/08/15 15:32:09 INFO SparkEnv: Registering OutputCommitCoordinator
20/08/15 15:32:09 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/08/15 15:32:09 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://MyUserAccount.asax.local:4040
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar at spark://MyUserAccount.asax.local:57073/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar with timestamp 1597489329693
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar at spark://MyUserAccount.asax.local:57073/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar with timestamp 1597489329694
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar at spark://MyUserAccount.asax.local:57073/jars/org.spark-project.spark_unused-1.0.0.jar with timestamp 1597489329696
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar at spark://MyUserAccount.asax.local:57073/jars/net.jpountz.lz4_lz4-1.3.0.jar with timestamp 1597489329697
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar at spark://MyUserAccount.asax.local:57073/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar with timestamp 1597489329698
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar at spark://MyUserAccount.asax.local:57073/jars/org.slf4j_slf4j-api-1.7.16.jar with timestamp 1597489329699
20/08/15 15:32:09 INFO SparkContext: Added JAR file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/bin/Debug/netcoreapp3.1/microsoft-spark-2.4.x-0.10.0.jar at spark://MyUserAccount.asax.local:57073/jars/microsoft-spark-2.4.x-0.10.0.jar with timestamp 1597489329699
20/08/15 15:32:09 INFO SparkContext: Added JAR file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/bin/Debug/netcoreapp3.1/microsoft-spark-2.4.x-0.10.0.jar at spark://MyUserAccount.asax.local:57073/jars/microsoft-spark-2.4.x-0.10.0.jar with timestamp 1597489329699
20/08/15 15:32:09 INFO Executor: Starting executor ID driver on host localhost
20/08/15 15:32:09 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57082.
20/08/15 15:32:09 INFO NettyBlockTransferService: Server created on MyUserAccount.asax.local:57082
20/08/15 15:32:09 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/08/15 15:32:09 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
20/08/15 15:32:09 INFO BlockManagerMasterEndpoint: Registering block manager MyUserAccount.asax.local:57082 with 366.3 MB RAM, BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
20/08/15 15:32:09 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
20/08/15 15:32:09 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
l3
20/08/15 15:32:10 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/spark-warehouse').
20/08/15 15:32:10 INFO SharedState: Warehouse path is 'file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/spark-warehouse'.
20/08/15 15:32:10 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
20/08/15 15:32:10 INFO ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [http://localhost:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 1
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = spark-kafka-source-75008566-d5fc-4022-859c-d957a6590559--1597538808-driver-0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest
20/08/15 15:32:10 INFO ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [http://localhost:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 1
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = spark-kafka-source-75008566-d5fc-4022-859c-d957a6590559--1597538808-driver-0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest
20/08/15 15:32:10 INFO AppInfoParser: Kafka version : 0.10.0.1
20/08/15 15:32:10 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
20/08/15 15:32:10 ERROR DotnetBackendHandler: methods:
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.slf4j.Logger org.apache.spark.sql.streaming.DataStreamReader.log()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.format(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.load(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.load()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logDebug(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logDebug(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logWarning(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logWarning(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public java.lang.String org.apache.spark.sql.streaming.DataStreamReader.logName()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logTrace(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logTrace(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logInfo(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logInfo(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logError(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logError(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean org.apache.spark.sql.streaming.DataStreamReader.isTraceEnabled()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.slf4j.Logger org.apache.spark.sql.streaming.DataStreamReader.org$apache$spark$internal$Logging$$log_()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean org.apache.spark.sql.streaming.DataStreamReader.initializeLogIfNecessary$default$2()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.initializeLogIfNecessary(boolean)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean org.apache.spark.sql.streaming.DataStreamReader.initializeLogIfNecessary(boolean,boolean)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.options(scala.collection.Map)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.options(java.util.Map)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,boolean)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,long)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,double)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.text(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.schema(org.apache.spark.sql.types.StructType)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.schema(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.json(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.textFile(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.parquet(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.orc(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.csv(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final void java.lang.Object.wait() throws java.lang.InterruptedException
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final void java.lang.Object.wait(long,int) throws java.lang.InterruptedException
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native void java.lang.Object.wait(long) throws java.lang.InterruptedException
20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean java.lang.Object.equals(java.lang.Object)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public java.lang.String java.lang.Object.toString()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public native int java.lang.Object.hashCode()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native java.lang.Class java.lang.Object.getClass()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native void java.lang.Object.notify()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native void java.lang.Object.notifyAll()
20/08/15 15:32:10 ERROR DotnetBackendHandler: args:
[2020-08-15T11:02:10.9187218Z] [MyUserAccount] [Error] [JvmBridge] JVM method execution failed: Nonstatic method load failed for class 7 when called with no arguments
[2020-08-15T11:02:10.9188973Z] [MyUserAccount] [Error] [JvmBridge] java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:136)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:204)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.api.dotnet.DotnetBackendHandler.handleMethodCall(DotnetBackendHandler.scala:162)
at org.apache.spark.api.dotnet.DotnetBackendHandler.handleBackendRequest(DotnetBackendHandler.scala:102)
at org.apache.spark.api.dotnet.DotnetBackendHandler.channelRead0(DotnetBackendHandler.scala:29)
at org.apache.spark.api.dotnet.DotnetBackendHandler.channelRead0(DotnetBackendHandler.scala:24)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.reader.SupportsScanUnsafeRow
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 47 more
[2020-08-15T11:02:10.9333693Z] [MyUserAccount] [Exception] [JvmBridge] JVM method execution failed: Nonstatic method load failed for class 7 when called with no arguments
at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
Unhandled exception. System.Exception: JVM method execution failed: Nonstatic method load failed for class 7 when called with no arguments
at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
at Microsoft.Spark.Interop.Ipc.JvmBridge.CallNonStaticJavaMethod(JvmObjectReference objectId, String methodName, Object[] args)
at Microsoft.Spark.Interop.Ipc.JvmObjectReference.Invoke(String methodName, Object[] args)
at Microsoft.Spark.Sql.Streaming.DataStreamReader.Load()
at Asa.Cep.TradeValue.Processing.Program.Main(String[] args) in
我的源代码:
string bootstrapServers = Helper.Instance.KafkaUri;
string subscribeType = "subscribe";
string topics = Helper.Instance.KafkaTopic;
System.Console.WriteLine("l1");
System.Console.WriteLine(bootstrapServers);
System.Console.WriteLine(subscribeType);
System.Console.WriteLine(topics);
System.Console.WriteLine("l2");
SparkSession spark = SparkSession
.Builder()
.AppName("StructuredKafkaWordCount")
.GetOrCreate();
System.Console.WriteLine("l3");
DataFrame lines = spark
.ReadStream()
.Format("kafka")
.Option("kafka.bootstrap.servers", bootstrapServers)
.Option(subscribeType, topics)
.Load()
.SelectExpr("CAST(value AS STRING)");
System.Console.WriteLine("l4");
DataFrame words = lines
.Select(Explode(Split(lines["value"], " "))
.Alias("word"));
DataFrame wordCounts = words.GroupBy("word").Count();
System.Console.WriteLine("l5");
StreamingQuery query = wordCounts
.WriteStream()
.OutputMode("complete")
.Format("console")
.Start();
System.Console.WriteLine("l6");
query.AwaitTermination();
暂无答案!
目前还没有任何答案,快来回答吧!