docker pypspark群集容器没有从主机接收kafka流?

c9x0cxw0  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(418)

我已经创建并部署了一个spark集群,它由4个容器组成
Spark控制
Spark从动装置
spark提交
数据装载容器:从本地目录访问脚本
我在所有这些容器中添加了必需的依赖关系jar
并在主机中部署了kafka,通过producer生成流媒体。
我按照下面文档中的确切步骤推出了Kafka
https://kafka.apache.org/quickstart
我验证了kafka生产者和消费者在9092端口交换消息,这是正常工作
下面是简单的pyspark脚本,我想将其作为结构化流处理

from pyspark import SparkContext
from pyspark.sql import SparkSession

print("Kafka App launched")
spark = SparkSession.builder.master("spark://master:7077").appName("kafka_Structured").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hostmachine:9092").option("subscribe", "session-event").option("maxOffsetsPerTrigger", 10).load()

converted_string=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

print("Recieved Stream in String", converted_string)

下面是我用来执行脚本的spark submit


## container

# pyspark_vol - container for vol mounting

# spark/stru_kafka - container for spark-submit

# i linked the spark master and slave already using the container 'master'

## spark submit

docker run --add-host="localhost: myhost" --rm -it --link master:master --volumes-from pyspark_vol spark/stru_kafka spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 –jars /home/spark/spark-2.1.1-bin-hadoop2.6/jars/spark-sql-kafka-0-10_2.11-2.1.1.jar --master spark://master:7077  /data/spark_session_kafka.py localhost 9092 session-event

在我运行脚本之后,脚本执行得很好,但是它似乎没有像Kafka制作人那样批量地监听流媒体并停止执行。
我没有观察到任何具体的错误,但没有从脚本中产生任何输出
我使用socket程序验证了从docker容器内的主机接收数据的连接性,该程序工作正常。
我不确定是否遗漏了任何配置。。

期望值:

上面运行在spark cluster上的应用程序应该打印来自kafka生产者的流媒体

实际

"id" : "f4e8829f-583e-4630-ac22-1d7da2eb80e7",
  "runId" : "4b93d523-7b7c-43ad-9ef6-272dd8a16e0a",
  "name" : null,
  "timestamp" : "2020-09-09T09:21:17.931Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1922,
    "getBatch" : 287,
    "getOffset" : 361,
    "queryPlanning" : 111,
    "triggerExecution" : 2766,
    "walCommit" : 65
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[session-event]]",
    "startOffset" : null,
    "endOffset" : {
      "session-event" : {
        "0" : 24
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6a1b0b4b"
  }
}
fd3cxomn

fd3cxomn1#

问题是我的pyspark\u流脚本没有提供批处理时间和print语句来查看日志。。。
因为它不是聚合流,所以我不得不在这里使用append

result =df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

print("Kafka Straming output is",result)
query = result.writeStream.outputMode("append").format("console").trigger(processingTime='30 seconds').start()
qv7cva1a

qv7cva1a2#

根据spark文档中提供的快速示例,您需要启动查询并等待其终止。
对你来说,这意味着你需要更换

print("Recieved Stream in String", converted_string)

具有

query = df.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

相关问题