How do I fix the MicroBatchExecution error for a socket stream in Spark using Scala?

Posted by 5lhxktic on 2021-05-27 in Spark

I have successfully used Spark Streaming as follows.

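import org.apache.spark.streaming._
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext._

// `conf` is only needed in a standalone app; in spark-shell the existing SparkContext `sc` is reused.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(sc, Seconds(30))
// Read lines from the TCP server listening on localhost:8888.
val lines = ssc.socketTextStream("localhost", 8888)
val words = lines.flatMap(_.split(" "))
// Pair each word with a count of 1 so reduceByKey can sum the counts per word.
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()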

nc -lvp 8888

I then wanted to run a word count on data received from a socket with Structured Streaming, as shown below.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
// The implicits import needs the `spark` value, so it comes after the session is created.
import spark.implicits._
spark.conf.set("spark.sql.shuffle.partitions", 5)

// The socket source connects as a client to localhost:9999.
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

// Split each line into words, then keep a running count per word.
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.outputMode("complete").format("console").start()

But I got the following error.

ERROR MicroBatchExecution: Query [id = 36c54725-eeb5-415e-829a-e73c079d47d4, runId = 048f9d74-6456-4b01-b058-a4bd8e105b91] terminated with error
java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at java.net.Socket.connect(Socket.java:556)
    at java.net.Socket.<init>(Socket.java:452)
    at java.net.Socket.<init>(Socket.java:229)
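
The trace bottoms out in the java.net.Socket constructor, which suggests that nothing accepted a TCP connection on the host and port the socket source was given. One thing worth checking: the listener was started with nc on port 8888, while the Structured Streaming snippet connects to port 9999. Whether that mismatch is the actual cause here is an assumption. The sketch below is a small pre-flight probe that can be pasted into spark-shell or the Scala REPL before starting the query. The helper name isListening and its timeoutMs parameter are hypothetical and not part of Spark.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper (not a Spark API): returns true if some TCP server
// accepts a connection on host:port within timeoutMs milliseconds.
def isListening(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port), timeoutMs)
    true
  } catch {
    // ConnectException ("Connection refused") and timeouts are both IOExceptions.
    case _: IOException => false
  } finally {
    socket.close()
  }
}

// Probe both ports that appear in the question.
println(s"8888 listening: ${isListening("localhost", 8888)}")
println(s"9999 listening: ${isListening("localhost", 9999)}")
```

If the probe reports that the port passed to option("port", ...) is not listening, either start the listener on that port or point the option at the port nc is already using.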

What is causing this error, and what is the difference between these two streaming approaches? Thank you!
