golang实现可中断的流式下载

最近有一个需要实现下载功能:

从服务器上读取文件,返回一个ReadCloser在用户磁盘上创建文件,通过io.Copy实现文件下载(io.Copy是流式的操作,不会出现因文件过大而内存暴涨的问题)通过context实现暂停

1 流式下载:io.Copy

这里拷贝文件我们选择的是io.Copy而非是通过ioutil.ReadAll()将body中返回的数据一次性读取到内存

通过io.Copy可以保证内存占用一直处于一个比较稳定的水平

2 可中断:context

通过封装io.Copy实现

将io.Copy封装为一个方法,方法里传入context,外部通过context.WithCancel()控制流式拷贝的暂停

3 全部代码

这里演示我通过读取S3的一个对象下载到本地

/*

通过io.Copy实现可中断的流复制

*/

var (

ak = "99999999999999999999"

sk = "9999999999999999999999999999999999999999"

endpoint = "http://xx.xx.xx.xx:8060"

bucket = "test-bucket"

key = "d_xp/2G/2G.txt"

)

func main() {

s3Client := osg.Client.GetS3Client(ak, sk, endpoint)

ctx, cancelFunc := context.WithCancel(context.Background())

object, err := s3Client.GetObject(ctx, &s3.GetObjectInput{

Bucket: aws.String(bucket),

Key: aws.String(key),

})

go func() {

time.Sleep(time.Second * 10)

cancelFunc()

log.Infof("canceled...")

}()

if err != nil {

log.Errorf("%v", err)

return

}

body := object.Body

defer body.Close()

file, err := os.Create("/Users/ziyi/GolandProjects/MyTest/demo_home/io_demo/target.txt")

if err != nil {

log.Errorf("%v", err)

return

}

defer file.Close()

_, err = FileService.Copy(ctx, file, body)

if err != nil {

log.Errorf("%v", err)

return

}

}

type fileService struct {

sem *semaphore.Weighted

}

var FileService = &fileService{

sem: semaphore.NewWeighted(1),

}

type IoCopyCancelledErr struct {

errMsg string

}

func (e *IoCopyCancelledErr) Error() string {

return fmt.Sprintf("io copy error, %s", e.errMsg)

}

func NewIoCopyCancelledErr(msg string) *IoCopyCancelledErr {

return &IoCopyCancelledErr{

errMsg: msg,

}

}

type readerFunc func(p []byte) (n int, err error)

func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) }

//通过ctx实现可中断的流拷贝

// Copy closable copy

func (s *fileService) Copy(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {

// Copy will call the Reader and Writer interface multiple time, in order

// to copy by chunk (avoiding loading the whole file in memory).

// I insert the ability to cancel before read time as it is the earliest

// possible in the call process.

size, err := io.Copy(dst, readerFunc(func(p []byte) (int, error) {

select {

// if context has been canceled

case <-ctx.Done():

// stop process and propagate "context canceled" error

return 0, NewIoCopyCancelledErr(ctx.Err().Error())

default:

// otherwise just run default io.Reader implementation

return src.Read(p)

}

}))

return size, err

}

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: