我正在尝试使用spark流媒体,但我被困在第一个示例:
import java.util.Arrays;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
public class NetworkWordCount {
public static void main(String[] args) {
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
});
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
jssc.start(); // Start the computation
jssc.awaitTermination();
}
}
这段代码实际上是文档的副本。
https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html
我设置了一个netcat服务器,如:
北卡罗来纳州-lk 9999
以及obne nc客户端,如:
nc本地主机9999
我在其中键入如下句子:
你好,世界!世界万岁\n
在netcat服务器上正确显示。
但它不起作用。每一批我都有一张空印。
21/02/16 00:36:41 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[137] at socketTextStream at NetworkWordCount.java:17 of time 1613432201000 ms
21/02/16 00:36:41 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1613432199000 ms)
21/02/16 00:36:41 INFO InputInfoTracker: remove old batch metadata: 1613432199000 ms
-------------------------------------------
Time: 1613432201000 ms
-------------------------------------------
我尝试了不同的价值观 setMaster
例如 local[4]
, local[2]
以及 local[*]
但结果是一样的。
另外,如果我在netcat客户端之前运行spark流代码,我甚至看不到nc服务器上的字符串。
1条答案
按热度按时间44u64gxh1#
我找到了解决我问题的办法。
简而言之,您需要将消息直接写入运行tcp服务器的终端,而不需要另一个netcat客户端。
没有错误或缺少配置这只是对netcat工作方式的误解。
我从电话里听懂了
man nc
比-k
选项允许netcat管理多个连接,但部分是错误的。但这并不是我想的那样。如果使用
-k
选项,则它将接受多个连接,但仍将一次处理一个连接。这意味着,如果您有两个nc客户端,并且如果您在两个客户端中都键入了一些文本,那么在您关闭第一个连接之前,服务器将只接收其中一个客户端的文本。