如何使用golang执行AWS S3 Multipart Copy

kxxlusnw  于 2023-08-01  发布在  Go
关注(0)|答案(3)|浏览(130)

我正在查看S3复制对象函数的AWS golang文档,它包含以下处理大文件上传的详细信息
但是,要复制大于5GB的对象,必须使用 multipart upload 的 Upload Part - Copy API。有关详细信息,请参阅《使用 REST Multipart Upload API 复制对象》(https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html)。
当我点击该链接时,发现它只包含 Java 和 .NET 的代码示例。
我是不是缺少了一些文档/示例来说明如何使用golang客户端在S3中复制一个现有的大文件?

hfyxw5xn

hfyxw5xn1#

另外两种解决方案都很慢。使用goroutines可以大大加快这一速度,因为大部分等待时间都在AWS端。
这里是Mike提供的V1解决方案的副本,但是使用了goroutines(并正确地获取了fileSize,这似乎是他遗漏的东西)。

// Size of each copied part: 5 MiB, the S3 minimum part size for every
// part except the last one.
const max_part_size = 5 * 1024 * 1024

// buildCopySourceRange builds the CopySourceRange header value
// ("bytes=first-last", both ends inclusive) for the part that begins at
// byte offset start of a source object of objectSize bytes.
func buildCopySourceRange(start int64, objectSize int64) string {
    end := start + max_part_size - 1
    // The last valid byte index is objectSize-1, so clamp whenever the
    // computed end reaches objectSize as well as when it exceeds it
    // (the original `end > objectSize` let `end == objectSize` through,
    // yielding a range one byte past the object).
    if end >= objectSize {
        end = objectSize - 1
    }
    return "bytes=" + strconv.FormatInt(start, 10) + "-" + strconv.FormatInt(end, 10)
}

// MultiPartCopy server-side copies sourceBucket/sourceKey to
// destBucket/destKey with the S3 multipart Upload Part - Copy API,
// issuing the part copies concurrently (one goroutine per part).
// Returns nil on success; on any part failure the multipart upload is
// aborted and the first part error is returned.
func MultiPartCopy(sess *session.Session, sourceBucket string, sourceKey string, destBucket string, destKey string) error {
    svc := s3.New(sess, aws.NewConfig().WithRegion(S3_REGION))

    // Bound the whole copy; 10 minutes may need tuning for very large objects.
    ctx, cancelFn := context.WithTimeout(context.TODO(), 10*time.Minute)
    defer cancelFn()

    // The source object's size determines how many parts are needed.
    headInput := s3.HeadObjectInput{
        Bucket: aws.String(sourceBucket),
        Key:    aws.String(sourceKey),
    }
    result, err := svc.HeadObjectWithContext(ctx, &headInput)
    if err != nil {
        return err
    }
    fileSize := *result.ContentLength

    // Start the multipart upload; the returned upload id ties every part
    // and the final completion call together.
    startInput := s3.CreateMultipartUploadInput{
        Bucket: &destBucket,
        Key:    &destKey,
    }
    createOutput, err := svc.CreateMultipartUploadWithContext(ctx, &startInput)
    if err != nil {
        return err
    }
    var uploadId string
    if createOutput != nil && createOutput.UploadId != nil {
        uploadId = *createOutput.UploadId
    }
    if uploadId == "" {
        return errors.New("No upload id found in start upload request")
    }

    copySource := "/" + sourceBucket + "/" + sourceKey
    // Ceiling division so a trailing partial part is counted.
    numUploads := (fileSize + max_part_size - 1) / max_part_size
    fmt.Println("Will attempt upload in ", numUploads, " number of parts to ", destKey)

    type Result struct {
        Error      error
        Part       *s3.CompletedPart
        PartNumber int64
    }
    var wg sync.WaitGroup
    // Buffered so every goroutine can send its result without blocking,
    // even though we only drain the channel after wg.Wait().
    ch := make(chan Result, numUploads)

    var partNumberCounter int64 = 1
    for i := int64(0); i < fileSize; i += max_part_size {
        wg.Add(1)
        // Offset and part number are passed as arguments so each goroutine
        // gets its own copies (avoids the loop-variable capture bug).
        go func(partStart int64, partNumber int64) {
            defer wg.Done()
            copyRange := buildCopySourceRange(partStart, fileSize)
            partInput := s3.UploadPartCopyInput{
                Bucket:          &destBucket,
                CopySource:      &copySource,
                CopySourceRange: &copyRange,
                Key:             &destKey,
                PartNumber:      &partNumber,
                UploadId:        &uploadId,
            }
            fmt.Println("Attempting to upload part", partNumber, "range:", copyRange)
            partResp, err := svc.UploadPartCopyWithContext(ctx, &partInput)
            if err != nil {
                ch <- Result{Error: fmt.Errorf("Error uploading part %d : %w", partNumber, err), PartNumber: partNumber}
                return
            }
            var part *s3.CompletedPart
            if partResp != nil {
                // Keep ETag and part number from the response; both are
                // required by CompleteMultipartUpload.
                partNum := partNumber
                etag := strings.Trim(*partResp.CopyPartResult.ETag, "\"")
                part = &s3.CompletedPart{
                    ETag:       &etag,
                    PartNumber: &partNum,
                }
                fmt.Println("Successfully uploaded part", partNumber, "of", uploadId)
            }
            ch <- Result{Part: part, PartNumber: partNumber}
        }(i, partNumberCounter)
        partNumberCounter++
    }

    wg.Wait()
    close(ch)

    orderedParts := make([]*s3.CompletedPart, numUploads)
    for r := range ch {
        if r.Error != nil {
            fmt.Println("Attempting to abort upload")
            // Bucket and Key are required fields; without them the abort
            // request fails validation and the incomplete (billed) upload
            // would be left behind.
            abortIn := s3.AbortMultipartUploadInput{
                Bucket:   &destBucket,
                Key:      &destKey,
                UploadId: &uploadId,
            }
            // AbortMultipartUploadRequest only builds a request without
            // sending it; the blocking call actually performs the abort.
            // Any abort error is deliberately ignored — the part error is
            // the one worth surfacing.
            svc.AbortMultipartUploadWithContext(ctx, &abortIn)
            return r.Error
        }
        if r.Part != nil {
            // Part numbers are 1-based; slot each part into ascending order.
            orderedParts[r.PartNumber-1] = r.Part
        }
    }

    // CompleteMultipartUpload requires parts in ascending part-number
    // order with no nil entries.
    parts := make([]*s3.CompletedPart, 0, numUploads)
    for _, part := range orderedParts {
        if part != nil {
            parts = append(parts, part)
        }
    }

    mpu := s3.CompletedMultipartUpload{
        Parts: parts,
    }

    // The copy only materializes once the complete command is received.
    complete := s3.CompleteMultipartUploadInput{
        Bucket:          &destBucket,
        Key:             &destKey,
        UploadId:        &uploadId,
        MultipartUpload: &mpu,
    }
    compOutput, err := svc.CompleteMultipartUploadWithContext(ctx, &complete)
    if err != nil {
        return fmt.Errorf("Error completing upload: %w", err)
    }
    if compOutput != nil {
        fmt.Println("Successfully copied Bucket:", sourceBucket, "Key:", sourceKey, "to Bucket:", destBucket, "Key:", destKey)
    }
    return nil
}

字符串

3pvhb19x

3pvhb19x2#

与@Mike的答案相同,但使用AWS-SDK-GO-V2

import (
    "logger"
    "context"
    "errors"
    "strconv"
    "strings"
    "time"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// Size of each copied part: 5 MiB, the S3 minimum part size for every
// part except the last one.
const max_part_size = 5 * 1024 * 1024

// Package-level logger; assigned in MultiPartCopy via logger.GetLogger().
var log *logger.Logger

// buildCopySourceRange builds the CopySourceRange header value
// ("bytes=first-last", both ends inclusive) for the part that begins at
// byte offset start of a source object of objectSize bytes.
func buildCopySourceRange(start int64, objectSize int64) string {
    end := start + max_part_size - 1
    // The last valid byte index is objectSize-1, so clamp whenever the
    // computed end reaches objectSize as well as when it exceeds it
    // (`end > objectSize` let `end == objectSize` through, yielding a
    // range one byte past the object).
    if end >= objectSize {
        end = objectSize - 1
    }
    return "bytes=" + strconv.FormatInt(start, 10) + "-" + strconv.FormatInt(end, 10)
}

// MultiPartCopy server-side copies sourceBucket/sourceKey to
// destBucket/destKey with the S3 multipart Upload Part - Copy API
// (aws-sdk-go-v2), copying parts sequentially. fileSize is the size of
// the source object in bytes (e.g. from a prior HeadObject call).
// Returns nil on success; on a part failure the multipart upload is
// aborted and the part error is returned.
func MultiPartCopy(svc *s3.Client, fileSize int64, sourceBucket string, sourceKey string, destBucket string, destKey string) error {
    log = logger.GetLogger()

    // Bound the whole copy; 10 minutes may need tuning for very large objects.
    ctx, cancelFn := context.WithTimeout(context.TODO(), 10*time.Minute)
    defer cancelFn()

    // Start the multipart upload; the returned upload id ties every part
    // and the final completion call together.
    startInput := s3.CreateMultipartUploadInput{
        Bucket: &destBucket,
        Key:    &destKey,
    }
    createOutput, err := svc.CreateMultipartUpload(ctx, &startInput)
    if err != nil {
        return err
    }
    var uploadId string
    if createOutput != nil && createOutput.UploadId != nil {
        uploadId = *createOutput.UploadId
    }
    if uploadId == "" {
        return errors.New("No upload id found in start upload request")
    }

    var partNumber int32 = 1
    copySource := "/" + sourceBucket + "/" + sourceKey
    // Ceiling division so a trailing partial part is counted (plain
    // fileSize/max_part_size undercounted by one in that case).
    numUploads := (fileSize + max_part_size - 1) / max_part_size
    parts := make([]types.CompletedPart, 0, numUploads)
    log.Infof("Will attempt upload in %d number of parts to %s", numUploads, destKey)

    for i := int64(0); i < fileSize; i += max_part_size {
        copyRange := buildCopySourceRange(i, fileSize)
        partInput := s3.UploadPartCopyInput{
            Bucket:          &destBucket,
            CopySource:      &copySource,
            CopySourceRange: &copyRange,
            Key:             &destKey,
            PartNumber:      partNumber,
            UploadId:        &uploadId,
        }
        log.Debugf("Attempting to upload part %d range: %s", partNumber, copyRange)
        // Use the deadline-carrying ctx (the original passed context.TODO()
        // here, defeating the timeout set above).
        partResp, err := svc.UploadPartCopy(ctx, &partInput)
        if err != nil {
            log.Error("Attempting to abort upload")
            // Bucket and Key are required fields; without them the abort
            // request fails validation and the incomplete (billed) upload
            // would be left behind.
            abortIn := s3.AbortMultipartUploadInput{
                Bucket:   &destBucket,
                Key:      &destKey,
                UploadId: &uploadId,
            }
            // Any abort error is deliberately ignored — the part error is
            // the one worth surfacing.
            svc.AbortMultipartUpload(ctx, &abortIn)
            return fmt.Errorf("Error uploading part %d : %w", partNumber, err)
        }

        // Keep ETag and part number from the response; both are required
        // by CompleteMultipartUpload.
        if partResp != nil {
            etag := strings.Trim(*partResp.CopyPartResult.ETag, "\"")
            cPart := types.CompletedPart{
                ETag:       &etag,
                PartNumber: partNumber,
            }
            parts = append(parts, cPart)
            log.Debugf("Successfully upload part %d of %s", partNumber, uploadId)
        }
        partNumber++
        // Periodic progress logging every 50 parts.
        if partNumber%50 == 0 {
            log.Infof("Completed part %d of %d to %s", partNumber, numUploads, destKey)
        }
    }

    mpu := types.CompletedMultipartUpload{
        Parts: parts,
    }

    // The copy only materializes once the complete command is received.
    complete := s3.CompleteMultipartUploadInput{
        Bucket:          &destBucket,
        Key:             &destKey,
        UploadId:        &uploadId,
        MultipartUpload: &mpu,
    }
    compOutput, err := svc.CompleteMultipartUpload(ctx, &complete)
    if err != nil {
        return fmt.Errorf("Error completing upload: %w", err)
    }
    if compOutput != nil {
        log.Infof("Successfully copied Bucket: %s Key: %s to Bucket: %s Key: %s", sourceBucket, sourceKey, destBucket, destKey)
    }
    return nil
}

字符串
@Mike 有一个问题:您使用了 AWS-SDK-GO-V2 中没有的 AbortMultipartUploadRequest,所以我改用了 AbortMultipartUpload,希望这不会造成太大的差异?

fafcakar

fafcakar3#

因此,它采取了一些实验,但我终于得到了多部分复制工作

//imports
import (
  "context"
  "strconv"
  "github.com/aws/aws-sdk-go/service/s3"
  log "github.com/sirupsen/logrus"
)

// Size of each copied part: 5 MiB, the S3 minimum part size for every
// part except the last one.
const max_part_size = 5 * 1024 * 1024

// buildCopySourceRange builds the CopySourceRange header value
// ("bytes=first-last", both ends inclusive) for the part that begins at
// byte offset start of a source object of objectSize bytes.
func buildCopySourceRange(start int64, objectSize int64) string {
    end := start + max_part_size - 1
    // The last valid byte index is objectSize-1, so clamp whenever the
    // computed end reaches objectSize as well as when it exceeds it
    // (`end > objectSize` let `end == objectSize` through, yielding a
    // range one byte past the object).
    if end >= objectSize {
        end = objectSize - 1
    }
    return "bytes=" + strconv.FormatInt(start, 10) + "-" + strconv.FormatInt(end, 10)
}

// MultiPartCopy server-side copies sourceBucket/sourceKey to
// destBucket/destKey with the S3 multipart Upload Part - Copy API
// (aws-sdk-go v1), copying parts sequentially. Returns nil on success;
// on a part failure the multipart upload is aborted and the part error
// is returned.
func MultiPartCopy(sess *session.Session, sourceBucket string, sourceKey string, destBucket string, destKey string) error {
    svc := s3.New(sess)

    // Bound the whole copy; 10 minutes may need tuning for very large objects.
    ctx, cancelFn := context.WithTimeout(context.TODO(), 10*time.Minute)
    defer cancelFn()

    // The original snippet read fileSize without ever defining it (it did
    // not compile); fetch the source object's size so part ranges can be
    // computed.
    headInput := s3.HeadObjectInput{
        Bucket: &sourceBucket,
        Key:    &sourceKey,
    }
    headOut, err := svc.HeadObjectWithContext(ctx, &headInput)
    if err != nil {
        return err
    }
    fileSize := *headOut.ContentLength

    // Start the multipart upload; the returned upload id ties every part
    // and the final completion call together.
    startInput := s3.CreateMultipartUploadInput{
        Bucket: &destBucket,
        Key:    &destKey,
    }
    createOutput, err := svc.CreateMultipartUploadWithContext(ctx, &startInput)
    if err != nil {
        return err
    }
    var uploadId string
    if createOutput != nil && createOutput.UploadId != nil {
        uploadId = *createOutput.UploadId
    }
    if uploadId == "" {
        return errors.New("No upload id found in start upload request")
    }

    var partNumber int64 = 1
    copySource := "/" + sourceBucket + "/" + sourceKey
    // Ceiling division so a trailing partial part is counted (plain
    // fileSize/max_part_size undercounted by one in that case).
    numUploads := (fileSize + max_part_size - 1) / max_part_size
    parts := make([]*s3.CompletedPart, 0, numUploads)
    log.Infof("Will attempt upload in %d number of parts to %s", numUploads, destKey)

    for i := int64(0); i < fileSize; i += max_part_size {
        copyRange := buildCopySourceRange(i, fileSize)
        partInput := s3.UploadPartCopyInput{
            Bucket:          &destBucket,
            CopySource:      &copySource,
            CopySourceRange: &copyRange,
            Key:             &destKey,
            PartNumber:      &partNumber,
            UploadId:        &uploadId,
        }
        log.Debugf("Attempting to upload part %d range: %s", partNumber, copyRange)
        partResp, err := svc.UploadPartCopyWithContext(ctx, &partInput)
        if err != nil {
            log.Error("Attempting to abort upload")
            // Bucket and Key are required fields; without them the abort
            // request fails validation and the incomplete (billed) upload
            // would be left behind.
            abortIn := s3.AbortMultipartUploadInput{
                Bucket:   &destBucket,
                Key:      &destKey,
                UploadId: &uploadId,
            }
            // AbortMultipartUploadRequest only builds a request without
            // sending it; the blocking call actually performs the abort.
            // Any abort error is deliberately ignored — the part error is
            // the one worth surfacing.
            svc.AbortMultipartUploadWithContext(ctx, &abortIn)
            return fmt.Errorf("Error uploading part %d : %w", partNumber, err)
        }

        // Keep ETag and part number from the response; both are required
        // by CompleteMultipartUpload.
        if partResp != nil {
            partNum := partNumber
            etag := strings.Trim(*partResp.CopyPartResult.ETag, "\"")
            cPart := s3.CompletedPart{
                ETag:       &etag,
                PartNumber: &partNum,
            }
            parts = append(parts, &cPart)
            log.Debugf("Successfully upload part %d of %s", partNumber, uploadId)
        }
        partNumber++
        // Periodic progress logging every 50 parts.
        if partNumber%50 == 0 {
            log.Infof("Completed part %d of %d to %s", partNumber, numUploads, destKey)
        }
    }

    mpu := s3.CompletedMultipartUpload{
        Parts: parts,
    }

    // The copy only materializes once the complete command is received.
    complete := s3.CompleteMultipartUploadInput{
        Bucket:          &destBucket,
        Key:             &destKey,
        UploadId:        &uploadId,
        MultipartUpload: &mpu,
    }
    compOutput, err := svc.CompleteMultipartUploadWithContext(ctx, &complete)
    if err != nil {
        return fmt.Errorf("Error completing upload: %w", err)
    }
    if compOutput != nil {
        log.Infof("Successfully copied Bucket: %s Key: %s to Bucket: %s Key: %s", sourceBucket, sourceKey, destBucket, destKey)
    }
    return nil
}

字符串

相关问题