I am writing a Spark Streaming application that reads from a Kinesis data stream. I can read the stream via the KinesisUtils.createStream function and then convert the output DStream to a DataFrame to process the messages.
My requirement is to perform time-series aggregation, i.e. minute-level aggregation over a window of the last 10 minutes. Currently I am using Structured Streaming's window function on the DataFrame. The code is below.
import sys
import json
import time

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream


def getSparkSessionInstance(sparkConf):
    # Lazily create (and cache) a single SparkSession for use inside foreachRDD
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']


def createContext(applicationName, streamName, endpointUrl, regionName, shards):
    print("Creating new context")
    sc = SparkContext(appName=applicationName)
    ssc = StreamingContext(sc, 60)  # 60-second batch interval
    # sqlContext = SQLContext(sc)
    print("appname is " + applicationName +
          streamName + endpointUrl + regionName)
    # One receiver per shard, unioned into a single DStream
    kinesis_streams = [KinesisUtils.createStream(ssc, applicationName, streamName, endpointUrl,
                                                 regionName, InitialPositionInStream.LATEST, 2)
                       for _ in range(int(shards))]
    kinesisStream = ssc.union(*kinesis_streams)

    def process(time, rdd):
        print("========= %s =========" % time)
        try:
            # Get the singleton instance of SparkSession
            spark = getSparkSessionInstance(rdd.context.getConf())
            df = spark.read.json(rdd)
            df.printSchema()
            print(df.rdd.getNumPartitions())
            df = df.withColumn("eventTime", F.to_timestamp(df.headers.timestamp)) \
                   .withColumn("x", df.args.x)
            df.show(truncate=False)
            wDf = df.withWatermark("eventTime", "10 minutes") \
                    .groupBy("x", F.window("eventTime", "1 minutes", "1 minutes")) \
                    .count()
            wDf.show(truncate=False)
        except Exception as e:
            print(str(e))

    kinesis_json = kinesisStream.map(lambda x: json.loads(x))
    # kinesis_json.pprint()
    kinesis_json.foreachRDD(process)
    ssc.checkpoint(checkpoint)  # checkpoint directory comes from the global set in __main__
    return ssc


if __name__ == "__main__":
    if len(sys.argv) != 7:
        print(
            "Usage: Aggregate.py <app-name> <stream-name> <endpoint-url> <region-name> <kinesis-shards> <checkpoint-directory>",
            file=sys.stderr)
        sys.exit(-1)
    appName, streamName, endpointUrl, regionName, shards, checkpoint = sys.argv[1:]
    ssc = StreamingContext.getOrCreate(checkpoint,
                                       lambda: createContext(appName, streamName, endpointUrl, regionName, shards))
    ssc.start()
    time.sleep(600)
    # ssc.awaitTermination()
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
My question is: if I read the Kinesis stream with a 1-minute batch interval, can it still build up a 10-minute watermark, or do I need to configure a 10-minute window on the DStream instead?
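For comparison, here is a minimal sketch of the same 1-minute windowed count written as a true Structured Streaming query, where withWatermark actually takes effect. The file source path and schema are assumptions for illustration only; this is not the Kinesis setup above:

# Minimal Structured Streaming sketch (illustrative; source path and schema are assumptions).
# In a streaming query, withWatermark("eventTime", "10 minutes") tells Spark how long to
# retain window state for late data; it is independent of any micro-batch interval.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("WatermarkSketch").getOrCreate()

schema = StructType([
    StructField("eventTime", TimestampType()),
    StructField("x", StringType()),
])

# Hypothetical file source standing in for the Kinesis stream
events = spark.readStream.schema(schema).json("/tmp/events")

counts = (events
          .withWatermark("eventTime", "10 minutes")           # state retention for late events
          .groupBy("x", F.window("eventTime", "1 minute"))    # 1-minute tumbling windows
          .count())

query = (counts.writeStream
         .outputMode("update")
         .format("console")
         .start())
query.awaitTermination()

Note that spark.read.json inside foreachRDD produces a batch DataFrame, and withWatermark is documented to be a no-op in batch queries, so the watermark only bounds state once the aggregation runs as a streaming query like this one.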