如何在Spark流中启用背压(使用pyspark)

beq87vna  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(317)

我想知道正确的方法是什么 backpressurespark streaming 通过 pyspark . 我好像收到了太多的邮件 Kafka 很快就爆炸了。下面是我的密码 spark streaming . 有人能给我指出正确的地方吗 back pressure ?

sc = SparkContext(appName="PythonStreamingDirectKafka")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("/spark_check/")
kvs = KafkaUtils.createDirectStream(ssc, [kafka_topic],
                                    {"metadata.broker.list": bootstrap_servers_ipaddress})
parsed_msg = kvs.map(lambda (key, value): json.loads(value))

## do something below
insrf1ej

insrf1ej1#

下面是我如何设置我的Kafka流代码背压。希望有帮助。

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("PythonStreamingDirectKafka")\
        .set("spark.streaming.backpressure.enabled", "true") \
        .set("spark.streaming.backpressure.initialRate", "500")

sc = SparkContext(conf=conf)

相关问题