Redshift复制命令错误:无法通过AWS Glue Spark打开与OID 591923的关系

wr98u20j  于 2022-11-16  发布在  Apache
关注(0)|答案(1)|浏览(192)

有一个代码已经在生产中运行了6个月,它在给定数量的表中循环运行,并进行红移复制。它一直成功运行到10月31日,从11月1日到失败(对于一个特定的表;对于其它的运行良好)。

## Truncate and execute Copy command.
def ExecuteCopyCommand(TableList): 
    
    QueryIdDict = {}
    for TableName in TableList:
        SourcePath = f's3://{BucketName}/{prefix}'  
        query = f" truncate table {TableName}; \
                    copy {TableName} \
                    from '{SourcePath}' \
                    iam_role 'abcd' \
                    delimiter as '.' \
                    ignoreheader 1 \
                    dateformat as 'auto' \
                    timeformat as 'auto' \
                    Null as 'NULL';"
        
        ## Executing truncate and copy command on redshift cluster
        try: 
            response = client.execute_statement(
                ClusterIdentifier='redshift-abc',
                Database='abc',
                SecretArn='arn:aws:secretsmanager:abcd',
                Sql= query
    
            )
            print(TableName + ": Copy command executed")
            print('Query',query)
            print('Response',response)
            QueryId = response['Id']
            QueryIdDict[QueryId] = TableName
            DataDict= { 'Level': 'Info',
                        'SourceLocation': SourcePath,
                        'TargetDatabaseName': 'redshift-abc',
                        'TargetSchemaName': str(TableName.split('.')[0]),
                        'TargetTableName': str(TableName.split('.')[1]),
                        'ExecutedQuery': query.strip(),
                        'ExecutedQueryId': str(QueryId),
                        'Description': 'Copy command executed on redshift and query is in progress.',
                        'Status': 'Succeeded' 
                       }
            DataList.append(DataDict)
            time.sleep(1)
        except Exception as e:
            DataDict= { 'Level': 'Error',
                        'SourceLocation': SourcePath,
                        'TargetDatabaseName': 'redshift-abc',
                        'TargetSchemaName': str(TableName.split('.')[0]),
                        'TargetTableName': str(TableName.split('.')[1]),
                        'ExecutedQuery': query.strip(),
                        'ExecutedQueryId': '',
                        'Description': f'Fail to execute copy command. Error : {str(e)}',
                        'Status': 'Failed' 
                      }
            DataList.append(DataDict)
            print('Error occur in  ExecuteCopyCommand block.')
            print('Error occur while executing copy command.')
            print('TableName : ' +  TableName)
            print(e)
            raise
    print('Query dict',QueryIdDict)        
    return QueryIdDict

下面的代码失败,并出现以下错误:
主要错误:Exception: ERROR: could not open relation with OID 591927
追溯:

test_table: Copy command executed
Query  truncate table test_table;                     copy test_table                     from 's3://bucket_test/pipeline/test_table/year=2022/month=02/day=28/'                     iam_role 'arn:aws:iam::xyz:role/Account-B-Glue-Redshift-Cloudwatch'                     delimiter as '.'                     ignoreheader 1                     dateformat as 'auto'                     timeformat as 'auto'                     Null as 'NULL';
Response {'ClusterIdentifier': 'redshift-abc', 'CreatedAt': datetime.datetime(2022, 11, 10, 6, 21, 42, 363000, tzinfo=tzlocal()), 'Database': 'abc', 'Id': 'abcdcs-4878-446b-80e9-8d544860847a', 'SecretArn': 'arn:aws:secretsmanager:abcd', 'ResponseMetadata': {'RequestId': '690f6542-4e33-4d84-afb8-2f9ebc9af62e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '690f6542-4e33-4d84-afb8-2f9ebc9af62e', 'content-type': 'application/x-amz-json-1.1', 'content-length': '231', 'date': 'Thu, 10 Nov 2022 06:21:42 GMT'}, 'RetryAttempts': 0}}
Query dict {'abcdcs-4878-446b-80e9-8d544860847a': 'test_table'}
QueryId of executed copy command

{'abcdcs-4878-446b-80e9-8d544860847a': 'test_table'}
Checking executed query status for each table.

test_table: Copy command failed

{'ClusterIdentifier': 'redshift-abc', 'CreatedAt': datetime.datetime(2022, 11, 10, 6, 21, 42, 363000, tzinfo=tzlocal()), 'Duration': -1, 'Error': 'ERROR: could not open relation with OID 591927', 'HasResultSet': False, 'Id': '9c6cb33c-4878-446b-80e9-8d544860847a', 'QueryString': " truncate table test_table;                     copy test_table                     from 's3://bucket_test/pipeline/test_table/year=2022/month=02/day=28/'                     iam_role ''                     delimiter as '\x01'                     ignoreheader 1                     dateformat as 'auto'                     timeformat as 'auto'                     Null as 'NULL';", 'RedshiftPid': 1073775000, 'RedshiftQueryId': 6553022, 'ResultRows': -1, 'ResultSize': -1, 'SecretArn': 'arn:aws:secretsmanager:abcd', 'Status': 'FAILED', 'UpdatedAt': datetime.datetime(2022, 11, 10, 6, 21, 42, 937000, tzinfo=tzlocal()), 'ResponseMetadata': {'RequestId': 'c77cb319-14d3-42fd-8c34-611dbd5a17b4', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c77cb319-14d3-42fd-8c34-611dbd5a17b4', 'content-type': 'application/x-amz-json-1.1', 'content-length': '890', 'date': 'Thu, 10 Nov 2022 06:22:13 GMT'}, 'RetryAttempts': 0}}
Error occur in CheckQueryStatus block
ERROR: could not open relation with OID 591927
Error occur in main block.
Fail to refresh table in redshift.
{'MessageId': 'eb6338b8-cd1d-5d47-8a63-635e57fee266', 'ResponseMetadata': {'RequestId': '60766afd-c861-5c1d-9d61-311b5282333c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '60766afd-c861-5c1d-9d61-311b5282333c', 'content-type': 'text/xml', 'content-length': '294', 'date': 'Thu, 10 Nov 2022 06:22:26 GMT'}, 'RetryAttempts': 0}}
Email Notification sent to respective e-mail id.
ERROR: could not open relation with OID 591927

该错误由CheckQueryStatus函数引发,如下所示:

## Check executed query status.
def CheckQueryStatus(QueryIdDict):
    InprogressQueryIdList = [key for key in QueryIdDict.keys()]
    SucceedTableList = []
   
   ## Expected Status of running query 
    FailStatus = ['ABORTED','FAILED']
    InprogressStatus = ['SUBMITTED','PICKED','STARTED']
    SucceedStatus = ['FINISHED']
    
    try:
        while len(InprogressQueryIdList):
            for QueryId in InprogressQueryIdList:
                response = client.describe_statement(
                    Id=QueryId
                )
                if response['Status'] in SucceedStatus:
                    SucceedTableList.append(QueryIdDict[QueryId])
                    InprogressQueryIdList.remove(QueryId)
                    print('Query Executed Sucessfully : ' + QueryIdDict[QueryId])
                    SourcePath = f's3://{BucketName}/pipeline/{QueryIdDict[QueryId]}/{PathPrefix}/'
                    DataDict= { 'Level': 'Info',
                                'SourceLocation': SourcePath,
                                'TargetDatabaseName': 'abc',
                                'TargetSchemaName': str(QueryIdDict[QueryId].split('.')[0]),
                                'TargetTableName': str(QueryIdDict[QueryId].split('.')[1]),
                                'ExecutedQuery': '',
                                'ExecutedQueryId': str(QueryId),
                                'Description': 'Data loaded successfully in staging table',
                                'Status': 'Succeed' 
                              }
                    DataList.append(DataDict)
                elif response['Status'] in InprogressStatus:
                    time.sleep(30)
                else:
                    print(QueryIdDict[QueryId] + ': Copy command failed\n')
                    print(response)
                    raise Exception(str(response['Error']))
                
        print('Table refreshed successfully\n')
        print(SucceedTableList)
    except Exception as e:
        SourcePath = f's3://{BucketName}/pipeline/{QueryIdDict[QueryId]}/{PathPrefix}/'
        DataDict= { 'Level': 'Error',
                    'SourceLocation': SourcePath,
                    'TargetDatabaseName': 'abc',
                    'TargetSchemaName': str(QueryIdDict[QueryId].split('.')[0]),
                    'TargetTableName': str(QueryIdDict[QueryId].split('.')[1]),
                    'ExecutedQuery': '',
                    'ExecutedQueryId': str(QueryId),
                    'Description': f'Copy command failed.{response["Error"]}',
                    'Status': 'Failed' 
                  }
        DataList.append(DataDict)
        print('Error occur in CheckQueryStatus block')
        print(e)
        raise

现在:
1.当我从DBeaver或其他查询工具运行相同的复制命令时,它工作得非常好。
1.当我为其他表运行此代码时,完全相同的代码,它工作正常。只有此表失败。
1.创建了一个测试表,以查看这是否不是typical Postgres OID bug,但该错误可能会被复制。
这让我很困惑。有什么帮助吗?

nhhxz33t

nhhxz33t1#

此错误通常是由过时的表信息和正在删除目标表(并可能重新创建同名的新表)的其他进程引起的。

相关问题