我在kafka中创建了一个名为“test”的主题,它只有一个分区,并且没有被复制。
我创建了一个kafka生产者,它在主题“test”中编写以下字符串:“ca”,循环100000次。迭代之间有1000毫秒的睡眠时间(thread.sleep)。关键是每个循环迭代的索引。
我在centos 7和windows上都运行了以下代码。我通常使用maven汇编插件构建一个胖jar,并使用spark submit运行它。在提交jar时,我总是需要指定包:--packages org.apache。spark:spark-sql-kafka-0-10_2.11:2.4.0
public class StreamFromKafka {
public static void es() throws StreamingQueryException {
SparkSession session = SparkSession.builder().appName("streamFromKafka").master("local[*]").getOrCreate();
String columnName = "value";
Dataset<Row> df = session.readStream().format("kafka")
.option("group.id","test-consumer-group")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test").load();
Dataset<Row> df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").select(columnName);
Dataset<String> words = df1.as(Encoders.STRING()).flatMap(line -> Arrays.asList(line.split(" ")).iterator(), Encoders.STRING());
//comment1 --> StreamingQuery query0 = words.writeStream().outputMode("update").format("console").start();
//comment2 --> query0.awaitTermination();
Dataset<Row> wordCount = words.groupBy("value").count();
StreamingQuery query = wordCount.writeStream().outputMode("update").format("console").start();
query.awaitTermination();
}
}
如果在上述代码中取消“comment1”和“comment2”的注解,该表将在windows上快速打印:
-------------------------------------------
Batch: 5
-------------------------------------------
+-----+
|value|
+-----+
| A|
| B|
| C|
| A|
| A|
| B|
| C|
| A|
+-----+
但是,如果我对comment1和comment2进行注解,那么小批量在windows上似乎很持久。
所以我可以得出结论,流确实是从windows上的kafka读取的,但是groupby需要很多时间。
昨天晚上20:46,我在windows上运行这个实现的时间比在linux上要长。它在windows上有很长的小批量(实时流媒体是在结构化流媒体api的引擎盖下用小批量构建的)。例如,如下图所示,执行两个批需要一分钟:
如下图所示,执行四个批需要三分钟:
在linux上更快。因为我第一次在linux上尝试过,所以我希望在windows上看到控制台输出的时间更少,然后,因为我什么也看不到,所以我认为它不起作用。
我应该在linux上计时小批量,以便比较行为。
暂无答案!
目前还没有任何答案,快来回答吧!