Spark流套接字流示例不工作

oyjwcjzk  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(393)

我正在尝试使用spark流媒体,但我被困在第一个示例:

import java.util.Arrays;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

public class NetworkWordCount {
    public static void main(String[] args) {
        // Create a local StreamingContext with two working thread and batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override public Iterable<String> call(String x) {
                        return Arrays.asList(x.split(" "));
                    }
                });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        jssc.awaitTermination();
    }
}

这段代码实际上是文档的副本。
https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html
我设置了一个netcat服务器,如:
北卡罗来纳州-lk 9999
以及obne nc客户端,如:
nc本地主机9999
我在其中键入如下句子:
你好,世界!世界万岁\n
在netcat服务器上正确显示。
但它不起作用。每一批我都有一张空印。

21/02/16 00:36:41 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[137] at socketTextStream at NetworkWordCount.java:17 of time 1613432201000 ms
21/02/16 00:36:41 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1613432199000 ms)
21/02/16 00:36:41 INFO InputInfoTracker: remove old batch metadata: 1613432199000 ms
-------------------------------------------
Time: 1613432201000 ms
-------------------------------------------

我尝试了不同的价值观 setMaster 例如 local[4] , local[2] 以及 local[*] 但结果是一样的。
另外,如果我在netcat客户端之前运行spark流代码,我甚至看不到nc服务器上的字符串。

44u64gxh

44u64gxh1#

我找到了解决我问题的办法。
简而言之,您需要将消息直接写入运行tcp服务器的终端,而不需要另一个netcat客户端。
没有错误或缺少配置这只是对netcat工作方式的误解。
我从电话里听懂了 man nc-k 选项允许netcat管理多个连接,但部分是错误的。

-k      When a connection is completed, listen for another one.  Requires -l.  When used together with the -u option, the server socket is not connected and it can receive UDP datagrams from multiple
             hosts.

但这并不是我想的那样。如果使用 -k 选项,则它将接受多个连接,但仍将一次处理一个连接。
这意味着,如果您有两个nc客户端,并且如果您在两个客户端中都键入了一些文本,那么在您关闭第一个连接之前,服务器将只接收其中一个客户端的文本。

相关问题