使用spark读取动觉流时出现的问题失败,错误为“获取kinesis流的表状态失败”

gupuwyp2  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(380)

我试图从动觉流在Spark流程序如下所述读取

from __future__ import print_function
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
import sys,os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def f(iterator):
    print ('inside print',iterator)
    for x in iterator:
        print('rdd value is',x)

def printRecord(rdd):
    print("========================================================")
    print("Starting new RDD")
    print("========================================================")
    rdd.collect()
    rdd.foreach(f)

if __name__ == "__main__":
    reload(sys)  
    sys.setdefaultencoding('utf-8')
    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
    ssc = StreamingContext(sc, 10)
    AWS_ACCESS_KEY=os.environ['aws_key']
    AWS_SECRET_KEY=os.environ['aws_secret_key']
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY)
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", AWS_SECRET_KEY)
    endpointUrl='https://kinesis.us-east-1.amazonaws.com'
    streamName='test-kinesis'
    appName='Kinesis-Streaming'
    regionName='us-east-1'
    #appName, streamName, endpointUrl, regionName = sys.argv[1:]
    regionName='us-east-1'
    lines = KinesisUtils.createStream(
        ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.TRIM_HORIZON, 10)
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

我看到保持运动轨迹的dynamodb并没有被创造出来。作业失败,出现以下错误

20/08/01 15:15:49 ERROR LeaseManager: Failed to get table status for Kinesis-Streaming
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The security token included in the request is expired (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ExpiredTokenException; Request ID: 1VPKRUJIRSHASS1NL0OAOK62DFVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.tableStatus(LeaseManager.java:163)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:108)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:329)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:674)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:614)
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$2.run(KinesisReceiver.scala:196)
Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The security token included in the request is expired (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ExpiredTokenException; Request ID: 1VPKRUJIRSHASS1NL0OAOK62DFVV4KQNSO5AEMVJF66Q9ASUAAJG)

请告诉我这里可能出了什么问题。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题