I have successfully used Spark Streaming as follows.
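import org.apache.spark.streaming._
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext._
// conf is unused below; sc is assumed to be an existing SparkContext (e.g. the one provided by spark-shell)
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
// Streaming context with a 30-second batch interval
val ssc = new StreamingContext(sc, Seconds(30))
// Read lines of text from the socket at localhost:8888
val lines = ssc.socketTextStream("localhost", 8888)
val words = lines.flatMap(_.split(" "))
// Pair each word with a count of 1, then sum the counts per word
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()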
nc -lvp 8888
I then wanted to try a word count on data received over a socket, as shown below.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
// Import the implicits after spark is defined (needed for .as[String])
import spark.implicits._
spark.conf.set("spark.sql.shuffle.partitions", 5)
// Socket source: connects as a client to localhost:9999
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
// Split each line into words
val words = lines.as[String].flatMap(_.split(" "))
// Running count per word
val wordCounts = words.groupBy("value").count()
// Write the complete result table to the console
val query = wordCounts.writeStream.outputMode("complete").format("console").start()
But I got the following error.
ERROR MicroBatchExecution: Query [id = 36c54725-eeb5-415e-829a-e73c079d47d4, runId = 048f9d74-6456-4b01-b058-a4bd8e105b91] terminated with error
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at java.net.Socket.connect(Socket.java:556)
at java.net.Socket.<init>(Socket.java:452)
at java.net.Socket.<init>(Socket.java:229)
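For reference, `java.net.ConnectException: Connection refused` is what a plain `java.net.Socket` typically raises when nothing is accepting connections on the target host and port. Below is a minimal sketch of checking that outside Spark, assuming only the JDK. `isListening` is a hypothetical helper name introduced for illustration, not a Spark API.

```scala
import java.net.{InetSocketAddress, ServerSocket, Socket}

// Hypothetical helper (not part of Spark): returns true if a TCP connection
// to host:port can be opened within timeoutMs milliseconds.
def isListening(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port), timeoutMs)
    true
  } catch {
    // Covers ConnectException (refused) and SocketTimeoutException
    case _: java.io.IOException => false
  } finally {
    socket.close()
  }
}

// Self-check: open a throwaway listener on a free port, probe it, then close it
// and probe again.
val server = new ServerSocket(0)
val port = server.getLocalPort
val whileOpen = isListening("localhost", port)
server.close()
val afterClose = isListening("localhost", port)
println(s"listening while open: $whileOpen, after close: $afterClose")
```

In the Spark shell, something like `isListening("localhost", 9999)` could be called before starting the query to see whether the socket source has anything to connect to.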
What is the problem here, and what is the difference between these two streaming approaches? Thank you!