I am trying to integrate Kafka with Apache Spark Streaming; the code is below -
```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class SampleSparkStraming {

    public static void main(String[] args) {
        //SparkContext context = new
        SparkConf conf = new SparkConf().setAppName("SampleAPP").setMaster("spark://localhost:4040").set("spark.ui.port", "4040");
        //SparkConf conf = new SparkConf().setAppName("SampleAPP").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(500000));

        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("TEST-Kafka");

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            private static final long serialVersionUID = 1L;

            public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                    private static final long serialVersionUID = 1L;

                    public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
                        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                        System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
                    }
                });
            }
        });

        //stream.print();
    }
}
```
`pom.xml`

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
```

Versions: kafka_2.10-0.10.2.0, Spark 2.1.0; OS: Windows 7.

When I consume with the Kafka command-line consumer, messages are consumed there, but they are not picked up by this program. I am getting the following exception -

```
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/04/10 17:07:32 INFO SparkContext: Running Spark version 2.1.0
17/04/10 17:07:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/04/10 17:07:33 INFO SecurityManager: Changing view acls to: xxxxx
17/04/10 17:07:33 INFO SecurityManager: Changing modify acls to: xxxxxx
17/04/10 17:07:33 INFO SecurityManager: Changing view acls groups to:
17/04/10 17:07:33 INFO SecurityManager: Changing modify acls groups to:
17/04/10 17:07:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(xxxxx); groups with view permissions: Set(); users with modify permissions: Set(xxxxx); groups with modify permissions: Set()
17/04/10 17:07:33 INFO Utils: Successfully started service 'sparkDriver' on port 55878.
17/04/10 17:07:33 INFO SparkEnv: Registering MapOutputTracker
17/04/10 17:07:33 INFO SparkEnv: Registering BlockManagerMaster
17/04/10 17:07:33 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/04/10 17:07:33 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/04/10 17:07:33 INFO DiskBlockManager: Created local directory at C:\Users\xxxxxx\AppData\Local\Temp\blockmgr-38e935a6-96c1-4942-a88f-6b7c8677fba7
17/04/10 17:07:33 INFO MemoryStore: MemoryStore started with capacity 349.2 MB
17/04/10 17:07:33 INFO SparkEnv: Registering OutputCommitCoordinator
17/04/10 17:07:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/04/10 17:07:33 INFO Utils: Successfully started service 'SparkUI' on port 4041.
17/04/10 17:07:33 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.10.25.26:4041
17/04/10 17:07:33 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://localhost:4040...
17/04/10 17:07:33 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:4040 after 18 ms (0 ms spent in bootstraps)
17/04/10 17:07:33 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from localhost/127.0.0.1:4040 is closed
17/04/10 17:07:33 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master localhost:4040
org.apache.spark.SparkException: Exception thrown in awaitResult
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection from localhost/127.0.0.1:4040 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
```

In the Spark console window, the following error appears -

```
17/04/10 17:07:33 WARN HttpParser: Illegal character 0x0 in state=START for buffer HeapByteBuffer@73f2273b[p=1,l=1292,c=16384,r=1291]={\x00<<<\x00\x00\x00\x00\x00\x05\x0c\x03_>\xF5s.bKM\x00...Ft\x00\x0b10.10.25.26>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00}
17/04/10 17:07:33 WARN HttpParser: badMessage: 400 Illegal character 0x0 for HttpChannelOverHttp@1a099881{r=0,c=false,a=IDLE,uri=}
```

There are similar questions whose answers point to version conflicts, but I am unable to identify the problem here.
2 Answers
nqwrtyyt1#
Assuming the master URL is otherwise correct: Apache Spark does not currently support Scala 2.12. Also, the "2.10" in
kafka_2.10-0.10.2.0
means that that Kafka build is deployed with Scala 2.10. So, at a minimum, use Scala 2.11.8 and link the Kafka connector as shown in the documentation.
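(Not part of the original answer, but if the concern is a Scala version mismatch on the classpath, one quick way to see which scala-library jar the application actually runs against is to locate it at runtime; the class name below is only illustrative.)

```java
// Minimal diagnostic sketch (illustrative, not from the answer): print which
// scala-library jar is on the classpath, so its version can be compared with
// the "_2.11" suffix of the Spark artifacts in pom.xml.
public class ScalaVersionCheck {
    public static void main(String[] args) {
        // scala.Predef lives in scala-library, which Spark pulls in transitively;
        // the printed URL ends with the jar name, e.g. .../scala-library-<version>.jar
        System.out.println(scala.Predef.class.getProtectionDomain().getCodeSource().getLocation());
    }
}
```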
zed5wv102#
If you want to run the Spark application in standalone mode, you first need to set up a cluster and use that cluster's master URL in the application; otherwise, you can simply run the Spark application with the local[*] master.
Based on your code, you also need to start the StreamingContext in order to actually fetch data from Kafka.
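A minimal sketch of the two changes this answer suggests, assuming a purely local run (the topic name and Kafka parameters are taken from the question; the class name, group id, and 5-second batch interval are illustrative): set the master to `local[*]` instead of pointing the application at the Spark UI port, and call `start()`/`awaitTermination()` so the streaming job actually begins consuming.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class LocalKafkaStreamSketch {

    public static void main(String[] args) throws InterruptedException {
        // local[*] runs driver and executors inside this JVM - no standalone master required
        SparkConf conf = new SparkConf().setAppName("SampleAPP").setMaster("local[*]");
        // Illustrative 5-second batch interval (the question used 500000 ms)
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "sample-app-group"); // illustrative group id
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Arrays.asList("TEST-Kafka"), kafkaParams));

        // Simple sanity check: print each record's value (Java 8 lambda)
        stream.map(record -> record.value()).print();

        // Without start() the job never runs; without awaitTermination() the driver exits immediately
        ssc.start();
        ssc.awaitTermination();
    }
}
```

If a standalone cluster is used instead, the master URL should be the one printed by the master process (RPC port, 7077 by default), not the web UI port 4040 that the question's code points at.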