From b3867fb7cae11b05893594231da0ddf99d268f6c Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Tue, 21 May 2019 15:17:53 +0200 Subject: [PATCH] objectstorage: add posix standard storage rename the previous posix storage to posixflat and make it currently not user selectable (since I'm not sure it's really worth using it). The new posix storage uses the filesystem without any escaping so it's not a real flat namespace. This isn't a real issue since also minio is not a flat namespace and we are so forced to use it like a hierarchycal filesystem. --- internal/common/common.go | 6 +- internal/datamanager/data.go | 14 +- internal/datamanager/datamanager_test.go | 18 +- internal/datamanager/wal.go | 6 +- internal/objectstorage/common/atomic.go | 84 +++++++ internal/objectstorage/objectstorage.go | 32 +-- internal/objectstorage/objectstorage_test.go | 21 +- internal/objectstorage/posix/posix.go | 234 ++++++++++++++++++ internal/objectstorage/posix/posix_test.go | 60 +++++ .../{posix.go => posixflat/posixflat.go} | 107 ++------ .../posixflat_test.go} | 4 +- internal/objectstorage/{ => s3}/s3.go | 28 ++- internal/objectstorage/types/types.go | 41 +++ .../services/configstore/readdb/readdb.go | 5 +- internal/services/runservice/api/api.go | 3 +- internal/services/runservice/api/executor.go | 12 +- internal/services/runservice/readdb/readdb.go | 5 +- internal/services/runservice/scheduler.go | 6 +- internal/services/runservice/store/store.go | 4 +- 19 files changed, 525 insertions(+), 165 deletions(-) create mode 100644 internal/objectstorage/common/atomic.go create mode 100644 internal/objectstorage/posix/posix.go create mode 100644 internal/objectstorage/posix/posix_test.go rename internal/objectstorage/{posix.go => posixflat/posixflat.go} (75%) rename internal/objectstorage/{posix_test.go => posixflat/posixflat_test.go} (98%) rename internal/objectstorage/{ => s3}/s3.go (86%) create mode 100644 internal/objectstorage/types/types.go diff --git a/internal/common/common.go b/internal/common/common.go index 1aff75c..ac34f0c 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -24,6 +24,8 @@ import ( "github.com/pkg/errors" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + "github.com/sorintlab/agola/internal/objectstorage/posix" + "github.com/sorintlab/agola/internal/objectstorage/s3" "github.com/sorintlab/agola/internal/services/config" "go.uber.org/zap" ) @@ -81,7 +83,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error switch c.Type { case config.ObjectStorageTypePosix: - ost, err = objectstorage.NewPosixStorage(c.Path) + ost, err = posix.New(c.Path) if err != nil { return nil, errors.Wrapf(err, "failed to create posix object storage") } @@ -100,7 +102,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error return nil, errors.Errorf("wrong s3 endpoint scheme %q (must be http or https)", u.Scheme) } } - ost, err = objectstorage.NewS3Storage(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) + ost, err = s3.New(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) if err != nil { return nil, errors.Wrapf(err, "failed to create s3 object storage") } diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index b7eaca7..29e1fbe 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -23,7 +23,7 @@ import ( "strings" "github.com/pkg/errors" - "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/sequence" ) @@ -98,19 +98,19 @@ func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error { func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, datatype, dataSequence string) error { curDataStatus, err := d.GetLastDataStatus() - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return err } dataEntriesMap := map[string]*DataEntry{} - if err != objectstorage.ErrNotExist { + if err != ostypes.ErrNotExist { curDataSequence := curDataStatus.DataSequence oldDataf, err := d.ost.ReadObject(dataFilePath(datatype, curDataSequence)) - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return err } - if err != objectstorage.ErrNotExist { + if err != ostypes.ErrNotExist { dec := json.NewDecoder(oldDataf) for { var de *DataEntry @@ -240,7 +240,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { pos, ok := dataFileIndex.Index[id] if !ok { - return nil, objectstorage.ErrNotExist + return nil, ostypes.ErrNotExist } dataf, err := d.ost.ReadObject(dataFilePath(dataType, dataSequence)) @@ -276,7 +276,7 @@ func (d *DataManager) GetLastDataStatusPath() (string, error) { } } if dataStatusPath == "" { - return "", objectstorage.ErrNotExist + return "", ostypes.ErrNotExist } return dataStatusPath, nil diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 28a6060..ff6c2d4 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -24,6 +24,8 @@ import ( slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/objectstorage" + "github.com/sorintlab/agola/internal/objectstorage/posix" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/testutil" "go.uber.org/zap" @@ -76,7 +78,7 @@ func TestEtcdReset(t *testing.T) { ostDir, err := ioutil.TempDir(dir, "ost") - ost, err := objectstorage.NewPosixStorage(ostDir) + ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -175,7 +177,7 @@ func TestConcurrentUpdate(t *testing.T) { ostDir, err := ioutil.TempDir(dir, "ost") - ost, err := objectstorage.NewPosixStorage(ostDir) + ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -257,7 +259,7 @@ func TestWalCleaner(t *testing.T) { ostDir, err := ioutil.TempDir(dir, "ost") - ost, err := objectstorage.NewPosixStorage(ostDir) + ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -317,7 +319,7 @@ func TestReadObject(t *testing.T) { ctx := context.Background() ostDir, err := ioutil.TempDir(dir, "ost") - ost, err := objectstorage.NewPosixStorage(ostDir) + ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -386,8 +388,8 @@ func TestReadObject(t *testing.T) { // should not exists _, _, err = dm.ReadObject("datatype01", "object1", nil) - if err != objectstorage.ErrNotExist { - t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) + if err != ostypes.ErrNotExist { + t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err) } // should exist _, _, err = dm.ReadObject("datatype01", "object19", nil) @@ -406,8 +408,8 @@ func TestReadObject(t *testing.T) { // should not exists _, _, err = dm.ReadObject("datatype01", "object1", nil) - if err != objectstorage.ErrNotExist { - t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) + if err != ostypes.ErrNotExist { + t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err) } // should exist _, _, err = dm.ReadObject("datatype01", "object19", nil) diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index bd7e0c6..ba6d219 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -26,12 +26,12 @@ import ( "strings" "time" - uuid "github.com/satori/go.uuid" "github.com/sorintlab/agola/internal/etcd" - "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/sequence" "github.com/pkg/errors" + uuid "github.com/satori/go.uuid" etcdclientv3 "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" @@ -163,7 +163,7 @@ func (d *DataManager) changesList(paths []string, prefix, startWith string, recu func (d *DataManager) HasOSTWal(walseq string) (bool, error) { _, err := d.ost.Stat(d.storageWalStatusFile(walseq) + ".committed") - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return false, nil } if err != nil { diff --git a/internal/objectstorage/common/atomic.go b/internal/objectstorage/common/atomic.go new file mode 100644 index 0000000..4b14bec --- /dev/null +++ b/internal/objectstorage/common/atomic.go @@ -0,0 +1,84 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" +) + +// WriteFileAtomicFunc atomically writes a file, it achieves this by creating a +// temporary file and then moving it. writeFunc is the func that will write +// data to the file. +// TODO(sgotti) remove left over tmp files if process crashes before calling +// os.Remove +func WriteFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error { + f, err := ioutil.TempFile(tmpDir, "tmpfile") + if err != nil { + return err + } + err = writeFunc(f) + if persist && err == nil { + err = f.Sync() + } + if closeErr := f.Close(); err == nil { + err = closeErr + } + if permErr := os.Chmod(f.Name(), perm); err == nil { + err = permErr + } + if err == nil { + err = os.Rename(f.Name(), p) + } + if err != nil { + os.Remove(f.Name()) + return err + } + + if !persist { + return nil + } + // sync parent dirs + pdir := filepath.Dir(p) + for { + if !strings.HasPrefix(pdir, baseDir) { + break + } + f, err := os.Open(pdir) + if err != nil { + f.Close() + return nil + } + if err := f.Sync(); err != nil { + f.Close() + return nil + } + f.Close() + + pdir = filepath.Dir(pdir) + } + return nil +} + +func WriteFileAtomic(filename, baseDir, tmpDir string, perm os.FileMode, persist bool, data []byte) error { + return WriteFileAtomicFunc(filename, baseDir, tmpDir, perm, persist, + func(f io.Writer) error { + _, err := f.Write(data) + return err + }) +} diff --git a/internal/objectstorage/objectstorage.go b/internal/objectstorage/objectstorage.go index 1555a1d..239a468 100644 --- a/internal/objectstorage/objectstorage.go +++ b/internal/objectstorage/objectstorage.go @@ -15,37 +15,17 @@ package objectstorage import ( - "errors" "io" - "time" + + "github.com/sorintlab/agola/internal/objectstorage/types" ) -// TODO(sgotti) -// define common errors (like notFound) so the implementations will return them -// instead of their own errors - -var ErrNotExist = errors.New("does not exist") - -type ReadSeekCloser interface { - io.Reader - io.Seeker - io.Closer -} - type Storage interface { - Stat(filepath string) (*ObjectInfo, error) - ReadObject(filepath string) (ReadSeekCloser, error) + Stat(filepath string) (*types.ObjectInfo, error) + ReadObject(filepath string) (types.ReadSeekCloser, error) WriteObject(filepath string, data io.Reader, size int64, persist bool) error DeleteObject(filepath string) error - List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo -} - -type ObjectInfo struct { - Path string - - LastModified time.Time - - Err error + List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo } // ObjStorage wraps a Storage providing additional helper functions @@ -62,7 +42,7 @@ func (s *ObjStorage) Delimiter() string { return s.delimiter } -func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan ObjectInfo { +func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan types.ObjectInfo { delimiter := s.delimiter if recursive { delimiter = "" diff --git a/internal/objectstorage/objectstorage_test.go b/internal/objectstorage/objectstorage_test.go index c008da2..7f8bdbe 100644 --- a/internal/objectstorage/objectstorage_test.go +++ b/internal/objectstorage/objectstorage_test.go @@ -18,10 +18,15 @@ import ( "fmt" "io/ioutil" "os" + "path" "path/filepath" "reflect" "strings" "testing" + + "github.com/sorintlab/agola/internal/objectstorage/posix" + "github.com/sorintlab/agola/internal/objectstorage/posixflat" + "github.com/sorintlab/agola/internal/objectstorage/s3" ) func TestList(t *testing.T) { @@ -31,12 +36,16 @@ func TestList(t *testing.T) { } defer os.RemoveAll(dir) - ls, err := NewPosixStorage(dir) + ps, err := posix.New(path.Join(dir, "posix")) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + pfs, err := posixflat.New(path.Join(dir, "posixflat")) if err != nil { t.Fatalf("unexpected err: %v", err) } - var s3 *S3Storage + var s3s *s3.S3Storage minioEndpoint := os.Getenv("MINIO_ENDPOINT") minioAccessKey := os.Getenv("MINIO_ACCESSKEY") minioSecretKey := os.Getenv("MINIO_SECRETKEY") @@ -44,7 +53,7 @@ func TestList(t *testing.T) { t.Logf("missing MINIO_ENDPOINT env, skipping tests with minio storage") } else { var err error - s3, err = NewS3Storage(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false) + s3s, err = s3.New(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false) if err != nil { t.Fatalf("err: %v", err) } @@ -62,7 +71,7 @@ func TestList(t *testing.T) { ops []listop }{ { - map[string]Storage{"local": ls}, + map[string]Storage{"posixflat": pfs}, []string{ // Minio (as of 20190201) IMHO is not real S3 since it tries to map to a // file system and not a flat namespace like S3. For this reason this test @@ -172,7 +181,7 @@ func TestList(t *testing.T) { }, }, { - map[string]Storage{"local": ls, "minio": s3}, + map[string]Storage{"posix": ps, "posixflat": pfs, "minio": s3s}, []string{ // These are multiple of 8 chars on purpose to test the filemarker behavior to // distinguish between a file or a directory when the files ends at the path @@ -252,7 +261,7 @@ func TestList(t *testing.T) { for sname, s := range tt.s { t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) { switch s := s.(type) { - case *S3Storage: + case *s3.S3Storage: if s == nil { t.SkipNow() } diff --git a/internal/objectstorage/posix/posix.go b/internal/objectstorage/posix/posix.go new file mode 100644 index 0000000..2e26227 --- /dev/null +++ b/internal/objectstorage/posix/posix.go @@ -0,0 +1,234 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package posix + +import ( + "io" + "os" + "path" + "path/filepath" + "strings" + + "github.com/pkg/errors" + "github.com/sorintlab/agola/internal/objectstorage/common" + "github.com/sorintlab/agola/internal/objectstorage/types" +) + +const ( + dataDirName = "data" + tmpDirName = "tmp" +) + +type PosixStorage struct { + dataDir string + tmpDir string +} + +func New(baseDir string) (*PosixStorage, error) { + if err := os.MkdirAll(baseDir, 0770); err != nil { + return nil, err + } + dataDir := filepath.Join(baseDir, dataDirName) + tmpDir := filepath.Join(baseDir, tmpDirName) + if err := os.MkdirAll(dataDir, 0770); err != nil { + return nil, errors.Wrapf(err, "failed to create data dir") + } + if err := os.MkdirAll(tmpDir, 0770); err != nil { + return nil, errors.Wrapf(err, "failed to create tmp dir") + } + return &PosixStorage{ + dataDir: dataDir, + tmpDir: tmpDir, + }, nil +} + +func (s *PosixStorage) fsPath(p string) (string, error) { + return filepath.Join(s.dataDir, p), nil +} + +func (s *PosixStorage) Stat(p string) (*types.ObjectInfo, error) { + fspath, err := s.fsPath(p) + if err != nil { + return nil, err + } + + fi, err := os.Stat(fspath) + if err != nil { + if os.IsNotExist(err) { + return nil, types.ErrNotExist + } + return nil, err + } + + return &types.ObjectInfo{Path: p, LastModified: fi.ModTime()}, nil +} + +func (s *PosixStorage) ReadObject(p string) (types.ReadSeekCloser, error) { + fspath, err := s.fsPath(p) + if err != nil { + return nil, err + } + + f, err := os.Open(fspath) + if err != nil && os.IsNotExist(err) { + return nil, types.ErrNotExist + } + return f, err +} + +func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error { + fspath, err := s.fsPath(p) + if err != nil { + return err + } + + if err := os.MkdirAll(path.Dir(fspath), 0770); err != nil { + return err + } + return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { + _, err := io.Copy(f, data) + return err + }) +} + +func (s *PosixStorage) DeleteObject(p string) error { + fspath, err := s.fsPath(p) + if err != nil { + return err + } + + if err := os.Remove(fspath); err != nil { + if os.IsNotExist(err) { + return types.ErrNotExist + } + return err + } + + // try to remove parent empty dirs + // TODO(sgotti) if this fails we ignore errors and the dirs will be left as + // empty, clean them asynchronously + pdir := filepath.Dir(fspath) + for { + if pdir == s.dataDir || !strings.HasPrefix(pdir, s.dataDir) { + break + } + f, err := os.Open(pdir) + if err != nil { + return nil + } + + _, err = f.Readdirnames(1) + if err == io.EOF { + f.Close() + if err := os.Remove(pdir); err != nil { + return nil + } + } else { + f.Close() + break + } + + pdir = filepath.Dir(pdir) + } + return nil +} + +func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { + objectCh := make(chan types.ObjectInfo, 1) + + if len(delimiter) > 1 { + objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} + return objectCh + } + + if startWith != "" && !strings.Contains(startWith, prefix) { + objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} + return objectCh + } + + recursive := delimiter == "" + + // remove leading slash from prefix + if strings.HasPrefix(prefix, "/") { + prefix = strings.TrimPrefix(prefix, "/") + } + + fprefix := filepath.Join(s.dataDir, prefix) + root := filepath.Dir(fprefix) + if len(root) < len(s.dataDir) { + root = s.dataDir + } + + // remove leading slash + if strings.HasPrefix(startWith, "/") { + startWith = strings.TrimPrefix(startWith, "/") + } + + go func(objectCh chan<- types.ObjectInfo) { + defer close(objectCh) + err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error { + if err != nil && !os.IsNotExist(err) { + return err + } + if os.IsNotExist(err) { + return nil + } + p := ep + + // get the path with / separator + p = filepath.ToSlash(p) + + p, err = filepath.Rel(s.dataDir, p) + if err != nil { + return err + } + if !recursive && len(p) > len(prefix) { + rel := strings.TrimPrefix(p, prefix) + skip := strings.Contains(rel, delimiter) + + if info.IsDir() && skip { + return filepath.SkipDir + } + if skip { + return nil + } + } + + if info.IsDir() { + return nil + } + + if strings.HasPrefix(p, prefix) && p > startWith { + select { + // Send object content. + case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime()}: + // If receives done from the caller, return here. + case <-doneCh: + return io.EOF + } + } + + return nil + }) + if err != nil && err != io.EOF { + objectCh <- types.ObjectInfo{ + Err: err, + } + return + } + }(objectCh) + + return objectCh +} diff --git a/internal/objectstorage/posix/posix_test.go b/internal/objectstorage/posix/posix_test.go new file mode 100644 index 0000000..aca1a2e --- /dev/null +++ b/internal/objectstorage/posix/posix_test.go @@ -0,0 +1,60 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package posix + +import ( + "bytes" + "io/ioutil" + "os" + "path/filepath" + "testing" +) + +func TestDeleteObject(t *testing.T) { + objects := []string{"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"} + + dir, err := ioutil.TempDir("", "objectstorage") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + //defer os.RemoveAll(dir) + + ls, err := New(dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + for _, obj := range objects { + if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), 0, true); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := ls.DeleteObject(obj); err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // no files and directories should be left + bd, err := os.Open(filepath.Join(dir, dataDirName)) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + files, err := bd.Readdirnames(0) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if len(files) > 0 { + t.Fatalf("expected 0 files got %d files", len(files)) + } +} diff --git a/internal/objectstorage/posix.go b/internal/objectstorage/posixflat/posixflat.go similarity index 75% rename from internal/objectstorage/posix.go rename to internal/objectstorage/posixflat/posixflat.go index 75a687b..d37cb14 100644 --- a/internal/objectstorage/posix.go +++ b/internal/objectstorage/posixflat/posixflat.go @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package objectstorage +package posixflat import ( "io" - "io/ioutil" "os" "path" "path/filepath" @@ -24,6 +23,9 @@ import ( "strings" "unicode/utf8" + "github.com/sorintlab/agola/internal/objectstorage/common" + "github.com/sorintlab/agola/internal/objectstorage/types" + "github.com/pkg/errors" ) @@ -204,12 +206,12 @@ func unescape(s string) (string, bool, error) { return string(t), hasFileMarker, nil } -type PosixStorage struct { +type PosixFlatStorage struct { dataDir string tmpDir string } -func NewPosixStorage(baseDir string) (*PosixStorage, error) { +func New(baseDir string) (*PosixFlatStorage, error) { if err := os.MkdirAll(baseDir, 0770); err != nil { return nil, err } @@ -221,20 +223,20 @@ func NewPosixStorage(baseDir string) (*PosixStorage, error) { if err := os.MkdirAll(tmpDir, 0770); err != nil { return nil, errors.Wrapf(err, "failed to create tmp dir") } - return &PosixStorage{ + return &PosixFlatStorage{ dataDir: dataDir, tmpDir: tmpDir, }, nil } -func (s *PosixStorage) fsPath(p string) (string, error) { +func (s *PosixFlatStorage) fsPath(p string) (string, error) { if p == "" { return "", errors.Errorf("empty key name") } return filepath.Join(s.dataDir, escape(p)), nil } -func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) { +func (s *PosixFlatStorage) Stat(p string) (*types.ObjectInfo, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -243,15 +245,15 @@ func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) { fi, err := os.Stat(fspath) if err != nil { if os.IsNotExist(err) { - return nil, ErrNotExist + return nil, types.ErrNotExist } return nil, err } - return &ObjectInfo{Path: p, LastModified: fi.ModTime()}, nil + return &types.ObjectInfo{Path: p, LastModified: fi.ModTime()}, nil } -func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { +func (s *PosixFlatStorage) ReadObject(p string) (types.ReadSeekCloser, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -259,12 +261,12 @@ func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { f, err := os.Open(fspath) if err != nil && os.IsNotExist(err) { - return nil, ErrNotExist + return nil, types.ErrNotExist } return f, err } -func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error { +func (s *PosixFlatStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error { fspath, err := s.fsPath(p) if err != nil { return err @@ -273,13 +275,13 @@ func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist if err := os.MkdirAll(path.Dir(fspath), 0770); err != nil { return err } - return s.WriteFileAtomicFunc(fspath, 0660, persist, func(f io.Writer) error { + return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { _, err := io.Copy(f, data) return err }) } -func (s *PosixStorage) DeleteObject(p string) error { +func (s *PosixFlatStorage) DeleteObject(p string) error { fspath, err := s.fsPath(p) if err != nil { return err @@ -287,7 +289,7 @@ func (s *PosixStorage) DeleteObject(p string) error { if err := os.Remove(fspath); err != nil { if os.IsNotExist(err) { - return ErrNotExist + return types.ErrNotExist } return err } @@ -321,16 +323,16 @@ func (s *PosixStorage) DeleteObject(p string) error { return nil } -func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { - objectCh := make(chan ObjectInfo, 1) +func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { + objectCh := make(chan types.ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} + objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} return objectCh } if startWith != "" && !strings.Contains(startWith, prefix) { - objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} + objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} return objectCh } @@ -352,7 +354,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s startWith = strings.TrimPrefix(startWith, "/") } - go func(objectCh chan<- ObjectInfo) { + go func(objectCh chan<- types.ObjectInfo) { var prevp string defer close(objectCh) err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error { @@ -410,7 +412,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s if p > prevp { select { // Send object content. - case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime()}: + case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime()}: // If receives done from the caller, return here. case <-doneCh: return io.EOF @@ -422,7 +424,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s return nil }) if err != nil && err != io.EOF { - objectCh <- ObjectInfo{ + objectCh <- types.ObjectInfo{ Err: err, } return @@ -431,64 +433,3 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s return objectCh } - -// WriteFileAtomicFunc atomically writes a file, it achieves this by creating a -// temporary file and then moving it. writeFunc is the func that will write -// data to the file. -// TODO(sgotti) remove left over tmp files if process crashes before calling -// os.Remove -func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error { - f, err := ioutil.TempFile(s.tmpDir, "tmpfile") - if err != nil { - return err - } - err = writeFunc(f) - if persist && err == nil { - err = f.Sync() - } - if closeErr := f.Close(); err == nil { - err = closeErr - } - if permErr := os.Chmod(f.Name(), perm); err == nil { - err = permErr - } - if err == nil { - err = os.Rename(f.Name(), p) - } - if err != nil { - os.Remove(f.Name()) - return err - } - - if !persist { - return nil - } - // sync parent dirs - pdir := filepath.Dir(p) - for { - if !strings.HasPrefix(pdir, s.dataDir) { - break - } - f, err := os.Open(pdir) - if err != nil { - f.Close() - return nil - } - if err := f.Sync(); err != nil { - f.Close() - return nil - } - f.Close() - - pdir = filepath.Dir(pdir) - } - return nil -} - -func (s *PosixStorage) WriteFileAtomic(filename string, perm os.FileMode, persist bool, data []byte) error { - return s.WriteFileAtomicFunc(filename, perm, persist, - func(f io.Writer) error { - _, err := f.Write(data) - return err - }) -} diff --git a/internal/objectstorage/posix_test.go b/internal/objectstorage/posixflat/posixflat_test.go similarity index 98% rename from internal/objectstorage/posix_test.go rename to internal/objectstorage/posixflat/posixflat_test.go index 35f1255..4fc4328 100644 --- a/internal/objectstorage/posix_test.go +++ b/internal/objectstorage/posixflat/posixflat_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package objectstorage +package posixflat import ( "bytes" @@ -86,7 +86,7 @@ func TestDeleteObject(t *testing.T) { } //defer os.RemoveAll(dir) - ls, err := NewPosixStorage(dir) + ls, err := New(dir) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/objectstorage/s3.go b/internal/objectstorage/s3/s3.go similarity index 86% rename from internal/objectstorage/s3.go rename to internal/objectstorage/s3/s3.go index b94ba23..fc9dc61 100644 --- a/internal/objectstorage/s3.go +++ b/internal/objectstorage/s3/s3.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package objectstorage +package s3 import ( "io" @@ -21,6 +21,8 @@ import ( "os" "strings" + "github.com/sorintlab/agola/internal/objectstorage/types" + minio "github.com/minio/minio-go" "github.com/pkg/errors" ) @@ -32,7 +34,7 @@ type S3Storage struct { minioCore *minio.Core } -func NewS3Storage(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) { +func New(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) { minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure) if err != nil { return nil, err @@ -60,24 +62,24 @@ func NewS3Storage(bucket, location, endpoint, accessKeyID, secretAccessKey strin }, nil } -func (s *S3Storage) Stat(p string) (*ObjectInfo, error) { +func (s *S3Storage) Stat(p string) (*types.ObjectInfo, error) { oi, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{}) if err != nil { merr := minio.ToErrorResponse(err) if merr.StatusCode == http.StatusNotFound { - return nil, ErrNotExist + return nil, types.ErrNotExist } return nil, merr } - return &ObjectInfo{Path: p, LastModified: oi.LastModified}, nil + return &types.ObjectInfo{Path: p, LastModified: oi.LastModified}, nil } -func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) { +func (s *S3Storage) ReadObject(filepath string) (types.ReadSeekCloser, error) { if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil { merr := minio.ToErrorResponse(err) if merr.StatusCode == http.StatusNotFound { - return nil, ErrNotExist + return nil, types.ErrNotExist } return nil, merr } @@ -117,11 +119,11 @@ func (s *S3Storage) DeleteObject(filepath string) error { return s.minioClient.RemoveObject(s.bucket, filepath) } -func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { - objectCh := make(chan ObjectInfo, 1) +func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { + objectCh := make(chan types.ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- ObjectInfo{ + objectCh <- types.ObjectInfo{ Err: errors.Errorf("wrong delimiter %q", delimiter), } return objectCh @@ -136,7 +138,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru } // Initiate list objects goroutine here. - go func(objectCh chan<- ObjectInfo) { + go func(objectCh chan<- types.ObjectInfo) { defer close(objectCh) // Save continuationToken for next request. var continuationToken string @@ -144,7 +146,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru // Get list of objects a maximum of 1000 per request. result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith) if err != nil { - objectCh <- ObjectInfo{ + objectCh <- types.ObjectInfo{ Err: err, } return @@ -154,7 +156,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru for _, object := range result.Contents { select { // Send object content. - case objectCh <- ObjectInfo{Path: object.Key, LastModified: object.LastModified}: + case objectCh <- types.ObjectInfo{Path: object.Key, LastModified: object.LastModified}: // If receives done from the caller, return here. case <-doneCh: return diff --git a/internal/objectstorage/types/types.go b/internal/objectstorage/types/types.go new file mode 100644 index 0000000..cd11a90 --- /dev/null +++ b/internal/objectstorage/types/types.go @@ -0,0 +1,41 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "errors" + "io" + "time" +) + +// TODO(sgotti) +// define common errors (like notFound) so the implementations will return them +// instead of their own errors + +var ErrNotExist = errors.New("does not exist") + +type ReadSeekCloser interface { + io.Reader + io.Seeker + io.Closer +} + +type ObjectInfo struct { + Path string + + LastModified time.Time + + Err error +} diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 45b8f19..e48237e 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -28,6 +28,7 @@ import ( "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/sequence" "github.com/sorintlab/agola/internal/services/types" "github.com/sorintlab/agola/internal/util" @@ -125,10 +126,10 @@ func (r *ReadDB) ResetDB() error { func (r *ReadDB) SyncFromDump() (string, error) { dumpIndex, err := r.dm.GetLastDataStatus() - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return "", errors.WithStack(err) } - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return "", nil } for dataType, files := range dumpIndex.Files { diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 926cd4f..5c6c5ba 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -26,6 +26,7 @@ import ( "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/services/runservice/action" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/readdb" @@ -198,7 +199,7 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se } f, err := h.ost.ReadObject(logPath) if err != nil { - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return common.NewErrNotExist(err), true } return err, true diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go index fa126d1..7ca04d6 100644 --- a/internal/services/runservice/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -22,13 +22,15 @@ import ( "net/http" "strconv" - "github.com/gorilla/mux" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/services/runservice/action" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" + + "github.com/gorilla/mux" "go.uber.org/zap" ) @@ -244,7 +246,7 @@ func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error archivePath := store.OSTRunTaskArchivePath(rtID, step) f, err := h.ost.ReadObject(archivePath) if err != nil { - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return common.NewErrNotExist(err) } return err @@ -321,7 +323,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, defer close(doneCh) // get the latest modified object - var lastObject *objectstorage.ObjectInfo + var lastObject *ostypes.ObjectInfo for object := range ost.List(store.OSTCacheDir()+"/"+key, "", false, doneCh) { if object.Err != nil { return "", object.Err @@ -340,7 +342,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, } _, err := ost.Stat(cachePath) - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return "", nil } if err != nil { @@ -353,7 +355,7 @@ func (h *CacheHandler) readCache(key string, w io.Writer) error { cachePath := store.OSTCachePath(key) f, err := h.ost.ReadObject(cachePath) if err != nil { - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return common.NewErrNotExist(err) } return err diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index 95079a9..042a80a 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -32,6 +32,7 @@ import ( "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/sequence" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/store" @@ -647,10 +648,10 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { func (r *ReadDB) SyncFromDump() (string, error) { dumpIndex, err := r.dm.GetLastDataStatus() - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return "", errors.WithStack(err) } - if err == objectstorage.ErrNotExist { + if err == ostypes.ErrNotExist { return "", nil } for dataType, files := range dumpIndex.Files { diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 14e863d..832834f 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -27,7 +27,7 @@ import ( "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" slog "github.com/sorintlab/agola/internal/log" - "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/runconfig" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/store" @@ -862,7 +862,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { func (s *Runservice) OSTFileExists(path string) (bool, error) { _, err := s.ost.Stat(path) - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return false, err } return err == nil, nil @@ -1363,7 +1363,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time. } if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) { if err := s.ost.DeleteObject(object.Path); err != nil { - if err != objectstorage.ErrNotExist { + if err != ostypes.ErrNotExist { log.Warnf("failed to delete cache object %q: %v", object.Path, err) } } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 93eb579..8370593 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -24,7 +24,7 @@ import ( "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" - "github.com/sorintlab/agola/internal/objectstorage" + ostypes "github.com/sorintlab/agola/internal/objectstorage/types" "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" @@ -540,7 +540,7 @@ func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, dm *datamanager.DataMan } if r == nil { r, err = OSTGetRun(dm, runID) - if err != nil && err != objectstorage.ErrNotExist { + if err != nil && err != ostypes.ErrNotExist { return nil, err } }