From 86e8479de978c42b6c296a821013c53878b1627f Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Fri, 22 Feb 2019 08:45:59 +0100 Subject: [PATCH] Add initial objectstorage wal manager --- go.mod | 22 +- go.sum | 109 ++- internal/testutil/utils.go | 389 +++++++++++ internal/wal/changes.go | 374 +++++++++++ internal/wal/wal.go | 1280 ++++++++++++++++++++++++++++++++++++ internal/wal/wal_test.go | 275 ++++++++ 6 files changed, 2371 insertions(+), 78 deletions(-) create mode 100644 internal/testutil/utils.go create mode 100644 internal/wal/changes.go create mode 100644 internal/wal/wal.go create mode 100644 internal/wal/wal_test.go diff --git a/go.mod b/go.mod index 819e27f..5d775f8 100644 --- a/go.mod +++ b/go.mod @@ -3,23 +3,21 @@ module github.com/sorintlab/agola require ( github.com/Masterminds/squirrel v1.1.0 github.com/go-ini/ini v1.42.0 // indirect - github.com/go-sql-driver/mysql v1.4.1 // indirect - github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e // indirect - github.com/jtolds/gls v4.2.1+incompatible // indirect - github.com/lib/pq v1.0.0 // indirect + github.com/gogo/protobuf v1.2.1 // indirect + github.com/golang/protobuf v1.3.0 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/kr/pty v1.1.3 // indirect github.com/mattn/go-sqlite3 v1.10.0 github.com/minio/minio-go v6.0.14+incompatible github.com/mitchellh/go-homedir v1.1.0 // indirect - github.com/pkg/errors v0.8.0 + github.com/pkg/errors v0.8.1 github.com/sanity-io/litter v1.1.0 - github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac // indirect - github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c // indirect + github.com/satori/go.uuid v1.2.0 + github.com/sgotti/gexpect v0.0.0-20161123102107-0afc6c19f50a go.etcd.io/etcd v0.0.0-20181128220305-dedae6eb7c25 go.uber.org/zap v1.9.1 - golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 // indirect - golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e // indirect - golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect - google.golang.org/appengine v1.4.0 // indirect - gopkg.in/ini.v1 v1.42.0 // indirect + golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 // indirect + golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95 // indirect + google.golang.org/grpc v1.19.0 // indirect gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index 54a3c3f..e7b2ec2 100644 --- a/go.sum +++ b/go.sum @@ -1,119 +1,97 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/squirrel v1.1.0 h1:baP1qLdoQCeTw3ifCdOq2dkYc6vGcmRdaociKLbEJXs= github.com/Masterminds/squirrel v1.1.0/go.mod h1:yaPeOnPG5ZRwL9oKdTsO/prlkPbXWZlRVMQ/gGlzIuA= -github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= -github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/etcd v3.3.12+incompatible h1:pAWNwdf7QiT1zfaWyqCtNZQWCLByQyA3JrSQyuYAqnQ= +github.com/coreos/etcd v3.3.12+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf h1:CAKfRE2YtTUIjjh1bkBtyYFaUT/WmOqsJjgtihT0vMI= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-ini/ini v1.42.0 h1:TWr1wGj35+UiWHlBA8er89seFXxzwFn11spilrrj+38= github.com/go-ini/ini v1.42.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= -github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= -github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/gogo/protobuf v1.0.0 h1:2jyBKDKU/8v3v2xVR2PtiWQviFUyiaGk2rpfyFT8rTM= github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 h1:LbsanbbD6LieFkXbj9YNNBupiGHJgFeLpO0j0Fza1h8= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a h1:ZJu5NB1Bk5ms4vw0Xu4i+jD32SE9jQXyfnOvwhHqlT0= +github.com/golang/protobuf v1.3.0 h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk= +github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e h1:JKmoR8x90Iww1ks85zJ1lfDGgIiMDuIptTOhJq+zKyg= -github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= -github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c h1:Lh2aW+HnU2Nbe1gqD9SOJLJxW1jBMmQOktN2acDyJk8= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= -github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= -github.com/grpc-ecosystem/grpc-gateway v1.4.1 h1:pX7cnDwSSmG0dR9yNjCQSSpmsJOqFdT7SzVp5Yl9uVw= github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= -github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE= -github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pty v1.0.0/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.3 h1:/Um6a/ZmD5tF7peoOJ5oN5KMQ0DrGVQSXLNwyckutPk= +github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= -github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= -github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= -github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc= github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o= github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= -github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612 h1:13pIdM2tpaDi4OVe24fgoIS7ZTqMt0QI+bwQsX5hq+g= github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= -github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 h1:osmNoEW2SCW3L7EX0km2LYM8HKpNWRiouxjE3XHkyGc= github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= -github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be h1:MoyXp/VjXUwM0GyDcdwT7Ubea2gxOSHpPaFo3qV+Y2A= github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/sanity-io/litter v1.1.0 h1:BllcKWa3VbZmOZbDCoszYLk7zCsKHz5Beossi8SUcTc= github.com/sanity-io/litter v1.1.0/go.mod h1:CJ0VCw2q4qKU7LaQr3n7UOSHzgEMgcGco7N/SkZQPjw= -github.com/sirupsen/logrus v1.0.5 h1:8c8b5uO0zS4X6RPl/sd1ENwSkIc0/H2PaHxE3udaE8I= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/sgotti/gexpect v0.0.0-20161123102107-0afc6c19f50a h1:u7WP9TGHJIkJoi/dRDhvYPSthMIdUQPDETiZET/Utl8= +github.com/sgotti/gexpect v0.0.0-20161123102107-0afc6c19f50a/go.mod h1:HvB0+YQff1QGS1nct9E3/J8wo8s/EVjq+VXrJSDlQEY= github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= -github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac h1:wbW+Bybf9pXxnCFAOWZTqkRjAc7rAIwo2e1ArUhiHxg= -github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c h1:Ho+uVpkel/udgjbwB5Lktg9BtvJSh2DT0Hi6LPSyI2w= -github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= -github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= -github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/ugorji/go v1.1.1 h1:gmervu+jDMvXTbcHQ0pd2wee85nEoE0BsVyEuzkfK8w= github.com/ugorji/go v1.1.1/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= github.com/urfave/cli v1.18.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 h1:MPPkRncZLN9Kh4MEFmbnK4h3BD7AUmskWv2+EeZJCCs= github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -go.etcd.io/bbolt v1.3.1-etcd.7 h1:M0l89sIuZ+RkW0rLbUsmxescVzLwLUs+Kvks+0jeHdM= go.etcd.io/bbolt v1.3.1-etcd.7/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20181128220305-dedae6eb7c25 h1:CwNwQW5bpjZHQ9KrGCx5vGYoq+LKcm7ZRGiCYYz6uXM= go.etcd.io/etcd v0.0.0-20181128220305-dedae6eb7c25/go.mod h1:weASp41xM3dk0YHg1s/W8ecdGP5G4teSTMBPpYAaUgA= +go.etcd.io/etcd v3.3.12+incompatible h1:V6PRYRGpU4k5EajJaaj/GL3hqIdzyPnBU8aPUp+35yw= +go.etcd.io/etcd v3.3.12+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= @@ -121,40 +99,39 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= -golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU= +golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e h1:bRhVy7zSSasaqNksaRZiA5EEI+Ei4I1nO5Jh72wfHlg= -golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95 h1:fY7Dsw114eJN4boqzVSbpVHO6rTdhq6/GnXeu+PKnzU= +golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= -google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/genproto v0.0.0-20180608181217-32ee49c4dd80 h1:GL7nK1hkDKrkor0eVOYcMdIsUGErFnaC2gpBOVC+vbI= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180608181217-32ee49c4dd80/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/grpc v1.14.0 h1:ArxJuB1NWfPY6r9Gp9gqwplT0Ge7nqv9msgu03lHLmo= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20180831171423-11092d34479b h1:lohp5blsw53GBXtLyLNaTXPXS9pJ1tiTw61ZHUoE9Qw= +google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= -gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo= +google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= -gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNjB2u4i700xBkIT4e0= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= -gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= -gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/testutil/utils.go b/internal/testutil/utils.go new file mode 100644 index 0000000..7ff9e76 --- /dev/null +++ b/internal/testutil/utils.go @@ -0,0 +1,389 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutil + +import ( + "bufio" + "context" + "fmt" + "net" + "net/url" + "os" + "os/exec" + "path/filepath" + "strconv" + "sync" + "testing" + "time" + + "github.com/sorintlab/agola/internal/etcd" + "go.etcd.io/etcd/embed" + "go.uber.org/zap" + + uuid "github.com/satori/go.uuid" + "github.com/sgotti/gexpect" +) + +const ( + sleepInterval = 500 * time.Millisecond + etcdTimeout = 5 * time.Second + + MinPort = 2048 + MaxPort = 16384 +) + +var curPort = MinPort +var portMutex = sync.Mutex{} + +type Process struct { + t *testing.T + uid string + name string + args []string + Cmd *gexpect.ExpectSubprocess + bin string +} + +func (p *Process) start() error { + if p.Cmd != nil { + panic(fmt.Errorf("%s: cmd not cleanly stopped", p.uid)) + } + cmd := exec.Command(p.bin, p.args...) + pr, pw, err := os.Pipe() + if err != nil { + return err + } + p.Cmd = &gexpect.ExpectSubprocess{Cmd: cmd, Output: pw} + if err := p.Cmd.Start(); err != nil { + return err + } + go func() { + scanner := bufio.NewScanner(pr) + for scanner.Scan() { + p.t.Logf("[%s %s]: %s", p.name, p.uid, scanner.Text()) + } + }() + + return nil +} + +func (p *Process) Start() error { + if err := p.start(); err != nil { + return err + } + p.Cmd.Continue() + return nil +} + +func (p *Process) StartExpect() error { + return p.start() +} + +func (p *Process) Signal(sig os.Signal) error { + p.t.Logf("signalling %s %s with %s", p.name, p.uid, sig) + if p.Cmd == nil { + panic(fmt.Errorf("p: %s, cmd is empty", p.uid)) + } + return p.Cmd.Cmd.Process.Signal(sig) +} + +func (p *Process) Kill() { + p.t.Logf("killing %s %s", p.name, p.uid) + if p.Cmd == nil { + panic(fmt.Errorf("p: %s, cmd is empty", p.uid)) + } + p.Cmd.Cmd.Process.Signal(os.Kill) + p.Cmd.Wait() + p.Cmd = nil +} + +func (p *Process) Stop() { + p.t.Logf("stopping %s %s", p.name, p.uid) + if p.Cmd == nil { + panic(fmt.Errorf("p: %s, cmd is empty", p.uid)) + } + p.Cmd.Continue() + p.Cmd.Cmd.Process.Signal(os.Interrupt) + p.Cmd.Wait() + p.Cmd = nil +} + +func (p *Process) Wait(timeout time.Duration) error { + timeoutCh := time.NewTimer(timeout).C + endCh := make(chan error) + go func() { + err := p.Cmd.Wait() + endCh <- err + }() + select { + case <-timeoutCh: + return fmt.Errorf("timeout waiting on process") + case <-endCh: + return nil + } +} + +type TestEmbeddedEtcd struct { + t *testing.T + *TestEtcd + Etcd *embed.Etcd + Endpoint string + ListenAddress string + Port string +} + +func NewTestEmbeddedEtcd(t *testing.T, logger *zap.Logger, dir string, a ...string) (*TestEmbeddedEtcd, error) { + u := uuid.NewV4() + uid := fmt.Sprintf("%x", u[:4]) + + dataDir := filepath.Join(dir, fmt.Sprintf("etcd%s", uid)) + + listenAddress, port, err := GetFreePort(true, false) + if err != nil { + return nil, err + } + listenAddress2, port2, err := GetFreePort(true, false) + if err != nil { + return nil, err + } + + cfg := embed.NewConfig() + cfg.Name = uid + cfg.Dir = dataDir + cfg.Logger = "zap" + cfg.LogOutputs = []string{"stdout"} + lcurl, _ := url.Parse(fmt.Sprintf("http://%s:%s", listenAddress, port)) + lpurl, _ := url.Parse(fmt.Sprintf("http://%s:%s", listenAddress2, port2)) + + cfg.LCUrls = []url.URL{*lcurl} + cfg.ACUrls = []url.URL{*lcurl} + cfg.LPUrls = []url.URL{*lpurl} + cfg.APUrls = []url.URL{*lpurl} + + cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) + + t.Logf("starting embedded etcd server") + embeddedEtcd, err := embed.StartEtcd(cfg) + if err != nil { + return nil, err + } + + storeEndpoint := fmt.Sprintf("http://%s:%s", listenAddress, port) + + storeConfig := etcd.Config{ + Logger: logger, + Endpoints: storeEndpoint, + } + e, err := etcd.New(storeConfig) + if err != nil { + return nil, fmt.Errorf("cannot create store: %v", err) + } + + tectd := &TestEmbeddedEtcd{ + t: t, + TestEtcd: &TestEtcd{ + e, + t, + }, + Etcd: embeddedEtcd, + Endpoint: storeEndpoint, + ListenAddress: listenAddress, + Port: port, + } + return tectd, nil +} + +func (te *TestEmbeddedEtcd) Start() error { + <-te.Etcd.Server.ReadyNotify() + return nil +} + +func (te *TestEmbeddedEtcd) Stop() error { + te.Etcd.Close() + return nil +} + +func (te *TestEmbeddedEtcd) Kill() error { + te.Etcd.Close() + return nil +} + +type TestExternalEtcd struct { + t *testing.T + *TestEtcd + Process + Endpoint string + ListenAddress string + Port string +} + +func NewTestExternalEtcd(t *testing.T, logger *zap.Logger, dir string, a ...string) (*TestExternalEtcd, error) { + u := uuid.NewV4() + uid := fmt.Sprintf("%x", u[:4]) + + dataDir := filepath.Join(dir, fmt.Sprintf("etcd%s", uid)) + + listenAddress, port, err := GetFreePort(true, false) + if err != nil { + return nil, err + } + listenAddress2, port2, err := GetFreePort(true, false) + if err != nil { + return nil, err + } + + args := []string{} + args = append(args, fmt.Sprintf("--name=%s", uid)) + args = append(args, fmt.Sprintf("--data-dir=%s", dataDir)) + args = append(args, fmt.Sprintf("--listen-client-urls=http://%s:%s", listenAddress, port)) + args = append(args, fmt.Sprintf("--advertise-client-urls=http://%s:%s", listenAddress, port)) + args = append(args, fmt.Sprintf("--listen-peer-urls=http://%s:%s", listenAddress2, port2)) + args = append(args, fmt.Sprintf("--initial-advertise-peer-urls=http://%s:%s", listenAddress2, port2)) + args = append(args, fmt.Sprintf("--initial-cluster=%s=http://%s:%s", uid, listenAddress2, port2)) + args = append(args, a...) + + storeEndpoint := fmt.Sprintf("http://%s:%s", listenAddress, port) + + storeConfig := etcd.Config{ + Logger: logger, + Endpoints: storeEndpoint, + } + e, err := etcd.New(storeConfig) + if err != nil { + return nil, fmt.Errorf("cannot create store: %v", err) + } + + bin := os.Getenv("ETCD_BIN") + if bin == "" { + return nil, fmt.Errorf("missing ETCD_BIN env") + } + tectd := &TestExternalEtcd{ + t: t, + TestEtcd: &TestEtcd{ + e, + t, + }, + Process: Process{ + t: t, + uid: uid, + name: "etcd", + bin: bin, + args: args, + }, + Endpoint: storeEndpoint, + ListenAddress: listenAddress, + Port: port, + } + return tectd, nil +} + +type TestEtcd struct { + *etcd.Store + t *testing.T +} + +func (te *TestEtcd) Compact() error { + ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) + defer cancel() + resp, err := te.Get(ctx, "anykey") + if err != nil && err != etcd.ErrKeyNotFound { + return err + } + + _, err = te.Client().Compact(ctx, resp.Header.Revision) + return err +} + +func (te *TestEtcd) WaitUp(timeout time.Duration) error { + start := time.Now() + for time.Now().Add(-timeout).Before(start) { + ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) + defer cancel() + _, err := te.Get(ctx, "anykey") + if err != nil && err == etcd.ErrKeyNotFound { + return nil + } + if err == nil { + return nil + } + time.Sleep(sleepInterval) + } + + return fmt.Errorf("timeout") +} + +func (te *TestEtcd) WaitDown(timeout time.Duration) error { + start := time.Now() + for time.Now().Add(-timeout).Before(start) { + ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) + defer cancel() + _, err := te.Get(ctx, "anykey") + if err != nil && err != etcd.ErrKeyNotFound { + return nil + } + time.Sleep(sleepInterval) + } + + return fmt.Errorf("timeout") +} + +func testFreeTCPPort(port int) error { + ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + if err != nil { + return err + } + ln.Close() + return nil +} + +func testFreeUDPPort(port int) error { + ln, err := net.ListenPacket("udp", fmt.Sprintf("localhost:%d", port)) + if err != nil { + return err + } + ln.Close() + return nil +} + +// Hack to find a free tcp and udp port +func GetFreePort(tcp bool, udp bool) (string, string, error) { + portMutex.Lock() + defer portMutex.Unlock() + + if !tcp && !udp { + return "", "", fmt.Errorf("at least one of tcp or udp port shuld be required") + } + localhostIP, err := net.ResolveIPAddr("ip", "localhost") + if err != nil { + return "", "", fmt.Errorf("failed to resolve ip addr: %v", err) + } + for { + curPort++ + if curPort > MaxPort { + return "", "", fmt.Errorf("all available ports to test have been exausted") + } + if tcp { + if err := testFreeTCPPort(curPort); err != nil { + continue + } + } + if udp { + if err := testFreeUDPPort(curPort); err != nil { + continue + } + } + return localhostIP.IP.String(), strconv.Itoa(curPort), nil + } +} diff --git a/internal/wal/changes.go b/internal/wal/changes.go new file mode 100644 index 0000000..5f466e7 --- /dev/null +++ b/internal/wal/changes.go @@ -0,0 +1,374 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "context" + "encoding/json" + "fmt" + "io" + "path" + "sort" + "strings" + "sync" + "time" + + "github.com/sorintlab/agola/internal/etcd" + + "github.com/pkg/errors" + etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/mvcc/mvccpb" +) + +type WalChanges struct { + actions map[string][]*Action + puts map[string]string + deletes map[string]string + pathsOrdered []string + walSeq string + revision int64 + changeGroupsRevisions changeGroupsRevisions + initialized bool + sync.Mutex +} + +func NewWalChanges() *WalChanges { + return &WalChanges{ + actions: make(map[string][]*Action), + puts: make(map[string]string), + deletes: make(map[string]string), + changeGroupsRevisions: make(changeGroupsRevisions), + } +} + +func (c *WalChanges) String() string { + return fmt.Sprintf("puts: %s, deletes: %s, walSeq: %s, revision: %d, initialized: %t", c.puts, c.deletes, c.walSeq, c.revision, c.initialized) +} + +func (c *WalChanges) curRevision() int64 { + return c.revision +} + +func (c *WalChanges) putRevision(revision int64) { + c.revision = revision +} + +func (c *WalChanges) curWalSeq() string { + return c.walSeq +} + +func (c *WalChanges) getPut(p string) (string, bool) { + walseq, ok := c.puts[p] + return walseq, ok +} + +func (c *WalChanges) getDeletesMap() map[string]struct{} { + dmap := map[string]struct{}{} + for p := range c.deletes { + dmap[p] = struct{}{} + } + return dmap +} + +func (c *WalChanges) getDelete(p string) bool { + _, ok := c.deletes[p] + return ok +} + +func (c *WalChanges) addPut(p, walseq string, revision int64) { + delete(c.deletes, p) + c.puts[p] = walseq + + c.walSeq = walseq + c.revision = revision +} + +func (c *WalChanges) removePut(p string, revision int64) { + delete(c.puts, p) + + c.revision = revision +} + +func (c *WalChanges) addDelete(p, walseq string, revision int64) { + delete(c.puts, p) + c.deletes[p] = walseq + + c.walSeq = walseq + c.revision = revision +} + +func (c *WalChanges) removeDelete(p string, revision int64) { + delete(c.deletes, p) + + c.revision = revision +} + +func (c *WalChanges) getChangeGroups(cgNames []string) changeGroupsRevisions { + cgr := map[string]int64{} + for _, cgName := range cgNames { + if rev, ok := c.changeGroupsRevisions[cgName]; ok { + cgr[cgName] = rev + } else { + // for non existing changegroups use a changegroup with revision = 0 + cgr[cgName] = 0 + } + } + + return cgr +} + +func (c *WalChanges) putChangeGroup(cgName string, cgRev int64) { + c.changeGroupsRevisions[cgName] = cgRev +} + +func (c *WalChanges) removeChangeGroup(cgName string) { + delete(c.changeGroupsRevisions, cgName) +} + +func (c *WalChanges) updatePathsOrdered() { + c.pathsOrdered = make([]string, len(c.puts)) + i := 0 + for p := range c.puts { + c.pathsOrdered[i] = p + i++ + } + sort.Sort(sort.StringSlice(c.pathsOrdered)) +} + +func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revision int64) error { + walDataFilePath := w.storageWalDataFile(walData.WalDataFileID) + + walDataFile, err := w.lts.ReadObject(walDataFilePath) + if err != nil { + return errors.Wrapf(err, "failed to read waldata %q", walDataFilePath) + } + defer walDataFile.Close() + dec := json.NewDecoder(walDataFile) + + w.changes.Lock() + defer w.changes.Unlock() + for { + var action *Action + + err := dec.Decode(&action) + if err == io.EOF { + // all done + break + } + if err != nil { + return errors.Wrapf(err, "failed to decode wal file") + } + + w.applyWalChangesAction(ctx, action, walData.WalSequence, revision) + + additionalActions, err := w.additionalActionsFunc(action) + if err != nil { + return err + } + for _, action := range additionalActions { + w.applyWalChangesAction(ctx, action, walData.WalSequence, revision) + } + } + + w.changes.updatePathsOrdered() + + return nil +} + +func (w *WalManager) applyWalChangesAction(ctx context.Context, action *Action, walSequence string, revision int64) { + switch action.ActionType { + case ActionTypePut: + w.changes.addPut(action.Path, walSequence, revision) + + case ActionTypeDelete: + w.changes.addDelete(action.Path, walSequence, revision) + } + if w.changes.actions[walSequence] == nil { + w.changes.actions[walSequence] = []*Action{} + } + w.changes.actions[walSequence] = append(w.changes.actions[walSequence], action) +} + +func (w *WalManager) watcherLoop(ctx context.Context) error { + for { + initialized := w.changes.initialized + if !initialized { + if err := w.initializeChanges(ctx); err != nil { + w.log.Errorf("watcher err: %+v", err) + } + } else { + if err := w.watcher(ctx); err != nil { + w.log.Errorf("watcher err: %+v", err) + } + } + + select { + case <-ctx.Done(): + w.log.Infof("watcher exiting") + return nil + default: + } + + time.Sleep(1 * time.Second) + } +} + +func (w *WalManager) initializeChanges(ctx context.Context) error { + var revision int64 + var continuation *etcd.ListPagedContinuation + for { + listResp, err := w.e.ListPaged(ctx, etcdWalsDir+"/", 0, 10, continuation) + if err != nil { + return err + } + resp := listResp.Resp + continuation = listResp.Continuation + + revision = resp.Header.Revision + + for _, kv := range resp.Kvs { + var walData *WalData + if err := json.Unmarshal(kv.Value, &walData); err != nil { + return err + } + if err := w.applyWalChanges(ctx, walData, revision); err != nil { + return err + } + } + if !listResp.HasMore { + break + } + } + + continuation = nil + // use the same revision + for { + listResp, err := w.e.ListPaged(ctx, etcdChangeGroupsDir+"/", 0, 10, continuation) + if err != nil { + return err + } + resp := listResp.Resp + continuation = listResp.Continuation + + for _, kv := range resp.Kvs { + w.changes.Lock() + changeGroup := path.Base(string(kv.Key)) + w.changes.putChangeGroup(changeGroup, kv.ModRevision) + w.changes.Unlock() + } + if !listResp.HasMore { + break + } + } + + w.changes.Lock() + w.changes.revision = revision + w.changes.initialized = true + w.changes.Unlock() + + return nil +} + +func (w *WalManager) watcher(ctx context.Context) error { + w.changes.Lock() + revision := w.changes.curRevision() + w.changes.Unlock() + + wctx, cancel := context.WithCancel(ctx) + defer cancel() + wch := w.e.Watch(wctx, etcdWalBaseDir+"/", revision+1) + for wresp := range wch { + if wresp.Canceled { + err := wresp.Err() + if err == etcdclientv3rpc.ErrCompacted { + w.log.Errorf("required events already compacted, reinitializing watcher changes") + w.changes.Lock() + w.changes.initialized = false + w.changes.Unlock() + } + return errors.Wrapf(err, "watch error") + } + revision := wresp.Header.Revision + + for _, ev := range wresp.Events { + key := string(ev.Kv.Key) + + switch { + case strings.HasPrefix(key, etcdWalsDir+"/"): + switch ev.Type { + case mvccpb.PUT: + var walData *WalData + if err := json.Unmarshal(ev.Kv.Value, &walData); err != nil { + return err + } + if walData.WalStatus != WalStatusCommitted { + continue + } + if err := w.applyWalChanges(ctx, walData, revision); err != nil { + return err + } + case mvccpb.DELETE: + walseq := path.Base(string(key)) + w.changes.Lock() + putsToDelete := []string{} + deletesToDelete := []string{} + for p, pwalseq := range w.changes.puts { + if pwalseq == walseq { + putsToDelete = append(putsToDelete, p) + } + } + for p, pwalseq := range w.changes.deletes { + if pwalseq == walseq { + deletesToDelete = append(deletesToDelete, p) + } + } + for _, p := range putsToDelete { + w.changes.removePut(p, revision) + } + for _, p := range deletesToDelete { + w.changes.removeDelete(p, revision) + } + + delete(w.changes.actions, walseq) + + w.changes.updatePathsOrdered() + + w.changes.Unlock() + } + + case strings.HasPrefix(key, etcdChangeGroupsDir+"/"): + switch ev.Type { + case mvccpb.PUT: + w.changes.Lock() + changeGroup := path.Base(string(ev.Kv.Key)) + w.changes.putChangeGroup(changeGroup, ev.Kv.ModRevision) + w.changes.Unlock() + case mvccpb.DELETE: + w.changes.Lock() + changeGroup := path.Base(string(ev.Kv.Key)) + w.changes.removeChangeGroup(changeGroup) + w.changes.Unlock() + } + + case key == etcdPingKey: + w.changes.Lock() + w.changes.putRevision(wresp.Header.Revision) + w.changes.Unlock() + } + } + } + + return nil +} diff --git a/internal/wal/wal.go b/internal/wal/wal.go new file mode 100644 index 0000000..89149f1 --- /dev/null +++ b/internal/wal/wal.go @@ -0,0 +1,1280 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "bytes" + "container/ring" + "context" + "encoding/json" + "io" + "io/ioutil" + "path" + "strings" + "time" + + uuid "github.com/satori/go.uuid" + "github.com/sorintlab/agola/internal/etcd" + "github.com/sorintlab/agola/internal/objectstorage" + "github.com/sorintlab/agola/internal/sequence" + + "github.com/pkg/errors" + etcdclientv3 "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/concurrency" + etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/mvcc/mvccpb" + "go.uber.org/zap" +) + +// TODO(sgotti) handle etcd unwanted changes: +// * Etcd cluster rebuild: we cannot rely on etcd header ClusterID since it could be the same as it's generated using the listen urls. We should add our own clusterid key and use it. +// * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one + +// Storage paths +// wals/{walSeq} +// +// Etcd paths +// wals/{walSeq} + +const ( + DefaultEtcdWalsKeepNum = 100 +) + +var ( + ErrCompacted = errors.New("required revision has been compacted") + ErrConcurrency = errors.New("wal concurrency error: change groups already updated") +) + +var ( + // Storage paths. Always use path (not filepath) to use the "/" separator + storageObjectsPrefix = "data/" + storageWalsDir = "wals" + storageWalsStatusDir = path.Join(storageWalsDir, "status") + storageWalsDataDir = path.Join(storageWalsDir, "data") + + // etcd paths. Always use path (not filepath) to use the "/" separator + etcdWalBaseDir = "walmanager" + etcdWalsDir = path.Join(etcdWalBaseDir, "wals") + etcdWalsDataKey = path.Join(etcdWalBaseDir, "walsdata") + etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq") + etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq") + + etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock") + etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") + etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock") + + etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups") + etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev") + + etcdPingKey = path.Join(etcdWalBaseDir, "ping") +) + +const ( + etcdChangeGroupMinRevisionRange = 1000 +) + +func (w *WalManager) toStorageDataPath(path string) string { + return w.basePath + storageObjectsPrefix + path +} + +func (w *WalManager) fromStorageDataPath(path string) string { + return strings.TrimPrefix(path, w.basePath+storageObjectsPrefix) +} + +func (w *WalManager) storageWalStatusFile(walSeq string) string { + return path.Join(w.basePath, storageWalsStatusDir, walSeq) +} + +func (w *WalManager) storageWalDataFile(walFileID string) string { + return path.Join(w.basePath, storageWalsDataDir, walFileID) +} + +func etcdWalKey(walSeq string) string { + return path.Join(etcdWalsDir, walSeq) +} + +type ActionType string + +const ( + ActionTypePut ActionType = "put" + ActionTypeDelete ActionType = "delete" +) + +type Action struct { + ActionType ActionType + Path string + Data []byte +} + +type WalHeader struct { + WalDataFileID string + PreviousWalSequence string + ChangeGroups map[string]int64 +} + +type WalStatus string + +const ( + // WalStatusCommitted represent a wal written to the lts + WalStatusCommitted WalStatus = "committed" + // WalStatusCommittedStorage represent the .committed marker file written to the lts + WalStatusCommittedStorage WalStatus = "committed_storage" + // WalStatusCheckpointed mean that all the wal actions have been executed on the lts + WalStatusCheckpointed WalStatus = "checkpointed" +) + +type WalsData struct { + LastCommittedWalSequence string + Revision int64 `json:"-"` +} + +type WalData struct { + WalDataFileID string + WalStatus WalStatus + WalSequence string + PreviousWalSequence string + ChangeGroups map[string]int64 +} + +type ChangeGroupsUpdateToken struct { + CurRevision int64 `json:"cur_revision"` + ChangeGroupsRevisions changeGroupsRevisions `json:"change_groups_revisions"` +} + +type changeGroupsRevisions map[string]int64 + +func (w *WalManager) GetChangeGroupsUpdateToken(cgNames []string) *ChangeGroupsUpdateToken { + w.changes.Lock() + revision := w.changes.curRevision() + cgr := w.changes.getChangeGroups(cgNames) + w.changes.Unlock() + return &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr} +} + +func (w *WalManager) MergeChangeGroupsUpdateTokens(cgts []*ChangeGroupsUpdateToken) *ChangeGroupsUpdateToken { + mcgt := &ChangeGroupsUpdateToken{ChangeGroupsRevisions: make(changeGroupsRevisions)} + for _, cgt := range cgts { + // keep the lower curRevision + if cgt.CurRevision != 0 && cgt.CurRevision < mcgt.CurRevision { + mcgt.CurRevision = cgt.CurRevision + } + // keep the lower changegroup revision + for cgName, cgRev := range cgt.ChangeGroupsRevisions { + if mr, ok := mcgt.ChangeGroupsRevisions[cgName]; ok { + if cgRev < mr { + mcgt.ChangeGroupsRevisions[cgName] = cgRev + } + } else { + mcgt.ChangeGroupsRevisions[cgName] = cgRev + } + } + } + + return mcgt +} + +func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error) { + w.changes.Lock() + walseq, ok := w.changes.getPut(p) + revision := w.changes.curRevision() + cgr := w.changes.getChangeGroups(cgNames) + actions := w.changes.actions[walseq] + w.changes.Unlock() + + cgt := &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr} + + if ok { + for _, action := range actions { + if action.ActionType == ActionTypePut && action.Path == p { + w.log.Debugf("reading file from wal: %q", action.Path) + return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil + } + + additionalActions, err := w.additionalActionsFunc(action) + if err != nil { + return nil, nil, err + } + for _, action := range additionalActions { + if action.ActionType == ActionTypePut && action.Path == p { + w.log.Debugf("reading file from wal additional actions: %q", action.Path) + return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil + } + } + } + return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq) + } + + f, err := w.lts.ReadObject(w.toStorageDataPath(p)) + return f, cgt, err +} + +func (w *WalManager) changesList(paths []string, prefix, startWith string, recursive bool) []string { + fpaths := []string{} + for _, p := range paths { + if !recursive && len(p) > len(prefix) { + rel := strings.TrimPrefix(p, prefix) + skip := strings.Contains(rel, w.lts.Delimiter()) + if skip { + continue + } + } + if strings.HasPrefix(p, prefix) && p > startWith { + fpaths = append(fpaths, p) + } + } + + return fpaths +} + +func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan objectstorage.ObjectInfo { + objectCh := make(chan objectstorage.ObjectInfo, 1) + + prefix = w.toStorageDataPath(prefix) + startWith = w.toStorageDataPath(startWith) + + w.changes.Lock() + changesList := w.changesList(w.changes.pathsOrdered, prefix, startWith, recursive) + deletedChangesMap := w.changes.getDeletesMap() + w.changes.Unlock() + + ci := 0 + go func(objectCh chan<- objectstorage.ObjectInfo) { + defer close(objectCh) + for object := range w.lts.List(prefix, startWith, recursive, doneCh) { + if object.Err != nil { + objectCh <- object + return + } + object.Path = w.fromStorageDataPath(object.Path) + + for ci < len(changesList) { + p := changesList[ci] + if p < object.Path { + //w.log.Infof("using path from changelist: %q", p) + select { + // Send object content. + case objectCh <- objectstorage.ObjectInfo{Path: p}: + // If receives done from the caller, return here. + case <-doneCh: + return + } + ci++ + } else if p == object.Path { + ci++ + break + } else { + break + } + } + + if _, ok := deletedChangesMap[object.Path]; ok { + continue + } + + //w.log.Infof("using path from objectstorage: %q", object.Path) + select { + // Send object content. + case objectCh <- object: + // If receives done from the caller, return here. + case <-doneCh: + return + } + } + for ci < len(changesList) { + //w.log.Infof("using path from changelist: %q", changesList[ci]) + objectCh <- objectstorage.ObjectInfo{ + Path: changesList[ci], + } + ci++ + } + }(objectCh) + + return objectCh +} + +func (w *WalManager) HasLtsWal(walseq string) (bool, error) { + _, err := w.lts.Stat(w.storageWalStatusFile(walseq)) + if err == objectstorage.ErrNotExist { + return false, nil + } + if err != nil { + return false, err + } + return true, nil +} + +func (w *WalManager) ReadWal(walseq string) (io.ReadCloser, error) { + return w.lts.ReadObject(w.storageWalStatusFile(walseq) + ".committed") +} + +func (w *WalManager) ReadWalData(walFileID string) (io.ReadCloser, error) { + return w.lts.ReadObject(w.storageWalDataFile(walFileID)) +} + +type WalFile struct { + WalSequence string + Err error + Committed bool + Checkpointed bool +} + +func (w *WalManager) ListLtsWals(start string) <-chan *WalFile { + walCh := make(chan *WalFile, 1) + + go func() { + doneCh := make(chan struct{}) + defer close(doneCh) + defer close(walCh) + + curWal := &WalFile{} + var startPath string + if start != "" { + startPath = w.storageWalStatusFile(start) + } + + for object := range w.lts.List(path.Join(w.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) { + if object.Err != nil { + walCh <- &WalFile{ + Err: object.Err, + } + return + } + + name := path.Base(object.Path) + ext := path.Ext(name) + walSequence := strings.TrimSuffix(name, ext) + // wal file refers to another wal, so return the current one + if curWal.WalSequence != walSequence { + // if this happen something is wrong on the lts + if !curWal.Committed && curWal.Checkpointed { + walCh <- &WalFile{ + Err: errors.Errorf("wal is checkpointed but not committed. this should never happen"), + } + return + } + + if curWal.WalSequence != "" { + // skip not committed wals + if curWal.Committed { + walCh <- curWal + } + } + + curWal = &WalFile{ + WalSequence: walSequence, + } + } + + if ext == ".committed" { + curWal.Committed = true + } + if ext == ".checkpointed" { + curWal.Checkpointed = true + } + } + if curWal.WalSequence != "" { + walCh <- curWal + } + }() + + return walCh +} + +type ListEtcdWalsElement struct { + WalData *WalData + Err error +} + +func (w *WalManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement { + walCh := make(chan *ListEtcdWalsElement, 1) + + go func() { + defer close(walCh) + var continuation *etcd.ListPagedContinuation + for { + listResp, err := w.e.ListPaged(ctx, etcdWalsDir, revision, 10, continuation) + if err != nil { + walCh <- &ListEtcdWalsElement{ + Err: err, + } + return + } + resp := listResp.Resp + continuation = listResp.Continuation + + for _, kv := range resp.Kvs { + var walData *WalData + err := json.Unmarshal(kv.Value, &walData) + walCh <- &ListEtcdWalsElement{ + WalData: walData, + Err: err, + } + } + if !listResp.HasMore { + break + } + } + }() + + return walCh +} + +// FirstAvailableWalData returns the first (the one with smaller sequence) wal +// and returns it (or nil if not available) and the etcd revision at the time of +// the operation +func (w *WalManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64, error) { + // list waldata and just get the first if available + listResp, err := w.e.ListPaged(ctx, etcdWalsDir, 0, 1, nil) + if err != nil { + return nil, 0, err + } + resp := listResp.Resp + revision := resp.Header.Revision + + if len(resp.Kvs) == 0 { + return nil, revision, nil + } + + var walData *WalData + if err := json.Unmarshal(resp.Kvs[0].Value, &walData); err != nil { + return nil, 0, err + } + + return walData, revision, nil +} + +func (w *WalManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error) { + resp, err := w.e.Get(ctx, etcdLastCommittedStorageWalSeqKey) + if err != nil && err != etcd.ErrKeyNotFound { + return "", 0, err + } + if err == etcd.ErrKeyNotFound { + return "", 0, errors.Errorf("no last committedstorage wal on etcd") + } + lastCommittedStorageWal := string(resp.Kvs[0].Value) + revision := resp.Header.Revision + + return lastCommittedStorageWal, revision, nil +} + +type WatchElement struct { + Revision int64 + WalData *WalData + ChangeGroupsRevisions changeGroupsRevisions + + Err error +} + +func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement { + walCh := make(chan *WatchElement, 1) + + // TODO(sgotti) if the etcd cluster goes down, watch won't return an error but + // wait until it comes back. We have to find a way to detect when the cluster + // is down and report an error so our clients can react (i.e. a readdb could + // mark itself as not in sync) + wctx := etcdclientv3.WithRequireLeader(ctx) + wch := w.e.Watch(wctx, etcdWalBaseDir+"/", revision) + + go func() { + defer close(walCh) + for wresp := range wch { + we := &WatchElement{ChangeGroupsRevisions: make(changeGroupsRevisions)} + + if wresp.Canceled { + err := wresp.Err() + switch err { + case etcdclientv3rpc.ErrCompacted: + we.Err = ErrCompacted + default: + we.Err = err + } + + walCh <- we + return + } + + we.Revision = wresp.Header.Revision + + for _, ev := range wresp.Events { + key := string(ev.Kv.Key) + + switch { + case strings.HasPrefix(key, etcdWalsDir+"/"): + switch ev.Type { + case mvccpb.PUT: + var walData *WalData + if err := json.Unmarshal(ev.Kv.Value, &walData); err != nil { + we.Err = wresp.Err() + walCh <- we + return + } + + we.WalData = walData + } + + case strings.HasPrefix(key, etcdChangeGroupsDir+"/"): + switch ev.Type { + case mvccpb.PUT: + changeGroup := path.Base(string(ev.Kv.Key)) + we.ChangeGroupsRevisions[changeGroup] = ev.Kv.ModRevision + case mvccpb.DELETE: + changeGroup := path.Base(string(ev.Kv.Key)) + we.ChangeGroupsRevisions[changeGroup] = 0 + } + + default: + continue + } + } + + walCh <- we + } + }() + + return walCh +} + +// WriteWal writes the provided actions in a wal file. The wal will be marked as +// "committed" on etcd if the provided group changes aren't changed in the +// meantime or a optimistic concurrency error will be returned and the wal won't +// be committed +// +// TODO(sgotti) save inside the wal file also the previous committed wal to +// handle possible lts list operation eventual consistency gaps (list won't +// report a wal at seq X but a wal at X+n, if this kind of eventual consistency +// ever exists) +func (w *WalManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error) { + return w.WriteWalAdditionalOps(ctx, actions, cgt, nil, nil) +} + +func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, cmp []etcdclientv3.Cmp, then []etcdclientv3.Op) (*ChangeGroupsUpdateToken, error) { + if len(actions) == 0 { + return nil, errors.Errorf("cannot write wal: actions is empty") + } + + walSequence, err := sequence.IncSequence(ctx, w.e, etcdWalSeqKey) + if err != nil { + return nil, err + } + + resp, err := w.e.Get(ctx, etcdWalsDataKey) + if err != nil { + return nil, err + } + + var walsData WalsData + if err := json.Unmarshal(resp.Kvs[0].Value, &walsData); err != nil { + return nil, err + } + walsData.Revision = resp.Kvs[0].ModRevision + + walDataFileID := uuid.NewV4().String() + walDataFilePath := w.storageWalDataFile(walDataFileID) + walKey := etcdWalKey(walSequence.String()) + + var buf bytes.Buffer + for _, action := range actions { + actionj, err := json.Marshal(action) + if err != nil { + return nil, err + } + if _, err := buf.Write(actionj); err != nil { + return nil, err + } + } + if err := w.lts.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil { + return nil, err + } + w.log.Debugf("wrote wal file: %s", walDataFilePath) + + walsData.LastCommittedWalSequence = walSequence.String() + + walData := &WalData{ + WalSequence: walSequence.String(), + WalDataFileID: walDataFileID, + WalStatus: WalStatusCommitted, + } + + walsDataj, err := json.Marshal(walsData) + if err != nil { + return nil, err + } + walDataj, err := json.Marshal(walData) + if err != nil { + return nil, err + } + + if cmp == nil { + cmp = []etcdclientv3.Cmp{} + } + if then == nil { + then = []etcdclientv3.Op{} + } + + getWalsData := etcdclientv3.OpGet(etcdWalsDataKey) + getWal := etcdclientv3.OpGet(walKey) + + //w.log.Infof("cgt: %s", util.Dump(cgt)) + if cgt != nil { + for cgName, cgRev := range cgt.ChangeGroupsRevisions { + cgKey := path.Join(etcdChangeGroupsDir, cgName) + if cgRev > 0 { + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(cgKey), "=", cgRev)) + } else { + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(cgKey), "=", 0)) + } + then = append(then, etcdclientv3.OpPut(cgKey, "")) + } + + if cgt.CurRevision > 0 { + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(etcdChangeGroupMinRevisionKey), "<", cgt.CurRevision+etcdChangeGroupMinRevisionRange)) + } + } + + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(etcdWalsDataKey), "=", walsData.Revision)) + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.Version(walKey), "=", 0)) + + then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj))) + then = append(then, etcdclientv3.OpPut(walKey, string(walDataj))) + + // This will only succeed if no one else have concurrently updated the walsData + // TODO(sgotti) retry if it failed due to concurrency errors + txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...).Else(getWalsData, getWal) + tresp, err := txn.Commit() + if err != nil { + return nil, etcd.FromEtcdError(err) + } + if !tresp.Succeeded { + walsDataRev := tresp.Responses[0].GetResponseRange().Kvs[0].ModRevision + walDataCreateRev := tresp.Responses[0].GetResponseRange().Kvs[0].CreateRevision + + // TODO(sgotti) If the tx failed due to walsdata already updated we could retry + if walsDataRev == walsData.Revision && walDataCreateRev == 0 { + return nil, errors.Errorf("failed to write committed wal: wals groups already updated") + } + return nil, ErrConcurrency + } + + ncgt := &ChangeGroupsUpdateToken{CurRevision: tresp.Header.Revision, ChangeGroupsRevisions: make(changeGroupsRevisions)} + if cgt != nil { + for cgName := range cgt.ChangeGroupsRevisions { + ncgt.ChangeGroupsRevisions[cgName] = tresp.Header.Revision + } + } + + // try to commit storage right now + if err := w.sync(ctx); err != nil { + w.log.Errorf("wal sync error: %+v", err) + } + + return ncgt, nil +} + +func (w *WalManager) syncLoop(ctx context.Context) { + for { + w.log.Debugf("syncer") + if err := w.sync(ctx); err != nil { + w.log.Errorf("syncer error: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(5 * time.Second) + } +} + +func (w *WalManager) sync(ctx context.Context) error { + session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := concurrency.NewMutex(session, etcdSyncLockKey) + + if err := m.Lock(ctx); err != nil { + return err + } + defer m.Unlock(ctx) + + resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0) + if err != nil { + return err + } + for _, kv := range resp.Kvs { + var walData WalData + if err := json.Unmarshal(kv.Value, &walData); err != nil { + return err + } + // wals must be committed and checkpointed in order. + // TODO(sgotti) this could be optimized by parallelizing writes of wals that don't have common change groups + switch walData.WalStatus { + case WalStatusCommitted: + walFilePath := w.storageWalStatusFile(walData.WalSequence) + w.log.Debugf("syncing committed wal to storage") + header := &WalHeader{ + WalDataFileID: walData.WalDataFileID, + ChangeGroups: walData.ChangeGroups, + PreviousWalSequence: walData.PreviousWalSequence, + } + headerj, err := json.Marshal(header) + if err != nil { + return err + } + + walFileCommittedPath := walFilePath + ".committed" + if err := w.lts.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil { + return err + } + + w.log.Debugf("updating wal to state %q", WalStatusCommittedStorage) + walData.WalStatus = WalStatusCommittedStorage + walDataj, err := json.Marshal(walData) + if err != nil { + return err + } + + cmp := []etcdclientv3.Cmp{} + then := []etcdclientv3.Op{} + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(string(kv.Key)), "=", kv.ModRevision)) + then = append(then, etcdclientv3.OpPut(string(kv.Key), string(walDataj))) + then = append(then, etcdclientv3.OpPut(string(etcdLastCommittedStorageWalSeqKey), string(walData.WalSequence))) + + // This will only succeed if the no one else have concurrently updated the wal keys in etcd + txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...) + tresp, err := txn.Commit() + if err != nil { + return etcd.FromEtcdError(err) + } + if !tresp.Succeeded { + return errors.Errorf("failed to write committedstorage wal: concurrent update") + } + case WalStatusCheckpointed: + walFilePath := w.storageWalStatusFile(walData.WalSequence) + w.log.Debugf("checkpointing committed wal to storage") + walFileCheckpointedPath := walFilePath + ".checkpointed" + if err := w.lts.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil { + return err + } + } + } + return nil +} + +func (w *WalManager) checkpointLoop(ctx context.Context) { + for { + w.log.Debugf("checkpointer") + if err := w.checkpoint(ctx); err != nil { + w.log.Errorf("checkpoint error: %v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(2 * time.Second) + } +} + +func (w *WalManager) checkpoint(ctx context.Context) error { + session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := concurrency.NewMutex(session, etcdCheckpointLockKey) + + if err := m.Lock(ctx); err != nil { + return err + } + defer m.Unlock(ctx) + + resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0) + if err != nil { + return err + } + for _, kv := range resp.Kvs { + var walData WalData + if err := json.Unmarshal(kv.Value, &walData); err != nil { + return err + } + if walData.WalStatus == WalStatusCommitted { + w.log.Warnf("wal %s not yet committed storage", walData.WalSequence) + break + } + if walData.WalStatus == WalStatusCheckpointed { + continue + } + walFilePath := w.storageWalDataFile(walData.WalDataFileID) + w.log.Debugf("checkpointing wal: %q", walData.WalSequence) + + walFile, err := w.lts.ReadObject(walFilePath) + if err != nil { + return err + } + defer walFile.Close() + dec := json.NewDecoder(walFile) + for { + var action *Action + + err := dec.Decode(&action) + if err == io.EOF { + // all done + break + } + if err != nil { + return err + } + + if err := w.checkpointAction(ctx, action); err != nil { + return err + } + + additionalActions, err := w.additionalActionsFunc(action) + if err != nil { + return err + } + for _, action := range additionalActions { + if err := w.checkpointAction(ctx, action); err != nil { + return err + } + } + } + + w.log.Debugf("updating wal to state %q", WalStatusCheckpointed) + walData.WalStatus = WalStatusCheckpointed + walDataj, err := json.Marshal(walData) + if err != nil { + return err + } + if _, err := w.e.AtomicPut(ctx, string(kv.Key), walDataj, kv.ModRevision, nil); err != nil { + return err + } + } + return nil +} + +func (w *WalManager) checkpointAction(ctx context.Context, action *Action) error { + path := w.toStorageDataPath(action.Path) + switch action.ActionType { + case ActionTypePut: + w.log.Debugf("writing file: %q", path) + if err := w.lts.WriteObject(path, bytes.NewReader(action.Data)); err != nil { + return err + } + + case ActionTypeDelete: + w.log.Debugf("deleting file: %q", path) + if err := w.lts.DeleteObject(path); err != nil && err != objectstorage.ErrNotExist { + return err + } + } + + return nil +} + +func (w *WalManager) walCleanerLoop(ctx context.Context) { + for { + w.log.Debugf("walcleaner") + if err := w.walCleaner(ctx); err != nil { + w.log.Errorf("walcleaner error: %v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(2 * time.Second) + } +} + +// walCleaner will clean already checkpointed wals from etcd +// it must always keep at least one wal that is needed for resync operations +// from clients +func (w *WalManager) walCleaner(ctx context.Context) error { + session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := concurrency.NewMutex(session, etcdWalCleanerLockKey) + + if err := m.Lock(ctx); err != nil { + return err + } + defer m.Unlock(ctx) + + resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0) + if err != nil { + return err + } + if len(resp.Kvs) <= w.etcdWalsKeepNum { + return nil + } + removeCount := len(resp.Kvs) - w.etcdWalsKeepNum + + for _, kv := range resp.Kvs { + var walData WalData + if err := json.Unmarshal(kv.Value, &walData); err != nil { + return err + } + if walData.WalStatus != WalStatusCheckpointed { + break + } + + // TODO(sgotti) check that the objectstorage returns the wal actions as checkpointed. + // With eventual consistent object storages like S3 we shouldn't remove a wal + // file from etcd (and so from the cache) until we are sure there're no + // eventual consistency issues. The difficult part is how to check them and be + // sure that no objects with old data will be returned? Is it enough to read + // it back or the result could just be luckily correct but another client may + // arrive to a differnt S3 server that is not yet in sync? + w.log.Infof("removing wal %q from etcd", walData.WalSequence) + if _, err := w.e.AtomicDelete(ctx, string(kv.Key), kv.ModRevision); err != nil { + return err + } + + removeCount-- + if removeCount == 0 { + return nil + } + } + + return nil +} + +func (w *WalManager) compactChangeGroupsLoop(ctx context.Context) { + for { + if err := w.compactChangeGroups(ctx); err != nil { + w.log.Errorf("err: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(1 * time.Second) + } +} + +func (w *WalManager) compactChangeGroups(ctx context.Context) error { + resp, err := w.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey) + if err != nil { + return err + } + + revision := resp.Kvs[0].ModRevision + + // first update minrevision + cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(etcdChangeGroupMinRevisionKey), "=", revision) + then := etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, "") + txn := w.e.Client().Txn(ctx).If(cmp).Then(then) + tresp, err := txn.Commit() + if err != nil { + return etcd.FromEtcdError(err) + } + if !tresp.Succeeded { + return errors.Errorf("failed to update change group min revision key due to concurrent update") + } + + revision = tresp.Header.Revision + + // then remove all the groups keys with modrevision < minrevision + resp, err = w.e.List(ctx, etcdChangeGroupsDir, "", 0) + if err != nil { + return err + } + for _, kv := range resp.Kvs { + if kv.ModRevision < revision-etcdChangeGroupMinRevisionRange { + cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(string(kv.Key)), "=", kv.ModRevision) + then := etcdclientv3.OpDelete(string(kv.Key)) + txn := w.e.Client().Txn(ctx).If(cmp).Then(then) + tresp, err := txn.Commit() + if err != nil { + return etcd.FromEtcdError(err) + } + if !tresp.Succeeded { + w.log.Errorf("failed to update change group min revision key due to concurrent update") + } + } + } + return nil +} + +// etcdPingerLoop periodically updates a key. +// This is used by watchers to inform the client of the current revision +// this is needed since if other users are updating other unwatched keys on +// etcd we won't be notified, not updating the known revisions and thus all the +// walWrites will fails since the provided changegrouptoken will have an old +// revision +// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress??? +func (w *WalManager) etcdPingerLoop(ctx context.Context) { + for { + if err := w.etcdPinger(ctx); err != nil { + w.log.Errorf("err: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(1 * time.Second) + } +} + +func (w *WalManager) etcdPinger(ctx context.Context) error { + if _, err := w.e.Put(ctx, etcdPingKey, []byte{}, nil); err != nil { + return err + } + return nil +} + +func (w *WalManager) InitEtcd(ctx context.Context) error { + writeWal := func(wal *WalFile) error { + w.log.Infof("wal seq: %s", wal.WalSequence) + walFile, err := w.lts.ReadObject(w.storageWalStatusFile(wal.WalSequence) + ".committed") + if err != nil { + return err + } + dec := json.NewDecoder(walFile) + var header *WalHeader + if err = dec.Decode(&header); err != nil && err != io.EOF { + walFile.Close() + return err + } + walFile.Close() + + walData := &WalData{ + WalSequence: wal.WalSequence, + WalDataFileID: header.WalDataFileID, + WalStatus: WalStatusCommitted, + ChangeGroups: header.ChangeGroups, + } + if wal.Checkpointed { + walData.WalStatus = WalStatusCheckpointed + } + walDataj, err := json.Marshal(walData) + if err != nil { + return err + } + + cmp := []etcdclientv3.Cmp{} + then := []etcdclientv3.Op{} + // only add if it doesn't exist + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalKey(wal.WalSequence)), "=", 0)) + then = append(then, etcdclientv3.OpPut(etcdWalKey(wal.WalSequence), string(walDataj))) + txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...) + tresp, err := txn.Commit() + if err != nil { + return etcd.FromEtcdError(err) + } + if !tresp.Succeeded { + return errors.Errorf("failed to sync etcd: wal %q already written", wal.WalSequence) + } + return nil + } + + // Create changegroup min revision if it doesn't exists + cmp := []etcdclientv3.Cmp{} + then := []etcdclientv3.Op{} + + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdChangeGroupMinRevisionKey), "=", 0)) + then = append(then, etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, "")) + txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...) + if _, err := txn.Commit(); err != nil { + return etcd.FromEtcdError(err) + } + + _, err := w.e.Get(ctx, etcdWalsDataKey) + if err != nil && err != etcd.ErrKeyNotFound { + return err + } + if err == nil { + return nil + } + + w.log.Infof("no data found in etcd, initializing") + + // walsdata not found in etcd + + // if there're some wals in the lts this means etcd has been reset. + // So take all the wals in committed or checkpointed state starting from the + // first not checkpointed wal and put them in etcd + lastCommittedStorageWalsRing := ring.New(100) + lastCommittedStorageWalElem := lastCommittedStorageWalsRing + lastCommittedStorageWalSequence := "" + wroteWals := 0 + for wal := range w.ListLtsWals("") { + w.log.Infof("wal: %s", wal) + if wal.Err != nil { + return wal.Err + } + + lastCommittedStorageWalElem.Value = wal + lastCommittedStorageWalElem = lastCommittedStorageWalElem.Next() + lastCommittedStorageWalSequence = wal.WalSequence + if wal.Checkpointed { + continue + } + + if err := writeWal(wal); err != nil { + return err + } + wroteWals++ + } + + // if no wal has been written (because all are checkpointed), write at least + // the ones in the ring + if wroteWals == 0 { + var err error + lastCommittedStorageWalsRing.Do(func(e interface{}) { + if e == nil { + return + } + wal := e.(*WalFile) + err = writeWal(wal) + if err != nil { + return + } + lastCommittedStorageWalSequence = wal.WalSequence + }) + if err != nil { + return err + } + } + + walsData := &WalsData{ + LastCommittedWalSequence: lastCommittedStorageWalSequence, + } + walsDataj, err := json.Marshal(walsData) + if err != nil { + return err + } + + // save walsdata and lastcommittedstoragewalseq only after writing all the + // wals in etcd + // in this way if something fails while adding wals to etcd it'll be retried + // since waldata doesn't exists + cmp = []etcdclientv3.Cmp{} + then = []etcdclientv3.Op{} + + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0)) + then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj))) + then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence)) + txn = w.e.Client().Txn(ctx).If(cmp...).Then(then...) + tresp, err := txn.Commit() + if err != nil { + return etcd.FromEtcdError(err) + } + if !tresp.Succeeded { + return errors.Errorf("failed to sync etcd: waldata already written") + } + + return nil +} + +type AdditionalActionsFunc func(action *Action) ([]*Action, error) + +func NoOpAdditionalActionFunc(action *Action) ([]*Action, error) { + return []*Action{}, nil +} + +type WalManagerConfig struct { + BasePath string + E *etcd.Store + Lts *objectstorage.ObjStorage + AdditionalActionsFunc AdditionalActionsFunc + EtcdWalsKeepNum int +} + +type WalManager struct { + basePath string + log *zap.SugaredLogger + e *etcd.Store + lts *objectstorage.ObjStorage + changes *WalChanges + additionalActionsFunc AdditionalActionsFunc + etcdWalsKeepNum int +} + +func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConfig) (*WalManager, error) { + if conf.EtcdWalsKeepNum == 0 { + conf.EtcdWalsKeepNum = DefaultEtcdWalsKeepNum + } + if conf.EtcdWalsKeepNum < 1 { + return nil, errors.New("etcdWalsKeepNum must be greater than 0") + } + + additionalActionsFunc := conf.AdditionalActionsFunc + if additionalActionsFunc == nil { + additionalActionsFunc = NoOpAdditionalActionFunc + } + + w := &WalManager{ + basePath: conf.BasePath, + log: logger.Sugar(), + e: conf.E, + lts: conf.Lts, + additionalActionsFunc: additionalActionsFunc, + etcdWalsKeepNum: conf.EtcdWalsKeepNum, + } + + // add trailing slash the basepath + if w.basePath != "" && !strings.HasSuffix(w.basePath, "/") { + w.basePath = w.basePath + "/" + } + + return w, nil +} + +func (w *WalManager) Run(ctx context.Context) error { + w.changes = NewWalChanges() + + for { + err := w.InitEtcd(ctx) + if err == nil { + break + } + w.log.Errorf("failed to initialize etcd: %+v", err) + time.Sleep(1 * time.Second) + } + + go w.watcherLoop(ctx) + go w.syncLoop(ctx) + go w.checkpointLoop(ctx) + go w.walCleanerLoop(ctx) + go w.compactChangeGroupsLoop(ctx) + go w.etcdPingerLoop(ctx) + + select { + case <-ctx.Done(): + w.log.Infof("walmanager exiting") + return nil + } +} diff --git a/internal/wal/wal_test.go b/internal/wal/wal_test.go new file mode 100644 index 0000000..928b910 --- /dev/null +++ b/internal/wal/wal_test.go @@ -0,0 +1,275 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + slog "github.com/sorintlab/agola/internal/log" + "github.com/sorintlab/agola/internal/objectstorage" + "github.com/sorintlab/agola/internal/testutil" + + "github.com/google/go-cmp/cmp" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) +var logger = slog.New(level) +var log = logger.Sugar() + +func setupEtcd(t *testing.T, dir string) *testutil.TestEmbeddedEtcd { + tetcd, err := testutil.NewTestEmbeddedEtcd(t, logger, dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tetcd.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tetcd.WaitUp(30 * time.Second); err != nil { + t.Fatalf("error waiting on store up: %v", err) + } + return tetcd +} + +func shutdownEtcd(tetcd *testutil.TestEmbeddedEtcd) { + if tetcd.Etcd != nil { + tetcd.Kill() + } +} + +type noopCheckpointer struct { +} + +func (c *noopCheckpointer) Checkpoint(ctx context.Context, action *Action) error { + return nil +} + +func TestEtcdReset(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + ctx, cancel := context.WithCancel(context.Background()) + + ltsDir, err := ioutil.TempDir(dir, "lts") + + lts, err := objectstorage.NewPosixStorage(ltsDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + walConfig := &WalManagerConfig{ + BasePath: "basepath", + E: tetcd.TestEtcd.Store, + Lts: objectstorage.NewObjStorage(lts, "/"), + EtcdWalsKeepNum: 10, + } + wal, err := NewWalManager(ctx, logger, walConfig) + go wal.Run(ctx) + time.Sleep(1 * time.Second) + + actions := []*Action{ + { + ActionType: ActionTypePut, + Data: []byte("{}"), + }, + } + + expectedObjects := []string{} + for i := 0; i < 20; i++ { + objectPath := fmt.Sprintf("object%02d", i) + expectedObjects = append(expectedObjects, objectPath) + actions[0].Path = objectPath + if _, err := wal.WriteWal(ctx, actions, nil); err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // wait for wal to be committed storage + time.Sleep(5 * time.Second) + + // Reset etcd + shutdownEtcd(tetcd) + tetcd.WaitDown(10 * time.Second) + os.RemoveAll(etcdDir) + if err := tetcd.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer shutdownEtcd(tetcd) + + cancel() + ctx = context.Background() + go wal.Run(ctx) + time.Sleep(5 * time.Second) + + curObjects := []string{} + doneCh := make(chan struct{}) + for object := range wal.List("", "", true, doneCh) { + t.Logf("path: %q", object.Path) + if object.Err != nil { + t.Fatalf("unexpected err: %v", object.Err) + } + curObjects = append(curObjects, object.Path) + } + close(doneCh) + t.Logf("curObjects: %s", curObjects) + + if diff := cmp.Diff(expectedObjects, curObjects); diff != "" { + t.Error(diff) + } +} + +func TestConcurrentUpdate(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + ctx := context.Background() + + ltsDir, err := ioutil.TempDir(dir, "lts") + + lts, err := objectstorage.NewPosixStorage(ltsDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + walConfig := &WalManagerConfig{ + E: tetcd.TestEtcd.Store, + Lts: objectstorage.NewObjStorage(lts, "/"), + EtcdWalsKeepNum: 10, + } + wal, err := NewWalManager(ctx, logger, walConfig) + + actions := []*Action{ + { + ActionType: ActionTypePut, + Path: "/object01", + Data: []byte("{}"), + }, + } + + go wal.Run(ctx) + time.Sleep(1 * time.Second) + + cgNames := []string{"changegroup01", "changegroup02"} + cgt := wal.GetChangeGroupsUpdateToken(cgNames) + + // populate with a wal + cgt, err = wal.WriteWal(ctx, actions, cgt) + if err != nil { + t.Fatalf("err: %v", err) + } + + // this must work successfully + oldcgt := cgt + cgt, err = wal.WriteWal(ctx, actions, cgt) + if err != nil { + t.Fatalf("err: %v", err) + } + + // this must fail since we are using the old cgt + _, err = wal.WriteWal(ctx, actions, oldcgt) + if err != ErrConcurrency { + t.Fatalf("expected err: %v, got %v", ErrConcurrency, err) + } + + oldcgt = cgt + // this must work successfully + cgt, err = wal.WriteWal(ctx, actions, cgt) + if err != nil { + t.Fatalf("err: %v", err) + } + + // this must fail since we are using the old cgt + _, err = wal.WriteWal(ctx, actions, oldcgt) + if err != ErrConcurrency { + t.Fatalf("expected err: %v, got %v", ErrConcurrency, err) + } +} + +func TestWalCleaner(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + ctx := context.Background() + + ltsDir, err := ioutil.TempDir(dir, "lts") + + lts, err := objectstorage.NewPosixStorage(ltsDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + walKeepNum := 10 + walConfig := &WalManagerConfig{ + E: tetcd.TestEtcd.Store, + Lts: objectstorage.NewObjStorage(lts, "/"), + EtcdWalsKeepNum: walKeepNum, + } + wal, err := NewWalManager(ctx, logger, walConfig) + + actions := []*Action{ + { + ActionType: ActionTypePut, + Path: "/object01", + Data: []byte("{}"), + }, + } + + go wal.Run(ctx) + time.Sleep(1 * time.Second) + + for i := 0; i < 20; i++ { + if _, err := wal.WriteWal(ctx, actions, nil); err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // wait for walCleaner to complete + time.Sleep(5 * time.Second) + + walsCount := 0 + for range wal.ListEtcdWals(ctx, 0) { + walsCount++ + } + if walsCount != walKeepNum { + t.Fatalf("expected %d wals in etcd, got %d wals", walKeepNum, walsCount) + } +}