diff --git a/internal/common/common.go b/internal/common/common.go index 1aff75c..ac34f0c 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -24,6 +24,8 @@ import ( "github.com/pkg/errors" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + "github.com/sorintlab/agola/internal/objectstorage/posix" + "github.com/sorintlab/agola/internal/objectstorage/s3" "github.com/sorintlab/agola/internal/services/config" "go.uber.org/zap" ) @@ -81,7 +83,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error switch c.Type { case config.ObjectStorageTypePosix: - ost, err = objectstorage.NewPosixStorage(c.Path) + ost, err = posix.New(c.Path) if err != nil { return nil, errors.Wrapf(err, "failed to create posix object storage") } @@ -100,7 +102,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error return nil, errors.Errorf("wrong s3 endpoint scheme %q (must be http or https)", u.Scheme) } } - ost, err = objectstorage.NewS3Storage(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) + ost, err = s3.New(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) if err != nil { return nil, errors.Wrapf(err, "failed to create s3 object storage") } diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index b7eaca7..29e1fbe 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -23,7 +23,7 @@ import ( "strings" "github.com/pkg/errors" - "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/sequence" ) @@ -98,19 +98,19 @@ func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error { func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, datatype, dataSequence string) error { curDataStatus, err := d.GetLastDataStatus() - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return err } dataEntriesMap := map[string]*DataEntry{} - if err != objectstorage.ErrNotExist { + if err != ostypes.ErrNotExist { curDataSequence := curDataStatus.DataSequence oldDataf, err := d.ost.ReadObject(dataFilePath(datatype, curDataSequence)) - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return err } - if err != objectstorage.ErrNotExist { + if err != ostypes.ErrNotExist { dec := json.NewDecoder(oldDataf) for { var de *DataEntry @@ -240,7 +240,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { pos, ok := dataFileIndex.Index[id] if !ok { - return nil, objectstorage.ErrNotExist + return nil, ostypes.ErrNotExist } dataf, err := d.ost.ReadObject(dataFilePath(dataType, dataSequence)) @@ -276,7 +276,7 @@ func (d *DataManager) GetLastDataStatusPath() (string, error) { } } if dataStatusPath == "" { - return "", objectstorage.ErrNotExist + return "", ostypes.ErrNotExist } return dataStatusPath, nil diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 28a6060..ff6c2d4 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -24,6 +24,8 @@ import ( slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/objectstorage" + "github.com/sorintlab/agola/internal/objectstorage/posix" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/testutil" "go.uber.org/zap" @@ -76,7 +78,7 @@ func TestEtcdReset(t *testing.T) { ostDir, err := ioutil.TempDir(dir, "ost") - ost, err := objectstorage.NewPosixStorage(ostDir) + ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -175,7 +177,7 @@ func TestConcurrentUpdate(t *testing.T) { ostDir, err := ioutil.TempDir(dir, "ost") - ost, err := objectstorage.NewPosixStorage(ostDir) + ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -257,7 +259,7 @@ func TestWalCleaner(t *testing.T) { ostDir, err := ioutil.TempDir(dir, "ost") - ost, err := objectstorage.NewPosixStorage(ostDir) + ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -317,7 +319,7 @@ func TestReadObject(t *testing.T) { ctx := context.Background() ostDir, err := ioutil.TempDir(dir, "ost") - ost, err := objectstorage.NewPosixStorage(ostDir) + ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -386,8 +388,8 @@ func TestReadObject(t *testing.T) { // should not exists _, _, err = dm.ReadObject("datatype01", "object1", nil) - if err != objectstorage.ErrNotExist { - t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) + if err != ostypes.ErrNotExist { + t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err) } // should exist _, _, err = dm.ReadObject("datatype01", "object19", nil) @@ -406,8 +408,8 @@ func TestReadObject(t *testing.T) { // should not exists _, _, err = dm.ReadObject("datatype01", "object1", nil) - if err != objectstorage.ErrNotExist { - t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) + if err != ostypes.ErrNotExist { + t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err) } // should exist _, _, err = dm.ReadObject("datatype01", "object19", nil) diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index bd7e0c6..ba6d219 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -26,12 +26,12 @@ import ( "strings" "time" - uuid "github.com/satori/go.uuid" "github.com/sorintlab/agola/internal/etcd" - "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/sequence" "github.com/pkg/errors" + uuid "github.com/satori/go.uuid" etcdclientv3 "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" @@ -163,7 +163,7 @@ func (d *DataManager) changesList(paths []string, prefix, startWith string, recu func (d *DataManager) HasOSTWal(walseq string) (bool, error) { _, err := d.ost.Stat(d.storageWalStatusFile(walseq) + ".committed") - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return false, nil } if err != nil { diff --git a/internal/objectstorage/common/atomic.go b/internal/objectstorage/common/atomic.go new file mode 100644 index 0000000..4b14bec --- /dev/null +++ b/internal/objectstorage/common/atomic.go @@ -0,0 +1,84 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" +) + +// WriteFileAtomicFunc atomically writes a file, it achieves this by creating a +// temporary file and then moving it. writeFunc is the func that will write +// data to the file. +// TODO(sgotti) remove left over tmp files if process crashes before calling +// os.Remove +func WriteFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error { + f, err := ioutil.TempFile(tmpDir, "tmpfile") + if err != nil { + return err + } + err = writeFunc(f) + if persist && err == nil { + err = f.Sync() + } + if closeErr := f.Close(); err == nil { + err = closeErr + } + if permErr := os.Chmod(f.Name(), perm); err == nil { + err = permErr + } + if err == nil { + err = os.Rename(f.Name(), p) + } + if err != nil { + os.Remove(f.Name()) + return err + } + + if !persist { + return nil + } + // sync parent dirs + pdir := filepath.Dir(p) + for { + if !strings.HasPrefix(pdir, baseDir) { + break + } + f, err := os.Open(pdir) + if err != nil { + f.Close() + return nil + } + if err := f.Sync(); err != nil { + f.Close() + return nil + } + f.Close() + + pdir = filepath.Dir(pdir) + } + return nil +} + +func WriteFileAtomic(filename, baseDir, tmpDir string, perm os.FileMode, persist bool, data []byte) error { + return WriteFileAtomicFunc(filename, baseDir, tmpDir, perm, persist, + func(f io.Writer) error { + _, err := f.Write(data) + return err + }) +} diff --git a/internal/objectstorage/objectstorage.go b/internal/objectstorage/objectstorage.go index 1555a1d..239a468 100644 --- a/internal/objectstorage/objectstorage.go +++ b/internal/objectstorage/objectstorage.go @@ -15,37 +15,17 @@ package objectstorage import ( - "errors" "io" - "time" + + "github.com/sorintlab/agola/internal/objectstorage/types" ) -// TODO(sgotti) -// define common errors (like notFound) so the implementations will return them -// instead of their own errors - -var ErrNotExist = errors.New("does not exist") - -type ReadSeekCloser interface { - io.Reader - io.Seeker - io.Closer -} - type Storage interface { - Stat(filepath string) (*ObjectInfo, error) - ReadObject(filepath string) (ReadSeekCloser, error) + Stat(filepath string) (*types.ObjectInfo, error) + ReadObject(filepath string) (types.ReadSeekCloser, error) WriteObject(filepath string, data io.Reader, size int64, persist bool) error DeleteObject(filepath string) error - List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo -} - -type ObjectInfo struct { - Path string - - LastModified time.Time - - Err error + List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo } // ObjStorage wraps a Storage providing additional helper functions @@ -62,7 +42,7 @@ func (s *ObjStorage) Delimiter() string { return s.delimiter } -func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan ObjectInfo { +func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan types.ObjectInfo { delimiter := s.delimiter if recursive { delimiter = "" diff --git a/internal/objectstorage/objectstorage_test.go b/internal/objectstorage/objectstorage_test.go index c008da2..7f8bdbe 100644 --- a/internal/objectstorage/objectstorage_test.go +++ b/internal/objectstorage/objectstorage_test.go @@ -18,10 +18,15 @@ import ( "fmt" "io/ioutil" "os" + "path" "path/filepath" "reflect" "strings" "testing" + + "github.com/sorintlab/agola/internal/objectstorage/posix" + "github.com/sorintlab/agola/internal/objectstorage/posixflat" + "github.com/sorintlab/agola/internal/objectstorage/s3" ) func TestList(t *testing.T) { @@ -31,12 +36,16 @@ func TestList(t *testing.T) { } defer os.RemoveAll(dir) - ls, err := NewPosixStorage(dir) + ps, err := posix.New(path.Join(dir, "posix")) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + pfs, err := posixflat.New(path.Join(dir, "posixflat")) if err != nil { t.Fatalf("unexpected err: %v", err) } - var s3 *S3Storage + var s3s *s3.S3Storage minioEndpoint := os.Getenv("MINIO_ENDPOINT") minioAccessKey := os.Getenv("MINIO_ACCESSKEY") minioSecretKey := os.Getenv("MINIO_SECRETKEY") @@ -44,7 +53,7 @@ func TestList(t *testing.T) { t.Logf("missing MINIO_ENDPOINT env, skipping tests with minio storage") } else { var err error - s3, err = NewS3Storage(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false) + s3s, err = s3.New(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false) if err != nil { t.Fatalf("err: %v", err) } @@ -62,7 +71,7 @@ func TestList(t *testing.T) { ops []listop }{ { - map[string]Storage{"local": ls}, + map[string]Storage{"posixflat": pfs}, []string{ // Minio (as of 20190201) IMHO is not real S3 since it tries to map to a // file system and not a flat namespace like S3. For this reason this test @@ -172,7 +181,7 @@ func TestList(t *testing.T) { }, }, { - map[string]Storage{"local": ls, "minio": s3}, + map[string]Storage{"posix": ps, "posixflat": pfs, "minio": s3s}, []string{ // These are multiple of 8 chars on purpose to test the filemarker behavior to // distinguish between a file or a directory when the files ends at the path @@ -252,7 +261,7 @@ func TestList(t *testing.T) { for sname, s := range tt.s { t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) { switch s := s.(type) { - case *S3Storage: + case *s3.S3Storage: if s == nil { t.SkipNow() } diff --git a/internal/objectstorage/posix/posix.go b/internal/objectstorage/posix/posix.go new file mode 100644 index 0000000..2e26227 --- /dev/null +++ b/internal/objectstorage/posix/posix.go @@ -0,0 +1,234 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package posix + +import ( + "io" + "os" + "path" + "path/filepath" + "strings" + + "github.com/pkg/errors" + "github.com/sorintlab/agola/internal/objectstorage/common" + "github.com/sorintlab/agola/internal/objectstorage/types" +) + +const ( + dataDirName = "data" + tmpDirName = "tmp" +) + +type PosixStorage struct { + dataDir string + tmpDir string +} + +func New(baseDir string) (*PosixStorage, error) { + if err := os.MkdirAll(baseDir, 0770); err != nil { + return nil, err + } + dataDir := filepath.Join(baseDir, dataDirName) + tmpDir := filepath.Join(baseDir, tmpDirName) + if err := os.MkdirAll(dataDir, 0770); err != nil { + return nil, errors.Wrapf(err, "failed to create data dir") + } + if err := os.MkdirAll(tmpDir, 0770); err != nil { + return nil, errors.Wrapf(err, "failed to create tmp dir") + } + return &PosixStorage{ + dataDir: dataDir, + tmpDir: tmpDir, + }, nil +} + +func (s *PosixStorage) fsPath(p string) (string, error) { + return filepath.Join(s.dataDir, p), nil +} + +func (s *PosixStorage) Stat(p string) (*types.ObjectInfo, error) { + fspath, err := s.fsPath(p) + if err != nil { + return nil, err + } + + fi, err := os.Stat(fspath) + if err != nil { + if os.IsNotExist(err) { + return nil, types.ErrNotExist + } + return nil, err + } + + return &types.ObjectInfo{Path: p, LastModified: fi.ModTime()}, nil +} + +func (s *PosixStorage) ReadObject(p string) (types.ReadSeekCloser, error) { + fspath, err := s.fsPath(p) + if err != nil { + return nil, err + } + + f, err := os.Open(fspath) + if err != nil && os.IsNotExist(err) { + return nil, types.ErrNotExist + } + return f, err +} + +func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error { + fspath, err := s.fsPath(p) + if err != nil { + return err + } + + if err := os.MkdirAll(path.Dir(fspath), 0770); err != nil { + return err + } + return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { + _, err := io.Copy(f, data) + return err + }) +} + +func (s *PosixStorage) DeleteObject(p string) error { + fspath, err := s.fsPath(p) + if err != nil { + return err + } + + if err := os.Remove(fspath); err != nil { + if os.IsNotExist(err) { + return types.ErrNotExist + } + return err + } + + // try to remove parent empty dirs + // TODO(sgotti) if this fails we ignore errors and the dirs will be left as + // empty, clean them asynchronously + pdir := filepath.Dir(fspath) + for { + if pdir == s.dataDir || !strings.HasPrefix(pdir, s.dataDir) { + break + } + f, err := os.Open(pdir) + if err != nil { + return nil + } + + _, err = f.Readdirnames(1) + if err == io.EOF { + f.Close() + if err := os.Remove(pdir); err != nil { + return nil + } + } else { + f.Close() + break + } + + pdir = filepath.Dir(pdir) + } + return nil +} + +func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { + objectCh := make(chan types.ObjectInfo, 1) + + if len(delimiter) > 1 { + objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} + return objectCh + } + + if startWith != "" && !strings.Contains(startWith, prefix) { + objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} + return objectCh + } + + recursive := delimiter == "" + + // remove leading slash from prefix + if strings.HasPrefix(prefix, "/") { + prefix = strings.TrimPrefix(prefix, "/") + } + + fprefix := filepath.Join(s.dataDir, prefix) + root := filepath.Dir(fprefix) + if len(root) < len(s.dataDir) { + root = s.dataDir + } + + // remove leading slash + if strings.HasPrefix(startWith, "/") { + startWith = strings.TrimPrefix(startWith, "/") + } + + go func(objectCh chan<- types.ObjectInfo) { + defer close(objectCh) + err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error { + if err != nil && !os.IsNotExist(err) { + return err + } + if os.IsNotExist(err) { + return nil + } + p := ep + + // get the path with / separator + p = filepath.ToSlash(p) + + p, err = filepath.Rel(s.dataDir, p) + if err != nil { + return err + } + if !recursive && len(p) > len(prefix) { + rel := strings.TrimPrefix(p, prefix) + skip := strings.Contains(rel, delimiter) + + if info.IsDir() && skip { + return filepath.SkipDir + } + if skip { + return nil + } + } + + if info.IsDir() { + return nil + } + + if strings.HasPrefix(p, prefix) && p > startWith { + select { + // Send object content. + case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime()}: + // If receives done from the caller, return here. + case <-doneCh: + return io.EOF + } + } + + return nil + }) + if err != nil && err != io.EOF { + objectCh <- types.ObjectInfo{ + Err: err, + } + return + } + }(objectCh) + + return objectCh +} diff --git a/internal/objectstorage/posix/posix_test.go b/internal/objectstorage/posix/posix_test.go new file mode 100644 index 0000000..aca1a2e --- /dev/null +++ b/internal/objectstorage/posix/posix_test.go @@ -0,0 +1,60 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package posix + +import ( + "bytes" + "io/ioutil" + "os" + "path/filepath" + "testing" +) + +func TestDeleteObject(t *testing.T) { + objects := []string{"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"} + + dir, err := ioutil.TempDir("", "objectstorage") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + //defer os.RemoveAll(dir) + + ls, err := New(dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + for _, obj := range objects { + if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), 0, true); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := ls.DeleteObject(obj); err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // no files and directories should be left + bd, err := os.Open(filepath.Join(dir, dataDirName)) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + files, err := bd.Readdirnames(0) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if len(files) > 0 { + t.Fatalf("expected 0 files got %d files", len(files)) + } +} diff --git a/internal/objectstorage/posix.go b/internal/objectstorage/posixflat/posixflat.go similarity index 75% rename from internal/objectstorage/posix.go rename to internal/objectstorage/posixflat/posixflat.go index 75a687b..d37cb14 100644 --- a/internal/objectstorage/posix.go +++ b/internal/objectstorage/posixflat/posixflat.go @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package objectstorage +package posixflat import ( "io" - "io/ioutil" "os" "path" "path/filepath" @@ -24,6 +23,9 @@ import ( "strings" "unicode/utf8" + "github.com/sorintlab/agola/internal/objectstorage/common" + "github.com/sorintlab/agola/internal/objectstorage/types" + "github.com/pkg/errors" ) @@ -204,12 +206,12 @@ func unescape(s string) (string, bool, error) { return string(t), hasFileMarker, nil } -type PosixStorage struct { +type PosixFlatStorage struct { dataDir string tmpDir string } -func NewPosixStorage(baseDir string) (*PosixStorage, error) { +func New(baseDir string) (*PosixFlatStorage, error) { if err := os.MkdirAll(baseDir, 0770); err != nil { return nil, err } @@ -221,20 +223,20 @@ func NewPosixStorage(baseDir string) (*PosixStorage, error) { if err := os.MkdirAll(tmpDir, 0770); err != nil { return nil, errors.Wrapf(err, "failed to create tmp dir") } - return &PosixStorage{ + return &PosixFlatStorage{ dataDir: dataDir, tmpDir: tmpDir, }, nil } -func (s *PosixStorage) fsPath(p string) (string, error) { +func (s *PosixFlatStorage) fsPath(p string) (string, error) { if p == "" { return "", errors.Errorf("empty key name") } return filepath.Join(s.dataDir, escape(p)), nil } -func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) { +func (s *PosixFlatStorage) Stat(p string) (*types.ObjectInfo, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -243,15 +245,15 @@ func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) { fi, err := os.Stat(fspath) if err != nil { if os.IsNotExist(err) { - return nil, ErrNotExist + return nil, types.ErrNotExist } return nil, err } - return &ObjectInfo{Path: p, LastModified: fi.ModTime()}, nil + return &types.ObjectInfo{Path: p, LastModified: fi.ModTime()}, nil } -func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { +func (s *PosixFlatStorage) ReadObject(p string) (types.ReadSeekCloser, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -259,12 +261,12 @@ func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { f, err := os.Open(fspath) if err != nil && os.IsNotExist(err) { - return nil, ErrNotExist + return nil, types.ErrNotExist } return f, err } -func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error { +func (s *PosixFlatStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error { fspath, err := s.fsPath(p) if err != nil { return err @@ -273,13 +275,13 @@ func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist if err := os.MkdirAll(path.Dir(fspath), 0770); err != nil { return err } - return s.WriteFileAtomicFunc(fspath, 0660, persist, func(f io.Writer) error { + return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { _, err := io.Copy(f, data) return err }) } -func (s *PosixStorage) DeleteObject(p string) error { +func (s *PosixFlatStorage) DeleteObject(p string) error { fspath, err := s.fsPath(p) if err != nil { return err @@ -287,7 +289,7 @@ func (s *PosixStorage) DeleteObject(p string) error { if err := os.Remove(fspath); err != nil { if os.IsNotExist(err) { - return ErrNotExist + return types.ErrNotExist } return err } @@ -321,16 +323,16 @@ func (s *PosixStorage) DeleteObject(p string) error { return nil } -func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { - objectCh := make(chan ObjectInfo, 1) +func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { + objectCh := make(chan types.ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} + objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} return objectCh } if startWith != "" && !strings.Contains(startWith, prefix) { - objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} + objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} return objectCh } @@ -352,7 +354,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s startWith = strings.TrimPrefix(startWith, "/") } - go func(objectCh chan<- ObjectInfo) { + go func(objectCh chan<- types.ObjectInfo) { var prevp string defer close(objectCh) err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error { @@ -410,7 +412,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s if p > prevp { select { // Send object content. - case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime()}: + case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime()}: // If receives done from the caller, return here. case <-doneCh: return io.EOF @@ -422,7 +424,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s return nil }) if err != nil && err != io.EOF { - objectCh <- ObjectInfo{ + objectCh <- types.ObjectInfo{ Err: err, } return @@ -431,64 +433,3 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s return objectCh } - -// WriteFileAtomicFunc atomically writes a file, it achieves this by creating a -// temporary file and then moving it. writeFunc is the func that will write -// data to the file. -// TODO(sgotti) remove left over tmp files if process crashes before calling -// os.Remove -func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error { - f, err := ioutil.TempFile(s.tmpDir, "tmpfile") - if err != nil { - return err - } - err = writeFunc(f) - if persist && err == nil { - err = f.Sync() - } - if closeErr := f.Close(); err == nil { - err = closeErr - } - if permErr := os.Chmod(f.Name(), perm); err == nil { - err = permErr - } - if err == nil { - err = os.Rename(f.Name(), p) - } - if err != nil { - os.Remove(f.Name()) - return err - } - - if !persist { - return nil - } - // sync parent dirs - pdir := filepath.Dir(p) - for { - if !strings.HasPrefix(pdir, s.dataDir) { - break - } - f, err := os.Open(pdir) - if err != nil { - f.Close() - return nil - } - if err := f.Sync(); err != nil { - f.Close() - return nil - } - f.Close() - - pdir = filepath.Dir(pdir) - } - return nil -} - -func (s *PosixStorage) WriteFileAtomic(filename string, perm os.FileMode, persist bool, data []byte) error { - return s.WriteFileAtomicFunc(filename, perm, persist, - func(f io.Writer) error { - _, err := f.Write(data) - return err - }) -} diff --git a/internal/objectstorage/posix_test.go b/internal/objectstorage/posixflat/posixflat_test.go similarity index 98% rename from internal/objectstorage/posix_test.go rename to internal/objectstorage/posixflat/posixflat_test.go index 35f1255..4fc4328 100644 --- a/internal/objectstorage/posix_test.go +++ b/internal/objectstorage/posixflat/posixflat_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package objectstorage +package posixflat import ( "bytes" @@ -86,7 +86,7 @@ func TestDeleteObject(t *testing.T) { } //defer os.RemoveAll(dir) - ls, err := NewPosixStorage(dir) + ls, err := New(dir) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/objectstorage/s3.go b/internal/objectstorage/s3/s3.go similarity index 86% rename from internal/objectstorage/s3.go rename to internal/objectstorage/s3/s3.go index b94ba23..fc9dc61 100644 --- a/internal/objectstorage/s3.go +++ b/internal/objectstorage/s3/s3.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package objectstorage +package s3 import ( "io" @@ -21,6 +21,8 @@ import ( "os" "strings" + "github.com/sorintlab/agola/internal/objectstorage/types" + minio "github.com/minio/minio-go" "github.com/pkg/errors" ) @@ -32,7 +34,7 @@ type S3Storage struct { minioCore *minio.Core } -func NewS3Storage(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) { +func New(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) { minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure) if err != nil { return nil, err @@ -60,24 +62,24 @@ func NewS3Storage(bucket, location, endpoint, accessKeyID, secretAccessKey strin }, nil } -func (s *S3Storage) Stat(p string) (*ObjectInfo, error) { +func (s *S3Storage) Stat(p string) (*types.ObjectInfo, error) { oi, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{}) if err != nil { merr := minio.ToErrorResponse(err) if merr.StatusCode == http.StatusNotFound { - return nil, ErrNotExist + return nil, types.ErrNotExist } return nil, merr } - return &ObjectInfo{Path: p, LastModified: oi.LastModified}, nil + return &types.ObjectInfo{Path: p, LastModified: oi.LastModified}, nil } -func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) { +func (s *S3Storage) ReadObject(filepath string) (types.ReadSeekCloser, error) { if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil { merr := minio.ToErrorResponse(err) if merr.StatusCode == http.StatusNotFound { - return nil, ErrNotExist + return nil, types.ErrNotExist } return nil, merr } @@ -117,11 +119,11 @@ func (s *S3Storage) DeleteObject(filepath string) error { return s.minioClient.RemoveObject(s.bucket, filepath) } -func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { - objectCh := make(chan ObjectInfo, 1) +func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { + objectCh := make(chan types.ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- ObjectInfo{ + objectCh <- types.ObjectInfo{ Err: errors.Errorf("wrong delimiter %q", delimiter), } return objectCh @@ -136,7 +138,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru } // Initiate list objects goroutine here. - go func(objectCh chan<- ObjectInfo) { + go func(objectCh chan<- types.ObjectInfo) { defer close(objectCh) // Save continuationToken for next request. var continuationToken string @@ -144,7 +146,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru // Get list of objects a maximum of 1000 per request. result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith) if err != nil { - objectCh <- ObjectInfo{ + objectCh <- types.ObjectInfo{ Err: err, } return @@ -154,7 +156,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru for _, object := range result.Contents { select { // Send object content. - case objectCh <- ObjectInfo{Path: object.Key, LastModified: object.LastModified}: + case objectCh <- types.ObjectInfo{Path: object.Key, LastModified: object.LastModified}: // If receives done from the caller, return here. case <-doneCh: return diff --git a/internal/objectstorage/types/types.go b/internal/objectstorage/types/types.go new file mode 100644 index 0000000..cd11a90 --- /dev/null +++ b/internal/objectstorage/types/types.go @@ -0,0 +1,41 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "errors" + "io" + "time" +) + +// TODO(sgotti) +// define common errors (like notFound) so the implementations will return them +// instead of their own errors + +var ErrNotExist = errors.New("does not exist") + +type ReadSeekCloser interface { + io.Reader + io.Seeker + io.Closer +} + +type ObjectInfo struct { + Path string + + LastModified time.Time + + Err error +} diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 45b8f19..e48237e 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -28,6 +28,7 @@ import ( "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/sequence" "github.com/sorintlab/agola/internal/services/types" "github.com/sorintlab/agola/internal/util" @@ -125,10 +126,10 @@ func (r *ReadDB) ResetDB() error { func (r *ReadDB) SyncFromDump() (string, error) { dumpIndex, err := r.dm.GetLastDataStatus() - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return "", errors.WithStack(err) } - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return "", nil } for dataType, files := range dumpIndex.Files { diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 926cd4f..5c6c5ba 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -26,6 +26,7 @@ import ( "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/services/runservice/action" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/readdb" @@ -198,7 +199,7 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se } f, err := h.ost.ReadObject(logPath) if err != nil { - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return common.NewErrNotExist(err), true } return err, true diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go index fa126d1..7ca04d6 100644 --- a/internal/services/runservice/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -22,13 +22,15 @@ import ( "net/http" "strconv" - "github.com/gorilla/mux" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/services/runservice/action" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" + + "github.com/gorilla/mux" "go.uber.org/zap" ) @@ -244,7 +246,7 @@ func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error archivePath := store.OSTRunTaskArchivePath(rtID, step) f, err := h.ost.ReadObject(archivePath) if err != nil { - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return common.NewErrNotExist(err) } return err @@ -321,7 +323,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, defer close(doneCh) // get the latest modified object - var lastObject *objectstorage.ObjectInfo + var lastObject *ostypes.ObjectInfo for object := range ost.List(store.OSTCacheDir()+"/"+key, "", false, doneCh) { if object.Err != nil { return "", object.Err @@ -340,7 +342,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, } _, err := ost.Stat(cachePath) - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return "", nil } if err != nil { @@ -353,7 +355,7 @@ func (h *CacheHandler) readCache(key string, w io.Writer) error { cachePath := store.OSTCachePath(key) f, err := h.ost.ReadObject(cachePath) if err != nil { - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return common.NewErrNotExist(err) } return err diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index 95079a9..042a80a 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -32,6 +32,7 @@ import ( "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/sequence" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/store" @@ -647,10 +648,10 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { func (r *ReadDB) SyncFromDump() (string, error) { dumpIndex, err := r.dm.GetLastDataStatus() - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return "", errors.WithStack(err) } - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return "", nil } for dataType, files := range dumpIndex.Files { diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 14e863d..832834f 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -27,7 +27,7 @@ import ( "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" slog "github.com/sorintlab/agola/internal/log" - "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/runconfig" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/store" @@ -862,7 +862,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { func (s *Runservice) OSTFileExists(path string) (bool, error) { _, err := s.ost.Stat(path) - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return false, err } return err == nil, nil @@ -1363,7 +1363,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time. } if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) { if err := s.ost.DeleteObject(object.Path); err != nil { - if err != objectstorage.ErrNotExist { + if err != ostypes.ErrNotExist { log.Warnf("failed to delete cache object %q: %v", object.Path, err) } } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 93eb579..8370593 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -24,7 +24,7 @@ import ( "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" - "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" @@ -540,7 +540,7 @@ func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, dm *datamanager.DataMan } if r == nil { r, err = OSTGetRun(dm, runID) - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return nil, err } }