Window on Kinesis DStream vs Structured Streaming DataFrame in PySpark

Posted by piztneat on 2021-05-24 in Spark

I am writing a Spark Streaming application that reads from a Kinesis data stream. I can read the records with the KinesisUtils.createStream function and then process the messages by converting the resulting DStream into a DataFrame.
My requirement is time-series aggregation: per-minute aggregates over a window of the last 10 minutes. At the moment I am using Structured Streaming's window function on the DataFrame. The code is below.

import sys
import time

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

def getSparkSessionInstance(sparkConf):
    # Lazily create and cache a single SparkSession for reuse inside foreachRDD
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

def createContext(applicationName, streamName, endpointUrl, regionName, shards, checkpoint):
    print("Creating new context")

    sc = SparkContext(appName=applicationName)
    # 60-second (1-minute) batch interval
    ssc = StreamingContext(sc, 60)
    #sqlContext = SQLContext(sc)

    print("appname is" + applicationName +
          streamName + endpointUrl + regionName)

    # One Kinesis receiver (DStream) per shard, with a 2-second Kinesis
    # checkpoint interval, unioned into a single input DStream
    kinesis_streams = [KinesisUtils.createStream(ssc, applicationName, streamName, endpointUrl,
            regionName, InitialPositionInStream.LATEST, 2) for _ in range(int(shards))]
    kinesisStream = ssc.union(*kinesis_streams)

    def process(time, rdd):
        print("========= %s =========" % time)

        try:
            # Get the singleton instance of SparkSession
            spark = getSparkSessionInstance(rdd.context.getConf())

            # Parse the RDD of JSON strings received from Kinesis into a DataFrame
            df = spark.read.json(rdd)
            df.printSchema()
            print(df.rdd.getNumPartitions())
            # Derive the event time and the aggregation key from the JSON payload
            df = df.withColumn("eventTime", F.to_timestamp(df.headers.timestamp)).withColumn("x", df.args.x)
            df.show(truncate=False)

            # Per-key counts over 1-minute tumbling windows, with a 10-minute watermark
            wDf = df.withWatermark("eventTime", "10 minutes").groupBy("x", F.window("eventTime", "1 minute", "1 minute")).count()
            wDf.show(truncate=False)
        except Exception as e:
            print(str(e))
            pass

    # KinesisUtils.createStream yields a DStream of UTF-8 JSON strings, which
    # spark.read.json() in process() parses directly, so no json.loads() map is needed
    #kinesisStream.pprint()
    kinesisStream.foreachRDD(process)
    ssc.checkpoint(checkpoint)
    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 7:
        print(
            "Usage: Aggregate.py <app-name> <stream-name> <endpoint-url> <region-name> <kinesis-shards> <checkpoint-directory>",
            file=sys.stderr)
        sys.exit(-1)

    appName, streamName, endpointUrl, regionName, shards, checkpoint = sys.argv[1:]
    ssc = StreamingContext.getOrCreate(checkpoint,
                lambda: createContext(appName, streamName, endpointUrl, regionName, shards, checkpoint))

    ssc.start()
    time.sleep(600)

    #ssc.awaitTermination()
    ssc.stop(stopSparkContext=True,stopGraceFully=True)
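
For comparison, here is roughly how I understand the same per-minute count with a 10-minute watermark would look as a genuine Structured Streaming query. This is only a sketch: the built-in rate source stands in for Kinesis (I do not have a Structured Streaming Kinesis connector), and the key column x is invented for illustration.

# Sketch only: the rate source stands in for Kinesis; "x" is an invented key column
events = (spark.readStream.format("rate").option("rowsPerSecond", 10).load()
          .withColumnRenamed("timestamp", "eventTime")
          .withColumn("x", F.col("value") % 5))

# 1-minute tumbling window per key, with late data accepted for up to 10 minutes
counts = (events
          .withWatermark("eventTime", "10 minutes")
          .groupBy("x", F.window("eventTime", "1 minute"))
          .count())

query = (counts.writeStream
         .outputMode("update")
         .format("console")
         .option("truncate", "false")
         .start())
query.awaitTermination()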

My question is: if I read the Kinesis DStream with a 1-minute batch interval, will it still be able to build a 10-minute watermark, or do I need to configure a 10-minute window on the DStream?
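
For reference, the following untested sketch shows what I mean by a 10-minute window on the DStream, using DStream.window(windowDuration, slideDuration) inside createContext:

# Untested sketch: keep the last 10 minutes of received batches and re-emit
# them every 60 seconds, so each process() call sees a full 10 minutes of records
windowed = kinesisStream.window(600, 60)
windowed.foreachRDD(process)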
