package gofakes3

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"math/big"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/itsHenry35/gofakes3/internal/goskipiter"
	"github.com/ryszard/goskiplist/skiplist"
)

var add1 = new(big.Int).SetInt64(1)

/*
bucketUploads maintains a map of buckets to the list of multipart uploads
for that bucket.

A skiplist that maps object keys to upload ids is also maintained to
support the ListMultipartUploads operation.

From the docs:

	In the response, the uploads are sorted by key. If your application has
	initiated more than one multipart upload using the same object key,
	then uploads in the response are first sorted by key. Additionally,
	uploads are sorted in ascending order within each key by the upload
	initiation time.

It's ambiguous whether "sorted by key" means "sorted by the upload ID"
or "sorted by the object key". It's also ambiguous whether the docs mean
the sorting applies only within an individual page of results, or to the
whole result across all paginations. This is supported somewhat, though
not unambiguously, by the documentation for "key-marker" and
"upload-id-marker":

	key-marker: Together with upload-id-marker, this parameter specifies the
	multipart upload after which listing should begin.

	If upload-id-marker is not specified, only the keys lexicographically
	greater than the specified key-marker will be included in the list.

	If upload-id-marker is specified, any multipart uploads for a key equal to
	the key-marker might also be included, provided those multipart uploads
	have upload IDs lexicographically greater than the specified
	upload-id-marker.

	upload-id-marker: Together with key-marker, specifies the multipart upload
	after which listing should begin. If key-marker is not specified, the
	upload-id-marker parameter is ignored.

This implementation assumes "sorted by key" means "sorted by the object
key" and that the sorting applies across the full pagination set.

The SkipList provides O(log n) performance, but the slices inside are
linear-time. This should provide an acceptable trade-off for simplicity;
on my 2013-era i7 machine, a simple linear search for the last element
in a 100,000 element array of 80-ish byte strings takes barely 1ms.
*/
type bucketUploads struct {
	// uploads should be protected by the coarse lock in uploader:
	uploads map[UploadID]*multipartUpload

	// objectIndex provides sorted traversal of the bucket uploads.
	//
	// The keys in this skiplist are the object keys, the values are the slice
	// of *multipartUpload structs associated with that key. The skiplist
	// satisfies the map ordering constraint, the slice satisfies the upload
	// initiation time constraint.
	objectIndex *skiplist.SkipList // effectively map[ObjectKey][]*multipartUpload
}

func newBucketUploads() *bucketUploads {
	return &bucketUploads{
		uploads:     map[UploadID]*multipartUpload{},
		objectIndex: skiplist.NewStringMap(),
	}
}
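
// As a sketch of the index invariant (the IDs, keys, and times here are
// purely illustrative, and the caller is assumed to hold uploader.mu as the
// methods below require):
//
//	bu := newBucketUploads()
//	t := time.Now()
//	bu.add(&multipartUpload{ID: "1", Object: "b.txt", Initiated: t})
//	bu.add(&multipartUpload{ID: "2", Object: "a.txt", Initiated: t.Add(1 * time.Second)})
//	bu.add(&multipartUpload{ID: "3", Object: "a.txt", Initiated: t.Add(2 * time.Second)})
//
//	// Iterating objectIndex now yields "a.txt" before "b.txt" (lexicographic
//	// key order), and within "a.txt" upload "2" before upload "3" (append
//	// order, which Begin guarantees matches initiation order).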

// add assumes uploader.mu is acquired
func (bu *bucketUploads) add(mpu *multipartUpload) {
	bu.uploads[mpu.ID] = mpu

	uploads, ok := bu.objectIndex.Get(mpu.Object)
	if !ok {
		uploads = []*multipartUpload{mpu}
	} else {
		uploads = append(uploads.([]*multipartUpload), mpu)
	}
	bu.objectIndex.Set(mpu.Object, uploads)
}

// remove assumes uploader.mu is acquired
func (bu *bucketUploads) remove(uploadID UploadID) {
	upload := bu.uploads[uploadID]
	if upload == nil {
		return // unknown upload IDs have nothing to remove
	}
	delete(bu.uploads, uploadID)

	var uploads []*multipartUpload
	{
		upv, ok := bu.objectIndex.Get(upload.Object)
		if !ok || upv == nil {
			return
		}
		uploads = upv.([]*multipartUpload)
	}

	// Only record the index on an actual match, so a missing upload ID can
	// never cause an unrelated trailing element to be deleted:
	found := -1
	for idx, v := range uploads {
		if v.ID == uploadID {
			found = idx
			break
		}
	}

	if found >= 0 {
		uploads = append(uploads[:found], uploads[found+1:]...) // delete the found index
	}

	if len(uploads) == 0 {
		bu.objectIndex.Delete(upload.Object)
	} else {
		bu.objectIndex.Set(upload.Object, uploads)
	}
}

// uploader manages multipart uploads.
//
// Multipart upload support has the following rather severe limitations (which
// will hopefully be addressed in the future):
//
//   - uploads do not interface with the Backend, so they do not
//     currently persist across reboots
//
//   - upload parts are held in memory, so if you want to upload something huge
//     in multiple parts (which is pretty much exactly what you'd want multipart
//     uploads for), you'll need to make sure your memory is also sufficiently
//     huge!
//
// At this stage, the current thinking would be to add a second optional
// Backend interface that allows persistent operations on multipart upload
// data, and if a Backend does not implement it, this limited in-memory
// behaviour can be the fallback. If that can be made to work, it would provide
// good convenience for Backend implementers if their use case did not require
// persistent multipart upload handling, or it could be satisfied by this
// naive implementation.
type uploader struct {
	// uploadIDs use a big.Int to allow unbounded IDs (not that you'd be
	// expected to ever generate 4.2 billion of these but who are we to judge?)
	uploadID *big.Int

	buckets map[string]*bucketUploads
	mu      sync.Mutex
}

func newUploader() *uploader {
	return &uploader{
		buckets:  make(map[string]*bucketUploads),
		uploadID: new(big.Int),
	}
}
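
// A minimal lifecycle sketch (the bucket, key, and part contents are
// illustrative; error handling elided):
//
//	up := newUploader()
//	mpu := up.Begin("mybucket", "my/object", nil, time.Now())
//	etag, _ := mpu.AddPart(1, time.Now(), []byte("hello "))
//	// ...upload more parts, list them via ListParts, then:
//	final, _ := up.Complete("mybucket", "my/object", mpu.ID)
//	// final.Reassemble stitches the parts together once the client sends
//	// its CompleteMultipartUpload request echoing back the part etags.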

func (u *uploader) Begin(bucket, object string, meta map[string]string, initiated time.Time) *multipartUpload {
	u.mu.Lock()
	defer u.mu.Unlock()

	u.uploadID.Add(u.uploadID, add1)

	mpu := &multipartUpload{
		ID:        UploadID(u.uploadID.String()),
		Bucket:    bucket,
		Object:    object,
		Meta:      meta,
		Initiated: initiated,
	}

	// FIXME: make sure the uploader responds to DeleteBucket
	bucketUploads := u.buckets[bucket]
	if bucketUploads == nil {
		bucketUploads = newBucketUploads()
		u.buckets[bucket] = bucketUploads
	}

	bucketUploads.add(mpu)

	return mpu
}

func (u *uploader) ListParts(bucket, object string, uploadID UploadID, marker int, limit int64) (*ListMultipartUploadPartsResult, error) {
	u.mu.Lock()
	defer u.mu.Unlock()

	mpu, err := u.getUnlocked(bucket, object, uploadID)
	if err != nil {
		return nil, err
	}

	var result = ListMultipartUploadPartsResult{
		Bucket:           bucket,
		Key:              object,
		UploadID:         uploadID,
		MaxParts:         limit,
		PartNumberMarker: marker,
		StorageClass:     "STANDARD", // FIXME
	}

	var cnt int64
	for idx, part := range mpu.parts[marker:] {
		// mpu.parts is indexed by part number, so slicing from the marker
		// offsets the range index; add the marker back to recover the real
		// part number:
		partNumber := marker + idx

		if part == nil {
			continue
		}

		if cnt >= limit {
			result.IsTruncated = true
			result.NextPartNumberMarker = partNumber
			break
		}

		result.Parts = append(result.Parts, ListMultipartUploadPartItem{
			ETag:         part.ETag,
			Size:         int64(len(part.Body)),
			PartNumber:   partNumber,
			LastModified: part.LastModified,
		})

		cnt++
	}

	return &result, nil
}
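
// Paginating parts, as a sketch: feed NextPartNumberMarker back in until
// IsTruncated is false (here up is an *uploader, id an existing UploadID,
// and the bucket, key, and page size are illustrative):
//
//	marker := 0
//	for {
//		res, err := up.ListParts("mybucket", "my/object", id, marker, 100)
//		if err != nil {
//			break
//		}
//		// consume res.Parts...
//		if !res.IsTruncated {
//			break
//		}
//		marker = res.NextPartNumberMarker
//	}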

func (u *uploader) List(bucket string, marker *UploadListMarker, prefix Prefix, limit int64) (*ListMultipartUploadsResult, error) {
	u.mu.Lock()
	defer u.mu.Unlock()

	bucketUploads, ok := u.buckets[bucket]
	if !ok {
		return nil, ErrNoSuchUpload
	}

	var result = ListMultipartUploadsResult{
		Bucket:     bucket,
		Delimiter:  prefix.Delimiter,
		Prefix:     prefix.Prefix,
		MaxUploads: limit,
	}

	// we only need to use the uploadID to start the page if one was actually
	// supplied, otherwise assume we can start from the start of the iterator:
	var firstFound = true

	var iter = goskipiter.New(bucketUploads.objectIndex.Iterator())
	if marker != nil {
		iter.Seek(marker.Object)
		firstFound = marker.UploadID == ""
		result.UploadIDMarker = marker.UploadID
		result.KeyMarker = marker.Object
	}

	// Indicates whether the returned list of multipart uploads is truncated.
	// The list can be truncated if the number of multipart uploads exceeds
	// the limit allowed or specified by MaxUploads.
	//
	// In our case, this could be because there are still objects left in the
	// iterator, or because there are still uploadIDs left in the slice inside
	// the iteration.
	var truncated bool

	var cnt int64
	var seenPrefixes = map[string]bool{}
	var match PrefixMatch

	for iter.Next() {
		object := iter.Key().(string)
		uploads := iter.Value().([]*multipartUpload)

	retry:
		matched := prefix.Match(object, &match)
		if !matched {
			continue
		}

		if !firstFound {
			for idx, mpu := range uploads {
				if mpu.ID == marker.UploadID {
					firstFound = true
					uploads = uploads[idx:]
					goto retry
				}
			}

		} else {
			if match.CommonPrefix {
				if !seenPrefixes[match.MatchedPart] {
					result.CommonPrefixes = append(result.CommonPrefixes, match.AsCommonPrefix())
					seenPrefixes[match.MatchedPart] = true
				}

			} else {
				for idx, upload := range uploads {
					result.Uploads = append(result.Uploads, ListMultipartUploadItem{
						StorageClass: "STANDARD", // FIXME
						Key:          object,
						UploadID:     upload.ID,
						Initiated:    ContentTime{Time: upload.Initiated},
					})

					cnt++
					if cnt >= limit {
						if idx != len(uploads)-1 { // if this is not the last iteration, we have truncated
							truncated = true
							result.NextUploadIDMarker = uploads[idx+1].ID
							result.NextKeyMarker = object
						}
						goto done
					}
				}
			}
		}
	}

done:
	// If we did not truncate while in the middle of an object's upload ID list,
	// we need to see if there are more objects in the outer iteration:
	if !truncated {
		for iter.Next() {
			object := iter.Key().(string)
			if matched := prefix.Match(object, &match); matched && !match.CommonPrefix {
				truncated = true

				// This is not especially defensive; it assumes the rest of the code works
				// as it should. Could be something to clean up later:
				result.NextUploadIDMarker = iter.Value().([]*multipartUpload)[0].ID
				result.NextKeyMarker = object
				break
			}
		}
	}

	result.IsTruncated = truncated

	return &result, nil
}
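
// Paginating the upload listing, as a sketch: the NextKeyMarker and
// NextUploadIDMarker of a truncated result become the UploadListMarker for
// the next call (up is an *uploader; the bucket, prefix, and limit are
// illustrative):
//
//	var marker *UploadListMarker
//	for {
//		res, err := up.List("mybucket", marker, Prefix{}, 1000)
//		if err != nil {
//			break
//		}
//		// consume res.Uploads and res.CommonPrefixes...
//		if !res.IsTruncated {
//			break
//		}
//		marker = &UploadListMarker{Object: res.NextKeyMarker, UploadID: res.NextUploadIDMarker}
//	}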

func (u *uploader) Complete(bucket, object string, id UploadID) (*multipartUpload, error) {
	u.mu.Lock()
	defer u.mu.Unlock()
	up, err := u.getUnlocked(bucket, object, id)
	if err != nil {
		return nil, err
	}

	// if getUnlocked succeeded, so will this:
	u.buckets[bucket].remove(id)

	return up, nil
}

func (u *uploader) Get(bucket, object string, id UploadID) (mu *multipartUpload, err error) {
	u.mu.Lock()
	defer u.mu.Unlock()
	return u.getUnlocked(bucket, object, id)
}

func (u *uploader) getUnlocked(bucket, object string, id UploadID) (mu *multipartUpload, err error) {
	bucketUps, ok := u.buckets[bucket]
	if !ok {
		return nil, ErrNoSuchUpload
	}

	mu, ok = bucketUps.uploads[id]
	if !ok {
		return nil, ErrNoSuchUpload
	}

	if mu.Bucket != bucket || mu.Object != object {
		// FIXME: investigate what AWS does here; essentially if you initiate a
		// multipart upload at '/ObjectName1?uploads', then complete the upload
		// at '/ObjectName2?uploads', what happens?
		return nil, ErrNoSuchUpload
	}

	return mu, nil
}

// UploadListMarker is used to seek to the start of a page in a
// ListMultipartUploads operation.
type UploadListMarker struct {
	// Represents the key-marker query parameter. Together with 'uploadID',
	// this parameter specifies the multipart upload after which listing should
	// begin.
	//
	// If 'uploadID' is not specified, only the keys lexicographically greater
	// than the specified key-marker will be included in the list.
	//
	// If 'uploadID' is specified, any multipart uploads for a key equal to
	// 'object' might also be included, provided those multipart uploads have
	// upload IDs lexicographically greater than the specified uploadID.
	Object string

	// Represents the upload-id-marker query parameter to the
	// ListMultipartUploads operation. Together with 'object', specifies the
	// multipart upload after which listing should begin. If 'object' is not
	// specified, the 'uploadID' parameter is ignored.
	UploadID UploadID
}

// uploadListMarkerFromQuery collects the upload-id-marker and key-marker query
// parameters to the ListMultipartUploads operation.
func uploadListMarkerFromQuery(q url.Values) *UploadListMarker {
	object := q.Get("key-marker")
	if object == "" {
		return nil
	}
	return &UploadListMarker{Object: object, UploadID: UploadID(q.Get("upload-id-marker"))}
}
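
// For example, a request carrying ?key-marker=some/object&upload-id-marker=3
// yields &UploadListMarker{Object: "some/object", UploadID: "3"}, while a
// request with no key-marker yields nil and any upload-id-marker is ignored,
// matching the documented S3 behaviour. A quick sketch (the query string is
// illustrative):
//
//	q, _ := url.ParseQuery("key-marker=some/object&upload-id-marker=3")
//	marker := uploadListMarkerFromQuery(q)
//	// marker.Object == "some/object", marker.UploadID == "3"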

type multipartUploadPart struct {
	PartNumber   int
	ETag         string
	Body         []byte
	LastModified ContentTime
}

type multipartUpload struct {
	ID        UploadID
	Bucket    string
	Object    string
	Meta      map[string]string
	Initiated time.Time

	// Part numbers are limited in S3 to 10,000, so we can be a little wasteful.
	// If a new part number is added, the slice is grown to that size. Depending
	// on how bad the input is, this could mean you have a 10,000 element slice
	// that is almost all nils. This shouldn't be a problem in practice.
	//
	// We need to use a slice here so we can get deterministic ordering in order
	// to support pagination when listing the upload parts.
	//
	// The minimum part ID is 1, which means the first item in this slice will
	// always be nil.
	//
	// Do not attempt to access parts without locking mu.
	parts []*multipartUploadPart

	mu sync.Mutex
}
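
// To illustrate the sparse layout (a sketch; the part body is illustrative):
//
//	var mpu multipartUpload
//	mpu.AddPart(2, time.Now(), []byte("x"))
//	// len(mpu.parts) == 3; parts[0] and parts[1] are nil; parts[2] holds
//	// the part, so index i always corresponds to part number i.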

func (mpu *multipartUpload) AddPart(partNumber int, at time.Time, body []byte) (etag string, err error) {
	// Part numbers in S3 are 1-based; reject anything below 1 (which would
	// index out of bounds or clobber the always-nil slot 0) as well as
	// anything above the cap:
	if partNumber < 1 || partNumber > MaxUploadPartNumber {
		return "", ErrInvalidPart
	}

	mpu.mu.Lock()
	defer mpu.mu.Unlock()

	// What the ETag actually contains is not specified, so let's just invent
	// any old thing from guaranteed unique input:
	hash := md5.New()
	hash.Write(body)
	etag = fmt.Sprintf(`"%s"`, hex.EncodeToString(hash.Sum(nil)))

	part := multipartUploadPart{
		PartNumber:   partNumber,
		Body:         body,
		ETag:         etag,
		LastModified: NewContentTime(at),
	}
	if partNumber >= len(mpu.parts) {
		mpu.parts = append(mpu.parts, make([]*multipartUploadPart, partNumber-len(mpu.parts)+1)...)
	}
	mpu.parts[partNumber] = &part
	return etag, nil
}
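
// The part ETag produced above is simply the quoted hex md5 of the part body,
// so a client-side check can reproduce it (a sketch, with partBody standing
// in for the bytes passed to AddPart):
//
//	sum := md5.Sum(partBody)
//	want := `"` + hex.EncodeToString(sum[:]) + `"`
//	// want equals the etag AddPart returned for the same body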

func (mpu *multipartUpload) Reassemble(input *CompleteMultipartUploadRequest) (body []byte, etag string, err error) {
	mpu.mu.Lock()
	defer mpu.mu.Unlock()

	mpuPartsLen := len(mpu.parts)

	// FIXME: what does AWS do when mpu.Parts > input.Parts? Presumably you may
	// end up uploading more parts than you need to assemble, so it should
	// probably just ignore that?
	if len(input.Parts) > mpuPartsLen {
		return nil, "", ErrInvalidPart
	}

	if !input.partsAreSorted() {
		return nil, "", ErrInvalidPartOrder
	}

	var size int64

	for _, inPart := range input.Parts {
		if inPart.PartNumber >= mpuPartsLen || mpu.parts[inPart.PartNumber] == nil {
			return nil, "", ErrorMessagef(ErrInvalidPart, "unexpected part number %d in complete request", inPart.PartNumber)
		}

		upPart := mpu.parts[inPart.PartNumber]
		if strings.Trim(inPart.ETag, "\"") != strings.Trim(upPart.ETag, "\"") {
			return nil, "", ErrorMessagef(ErrInvalidPart, "unexpected part etag for number %d in complete request", inPart.PartNumber)
		}

		size += int64(len(upPart.Body))
	}

	body = make([]byte, 0, size)
	for _, part := range input.Parts {
		body = append(body, mpu.parts[part.PartNumber].Body...)
	}

	hash := fmt.Sprintf("%x", md5.Sum(body))

	return body, hash, nil
}
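
// A reassembly sketch, assuming parts were added via AddPart above and req
// stands in for the decoded CompleteMultipartUpload request body, echoing
// back the part numbers and (quote-insensitive) ETags the client was given:
//
//	body, etag, err := mpu.Reassemble(req)
//	if err != nil {
//		// ErrInvalidPart, ErrInvalidPartOrder, ...
//	}
//	// body is the parts concatenated in the order listed in req; etag is
//	// the hex md5 of that whole body (note: real S3 reports an
//	// "md5-of-md5s-N" style ETag for multipart objects, which this naive
//	// implementation does not attempt to reproduce).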