From c44a9b8956ad11279a8b733c7ab73edf9523cf31 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 12 Oct 2023 15:49:29 +0800 Subject: [PATCH 1/7] issue 6663: changes for configurable data path concurrency Signed-off-by: Lyndon-Li --- go.mod | 4 +- go.sum | 18 +++++ pkg/cmd/cli/nodeagent/server.go | 79 ++++++++++++++++++- pkg/controller/data_download_controller.go | 4 +- .../data_download_controller_test.go | 5 +- pkg/controller/data_upload_controller.go | 6 +- pkg/controller/data_upload_controller_test.go | 4 +- .../pod_volume_backup_controller.go | 4 +- .../pod_volume_restore_controller.go | 4 +- pkg/nodeagent/node_agent.go | 43 +++++++++- 10 files changed, 154 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 51cd71913..1d2a088e8 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/vmware-tanzu/velero -go 1.20 +go 1.21 + +toolchain go1.21.3 require ( cloud.google.com/go/storage v1.32.0 diff --git a/go.sum b/go.sum index bc4c0ed7c..b3d6162e3 100644 --- a/go.sum +++ b/go.sum @@ -59,7 +59,9 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1/go.mod h1:uE9zaUfEQT/nbQ github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2 h1:mLY+pNLjCUeKhgnAJWAKhEUQM+RJQo2H1fuGSw1Ky1E= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2/go.mod h1:FbdwsQ2EzwvXxOPcMFYO8ogEc9uMMIj3YkmCdXdAFmk= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.0.0 h1:ECsQtyERDVz3NP3kvDOTLvbQhqWp/x9EsGKtb4ogUr8= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.0.0/go.mod h1:s1tW/At+xHqjNFvWU4G0c0Qv33KOhvbGNj0RCTQDV8s= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.3.0 h1:LcJtQjCXJUm1s7JpUHZvu+bpgURhCatxVNbGADXniX0= 
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.3.0/go.mod h1:+OgGVo0Httq7N5oayfvaLQ/Jq+2gJdqfp++Hyyl7Tws= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 h1:nVocQV40OQne5613EeLayJiRAJuKlBGy+m22qWG+WRg= @@ -114,6 +116,7 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1/go.mod h1:wP83 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5 h1:IEjq88XO4PuBDcvmjQJcQGg+w+UaafSy8G5Kcb5tBhI= +github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5/go.mod h1:exZ0C/1emQJAw5tHOaUDyY1ycttqBAPcxuzf7QbY6ec= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -127,6 +130,7 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVKJUX0= +github.com/alessio/shellescape v1.4.1/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -141,6 +145,7 @@ github.com/aws/aws-sdk-go v1.44.256/go.mod 
h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8 github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -196,6 +201,7 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/danieljoos/wincred v1.2.0 h1:ozqKHaLK0W/ii4KVbbvluM91W2H3Sh0BncbUNPS7jLE= +github.com/danieljoos/wincred v1.2.0/go.mod h1:FzQLLMKBFdvu+osBrnFODiv32YGwCfx0SkRa/eYHgec= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -205,6 +211,7 @@ github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQ github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U= github.com/dimchansky/utfbom v1.1.1/go.mod h1:SxdoEBH5qIqFocHMyGOXVAybYJdr71b1Q/j0mACtrfE= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= +github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go 
v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -242,6 +249,7 @@ github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/frankban/quicktest v1.13.1 h1:xVm/f9seEhZFL9+n5kv5XLrGwy6elc4V9v/XFY2vmd8= +github.com/frankban/quicktest v1.13.1/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= @@ -297,6 +305,7 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -377,6 +386,7 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.3.2 
h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= +github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= @@ -422,6 +432,7 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hanwen/go-fuse/v2 v2.4.0 h1:12OhD7CkXXQdvxG2osIdBQLdXh+nmLXY9unkUIe/xaU= +github.com/hanwen/go-fuse/v2 v2.4.0/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A= @@ -501,12 +512,14 @@ github.com/klauspost/reedsolomon v1.11.8/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kopia/htmluibuild v0.0.1-0.20230917154246-98806054261e h1:XogFUFI4mcT5qyywKiGY5WqLi7l4b/eMi7BlEzgLTd0= +github.com/kopia/htmluibuild v0.0.1-0.20230917154246-98806054261e/go.mod h1:cSImbrlwvv2phvj5RfScL2v08ghX6xli0PcK6f+t8S0= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= 
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -601,6 +614,7 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.1.6 h1:Fx2POJZfKRQcM1pH49qSZiYeu319wji004qX+GDovrU= +github.com/onsi/ginkgo/v2 v2.1.6/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7AG4VIk= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= @@ -664,6 +678,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= 
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -726,6 +741,7 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tg123/go-htpasswd v1.2.1 h1:i4wfsX1KvvkyoMiHZzjS0VzbAPWfxzI8INcZAKtutoU= +github.com/tg123/go-htpasswd v1.2.1/go.mod h1:erHp1B86KXdwQf1X5ZrLb7erXZnWueEQezb2dql4q58= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -749,6 +765,7 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zalando/go-keyring v0.2.3 h1:v9CUu9phlABObO4LPWycf+zwMG7nlbb3t/B5wa97yms= +github.com/zalando/go-keyring v0.2.3/go.mod h1:HL4k+OXQfJUWaMnqyuSOc0drfGPX2b51Du6K+MRgZMk= github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY= github.com/zeebo/assert v1.1.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg= @@ -805,6 +822,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak 
v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 10df06f89..b715e0660 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -19,6 +19,7 @@ package nodeagent import ( "context" "fmt" + "math" "net/http" "os" "strings" @@ -32,6 +33,7 @@ import ( storagev1api "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -53,7 +55,9 @@ import ( "github.com/vmware-tanzu/velero/pkg/cmd" "github.com/vmware-tanzu/velero/pkg/cmd/util/signals" "github.com/vmware-tanzu/velero/pkg/controller" + "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/metrics" + "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" @@ -73,12 +77,14 @@ const ( defaultResourceTimeout = 10 * time.Minute defaultDataMoverPrepareTimeout = 30 * time.Minute + defaultDataPathConcurrentNum = 1 ) type nodeAgentServerConfig struct { metricsAddress string resourceTimeout time.Duration dataMoverPrepareTimeout time.Duration + dataPathConcurrentNum int } func NewServerCommand(f client.Factory) *cobra.Command { @@ -88,6 +94,7 @@ func NewServerCommand(f client.Factory) *cobra.Command { metricsAddress: defaultMetricsAddress, resourceTimeout: defaultResourceTimeout, dataMoverPrepareTimeout: defaultDataMoverPrepareTimeout, + dataPathConcurrentNum: defaultDataPathConcurrentNum, } command := &cobra.Command{ 
@@ -114,6 +121,7 @@ func NewServerCommand(f client.Factory) *cobra.Command { command.Flags().Var(formatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(formatFlag.AllowedValues(), ", "))) command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.") command.Flags().DurationVar(&config.dataMoverPrepareTimeout, "data-mover-prepare-timeout", config.dataMoverPrepareTimeout, "How long to wait for preparing a DataUpload/DataDownload. Default is 30 minutes.") + command.Flags().IntVar(&config.dataPathConcurrentNum, "data-path-concurrent-num", config.dataPathConcurrentNum, "The concurrent number of data path in the current node. Default is 1.") return command } @@ -131,6 +139,7 @@ type nodeAgentServer struct { config nodeAgentServerConfig kubeClient kubernetes.Interface csiSnapshotClient *snapshotv1client.Clientset + dataPathMgr *datapath.Manager } func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) { @@ -217,6 +226,16 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi if err != nil { return nil, err } + + dataPathConcurrentNum := config.dataPathConcurrentNum + if dataPathConcurrentNum <= 0 { + dataPathConcurrentNum = defaultDataPathConcurrentNum + s.logger.Warnf("Data path concurrency number %v is invalid, use the default value %v", config.dataPathConcurrentNum, defaultDataPathConcurrentNum) + } + + dataPathConcurrentNum = s.getDataPathConcurrentNum(dataPathConcurrentNum, s.logger) + s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum) + return s, nil } @@ -261,24 +280,24 @@ func (s *nodeAgentServer) run() { credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} repoEnsurer := 
repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout) - pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), repoEnsurer, + pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger) if err := pvbReconciler.SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup) } - if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { + if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) s.markDataUploadsCancel(dataUploadReconciler) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, 
s.config.dataMoverPrepareTimeout, s.logger, s.metrics) s.markDataDownloadsCancel(dataDownloadReconciler) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") @@ -476,3 +495,55 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) { s.logger.WithField("podvolumerestore", pvr.GetName()).Warn(pvr.Status.Message) } } + +func (s *nodeAgentServer) getDataPathConcurrentNum(globalNum int, logger logrus.FieldLogger) int { + configs, err := nodeagent.GetConfigs(s.ctx, s.namespace, s.kubeClient.CoreV1()) + if err != nil { + logger.WithError(err).Warn("Failed to get node agent configs") + } + + if configs == nil || configs.DataPathConcurrency == nil { + logger.Infof("Node agent configs are not found, use the global number %v", globalNum) + return globalNum + } + + curNode, err := s.kubeClient.CoreV1().Nodes().Get(s.ctx, s.nodeName, metav1.GetOptions{}) + if err != nil { + logger.WithError(err).Warnf("Failed to get node info for %s, use the global number %v", s.nodeName, globalNum) + return globalNum + } + + selectors := map[labels.Selector]int{} + for rule, number := range configs.DataPathConcurrency.ConfigRules { + selector, err := labels.Parse(rule) + if err != nil { + logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule) + continue + } + + if number <= 0 { + logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule, number) + continue + } + + selectors[selector] = number + } + + concurrentNum := math.MaxInt32 + for selector, number := range selectors { + if selector.Matches(labels.Set(curNode.GetLabels())) { + if concurrentNum > number { + concurrentNum = number + } + } + } + + if concurrentNum == math.MaxInt32 { + logger.Infof("Per node number for node %s is not found, use the global number %v", s.nodeName, globalNum) + concurrentNum = globalNum + } else { + logger.Infof("Use the per node 
number %v over global number %v for node %s", concurrentNum, globalNum, s.nodeName) + } + + return concurrentNum +} diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index cc60cacef..d67ab590c 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -69,7 +69,7 @@ type DataDownloadReconciler struct { metrics *metrics.ServerMetrics } -func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, +func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ client: client, @@ -81,7 +81,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter nodeName: nodeName, repositoryEnsurer: repoEnsurer, restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), - dataPathMgr: datapath.NewManager(1), + dataPathMgr: dataPathMgr, preparingTimeout: preparingTimeout, metrics: metrics, } diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 46162f0cb..de9fa7516 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -142,7 +142,10 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... 
if err != nil { return nil, err } - return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + + dataPathMgr := datapath.NewManager(1) + + return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 8bc650f5f..6c4ce1847 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -79,8 +79,8 @@ type DataUploadReconciler struct { metrics *metrics.ServerMetrics } -func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, - csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, +func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, + dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, @@ -93,7 +93,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa logger: log, repoEnsurer: repoEnsurer, snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)}, - dataPathMgr: datapath.NewManager(1), + dataPathMgr: dataPathMgr, preparingTimeout: 
preparingTimeout, metrics: metrics, } diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 34bc4a6aa..b61cd07b3 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -162,6 +162,8 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci Spec: appsv1.DaemonSetSpec{}, } + dataPathMgr := datapath.NewManager(1) + now, err := time.Parse(time.RFC1123, time.RFC1123) if err != nil { return nil, err @@ -217,7 +219,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci if err != nil { return nil, err } - return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil, + return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index f074e0cc2..6a66f98e1 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -47,7 +47,7 @@ import ( const pVBRRequestor string = "pod-volume-backup-restore" // NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance -func NewPodVolumeBackupReconciler(client client.Client, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, +func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, scheme *runtime.Scheme, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler { return &PodVolumeBackupReconciler{ Client: client, @@ -59,7 +59,7 @@ func 
NewPodVolumeBackupReconciler(client client.Client, ensurer *repository.Ensu clock: &clocks.RealClock{}, scheme: scheme, metrics: metrics, - dataPathMgr: datapath.NewManager(1), + dataPathMgr: dataPathMgr, } } diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index d48925710..34d6c6551 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -50,7 +50,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) -func NewPodVolumeRestoreReconciler(client client.Client, ensurer *repository.Ensurer, +func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, logger logrus.FieldLogger) *PodVolumeRestoreReconciler { return &PodVolumeRestoreReconciler{ Client: client, @@ -59,7 +59,7 @@ func NewPodVolumeRestoreReconciler(client client.Client, ensurer *repository.Ens credentialGetter: credentialGetter, fileSystem: filesystem.NewFileSystem(), clock: &clocks.RealClock{}, - dataPathMgr: datapath.NewManager(1), + dataPathMgr: dataPathMgr, } } diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 83e76d2a4..04e9babca 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -18,6 +18,7 @@ package nodeagent import ( "context" + "encoding/json" "fmt" "github.com/pkg/errors" @@ -33,13 +34,25 @@ import ( const ( // daemonSet is the name of the Velero node agent daemonset. 
- daemonSet = "node-agent" + daemonSet = "node-agent" + configName = "node-agent-configs" + dataPathConConfigName = "data-path-concurrency" ) var ( ErrDaemonSetNotFound = errors.New("daemonset not found") ) +type DataPathConcurrency struct { + // ConfigRules specifies the concurrency number to nodes matched by rules + ConfigRules map[string]int `json:"configRules"` +} + +type Configs struct { + // DataPathConcurrency is the config for data path concurrency per node. + DataPathConcurrency *DataPathConcurrency +} + // IsRunning checks if the node agent daemonset is running properly. If not, return the error found func IsRunning(ctx context.Context, kubeClient kubernetes.Interface, namespace string) error { if _, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, daemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) { @@ -83,3 +96,31 @@ func GetPodSpec(ctx context.Context, kubeClient kubernetes.Interface, namespace return &ds.Spec.Template.Spec, nil } + +func GetConfigs(ctx context.Context, namespace string, cmClient corev1client.ConfigMapsGetter) (*Configs, error) { + cm, err := cmClient.ConfigMaps(namespace).Get(ctx, configName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } else { + return nil, errors.Wrapf(err, "error to get node agent configs %s", configName) + } + } + + if cm.Data == nil { + return nil, errors.Errorf("data is not available in config map %s", configName) + } + + jsonBytes, exist := cm.Data[dataPathConConfigName] + if !exist { + return nil, nil + } + + concurrencyConfigs := DataPathConcurrency{} + err = json.Unmarshal([]byte(jsonBytes), &concurrencyConfigs) + if err != nil { + return nil, errors.Wrapf(err, "error to unmarshall data path concurrency configs from %s", configName) + } + + return &Configs{DataPathConcurrency: &concurrencyConfigs}, nil +} From 04a9851ee975f6074abedb9890750b7cf724f499 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Mon, 30 Oct 2023 11:41:42 +0800 Subject: [PATCH 
2/7] configurable data path concurrency: all in cm Signed-off-by: Lyndon-Li --- changelogs/unreleased/7059-Lyndon-Li | 1 + pkg/cmd/cli/nodeagent/server.go | 59 +++++++++++++--------------- pkg/nodeagent/node_agent.go | 23 ++++++++--- 3 files changed, 45 insertions(+), 38 deletions(-) create mode 100644 changelogs/unreleased/7059-Lyndon-Li diff --git a/changelogs/unreleased/7059-Lyndon-Li b/changelogs/unreleased/7059-Lyndon-Li new file mode 100644 index 000000000..77b3a1765 --- /dev/null +++ b/changelogs/unreleased/7059-Lyndon-Li @@ -0,0 +1 @@ +Add the implementation for design #6950, configurable data path concurrency \ No newline at end of file diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index b02d1ec42..1e320a4a3 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -84,7 +84,6 @@ type nodeAgentServerConfig struct { metricsAddress string resourceTimeout time.Duration dataMoverPrepareTimeout time.Duration - dataPathConcurrentNum int } func NewServerCommand(f client.Factory) *cobra.Command { @@ -94,7 +93,6 @@ func NewServerCommand(f client.Factory) *cobra.Command { metricsAddress: defaultMetricsAddress, resourceTimeout: defaultResourceTimeout, dataMoverPrepareTimeout: defaultDataMoverPrepareTimeout, - dataPathConcurrentNum: defaultDataPathConcurrentNum, } command := &cobra.Command{ @@ -122,7 +120,6 @@ func NewServerCommand(f client.Factory) *cobra.Command { command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.") command.Flags().DurationVar(&config.dataMoverPrepareTimeout, "data-mover-prepare-timeout", config.dataMoverPrepareTimeout, "How long to wait for preparing a DataUpload/DataDownload. 
Default is 30 minutes.") command.Flags().StringVar(&config.metricsAddress, "metrics-address", config.metricsAddress, "The address to expose prometheus metrics") - command.Flags().IntVar(&config.dataPathConcurrentNum, "data-path-concurrent-num", config.dataPathConcurrentNum, "The concurrent number of data path in the current node. Default is 1.") return command } @@ -229,13 +226,7 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi return nil, err } - dataPathConcurrentNum := config.dataPathConcurrentNum - if dataPathConcurrentNum <= 0 { - dataPathConcurrentNum = defaultDataPathConcurrentNum - s.logger.Warnf("Data path concurrency number %v is invalid, use the default value %v", config.dataPathConcurrentNum, defaultDataPathConcurrentNum) - } - - dataPathConcurrentNum = s.getDataPathConcurrentNum(dataPathConcurrentNum, s.logger) + dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum, s.logger) s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum) return s, nil @@ -498,15 +489,23 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) { } } -func (s *nodeAgentServer) getDataPathConcurrentNum(globalNum int, logger logrus.FieldLogger) int { +func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int, logger logrus.FieldLogger) int { configs, err := nodeagent.GetConfigs(s.ctx, s.namespace, s.kubeClient.CoreV1()) if err != nil { logger.WithError(err).Warn("Failed to get node agent configs") + return defaultNum } if configs == nil || configs.DataPathConcurrency == nil { - logger.Infof("Node agent configs are not found, use the global number %v", globalNum) - return globalNum + logger.Infof("Node agent configs are not found, use the default number %v", defaultNum) + return defaultNum + } + + globalNum := configs.DataPathConcurrency.GlobalConfig + + if globalNum <= 0 { + logger.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum) + globalNum = 
defaultNum } curNode, err := s.kubeClient.CoreV1().Nodes().Get(s.ctx, s.nodeName, metav1.GetOptions{}) @@ -515,27 +514,23 @@ func (s *nodeAgentServer) getDataPathConcurrentNum(globalNum int, logger logrus. return globalNum } - selectors := map[labels.Selector]int{} - for rule, number := range configs.DataPathConcurrency.ConfigRules { - selector, err := labels.Parse(rule) - if err != nil { - logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule) - continue - } - - if number <= 0 { - logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule, number) - continue - } - - selectors[selector] = number - } - concurrentNum := math.MaxInt32 - for selector, number := range selectors { + + for _, rule := range configs.DataPathConcurrency.PerNodeConfig { + selector, err := metav1.LabelSelectorAsSelector(&rule.NodeSelector) + if err != nil { + logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule.NodeSelector.String()) + continue + } + + if rule.Number <= 0 { + logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule.NodeSelector.String(), rule.Number) + continue + } + if selector.Matches(labels.Set(curNode.GetLabels())) { - if concurrentNum > number { - concurrentNum = number + if concurrentNum > rule.Number { + concurrentNum = rule.Number } } } diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 04e9babca..f8978500a 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -44,8 +44,19 @@ var ( ) type DataPathConcurrency struct { - // ConfigRules specifies the concurrency number to nodes matched by rules - ConfigRules map[string]int `json:"configRules"` + // GlobalConfig specifies the concurrency number to all nodes for which per-node config is not specified + GlobalConfig int `json:"globalConfig,omitempty"` + + // PerNodeConfig specifies the concurrency number to nodes matched by rules + PerNodeConfig 
[]RuledConfigs `json:"perNodeConfig,omitempty"` +} + +type RuledConfigs struct { + // NodeSelector specifies the label selector to match nodes + NodeSelector metav1.LabelSelector `json:"nodeSelector"` + + // Number specifies the number value associated to the matched nodes + Number int `json:"number"` } type Configs struct { @@ -111,16 +122,16 @@ func GetConfigs(ctx context.Context, namespace string, cmClient corev1client.Con return nil, errors.Errorf("data is not available in config map %s", configName) } - jsonBytes, exist := cm.Data[dataPathConConfigName] + jsonString, exist := cm.Data[dataPathConConfigName] if !exist { return nil, nil } - concurrencyConfigs := DataPathConcurrency{} - err = json.Unmarshal([]byte(jsonBytes), &concurrencyConfigs) + concurrencyConfigs := &DataPathConcurrency{} + err = json.Unmarshal([]byte(jsonString), concurrencyConfigs) if err != nil { return nil, errors.Wrapf(err, "error to unmarshall data path concurrency configs from %s", configName) } - return &Configs{DataPathConcurrency: &concurrencyConfigs}, nil + return &Configs{DataPathConcurrency: concurrencyConfigs}, nil } From 68579448d64cb564ecfecf95d974a9f1ac331c99 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Mon, 6 Nov 2023 18:36:57 +0800 Subject: [PATCH 3/7] configurable data path concurrency: UT Signed-off-by: Lyndon-Li --- pkg/builder/node_builder.go | 5 + pkg/builder/pod_builder.go | 5 + pkg/cmd/cli/nodeagent/server.go | 28 ++- pkg/cmd/cli/nodeagent/server_test.go | 262 +++++++++++++++++++++ pkg/nodeagent/node_agent_test.go | 332 +++++++++++++++++++++++++++ pkg/test/test_logger.go | 2 +- 6 files changed, 622 insertions(+), 12 deletions(-) create mode 100644 pkg/nodeagent/node_agent_test.go diff --git a/pkg/builder/node_builder.go b/pkg/builder/node_builder.go index 52e2c1e2e..d3b6f51ec 100644 --- a/pkg/builder/node_builder.go +++ b/pkg/builder/node_builder.go @@ -41,6 +41,11 @@ func ForNode(name string) *NodeBuilder { } } +func (b *NodeBuilder) Labels(labels 
map[string]string) *NodeBuilder { + b.object.Labels = labels + return b +} + // Result returns the built Node. func (b *NodeBuilder) Result() *corev1api.Node { return b.object diff --git a/pkg/builder/pod_builder.go b/pkg/builder/pod_builder.go index 8931c14b9..886d7a411 100644 --- a/pkg/builder/pod_builder.go +++ b/pkg/builder/pod_builder.go @@ -101,3 +101,8 @@ func (b *PodBuilder) ContainerStatuses(containerStatuses ...*corev1api.Container } return b } + +func (b *PodBuilder) Phase(phase corev1api.PodPhase) *PodBuilder { + b.object.Status.Phase = phase + return b +} diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 2a1e0470e..133992006 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -226,7 +226,7 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi return nil, err } - dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum, s.logger) + dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum) s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum) return s, nil @@ -489,28 +489,34 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) { } } -func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int, logger logrus.FieldLogger) int { - configs, err := nodeagent.GetConfigs(s.ctx, s.namespace, s.kubeClient) +var getConfigsFunc = nodeagent.GetConfigs + +func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int { + configs, err := getConfigsFunc(s.ctx, s.namespace, s.kubeClient) if err != nil { - logger.WithError(err).Warn("Failed to get node agent configs") + s.logger.WithError(err).Warn("Failed to get node agent configs") return defaultNum } if configs == nil || configs.DataPathConcurrency == nil { - logger.Infof("Node agent configs are not found, use the default number %v", defaultNum) + s.logger.Infof("Node agent configs are not found, use the default number %v", 
defaultNum) return defaultNum } globalNum := configs.DataPathConcurrency.GlobalConfig if globalNum <= 0 { - logger.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum) + s.logger.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum) globalNum = defaultNum } + if len(configs.DataPathConcurrency.PerNodeConfig) == 0 { + return globalNum + } + curNode, err := s.kubeClient.CoreV1().Nodes().Get(s.ctx, s.nodeName, metav1.GetOptions{}) if err != nil { - logger.WithError(err).Warnf("Failed to get node info for %s, use the global number %v", s.nodeName, globalNum) + s.logger.WithError(err).Warnf("Failed to get node info for %s, use the global number %v", s.nodeName, globalNum) return globalNum } @@ -519,12 +525,12 @@ func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int, logger logrus for _, rule := range configs.DataPathConcurrency.PerNodeConfig { selector, err := metav1.LabelSelectorAsSelector(&rule.NodeSelector) if err != nil { - logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule.NodeSelector.String()) + s.logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule.NodeSelector.String()) continue } if rule.Number <= 0 { - logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule.NodeSelector.String(), rule.Number) + s.logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule.NodeSelector.String(), rule.Number) continue } @@ -536,10 +542,10 @@ func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int, logger logrus } if concurrentNum == math.MaxInt32 { - logger.Infof("Per node number for node %s is not found, use the global number %v", s.nodeName, globalNum) + s.logger.Infof("Per node number for node %s is not found, use the global number %v", s.nodeName, globalNum) concurrentNum = globalNum } else { - logger.Infof("Use the per node number %v over global number %v 
for node %s", concurrentNum, globalNum, s.nodeName) + s.logger.Infof("Use the per node number %v over global number %v for node %s", concurrentNum, globalNum, s.nodeName) } return concurrentNum diff --git a/pkg/cmd/cli/nodeagent/server_test.go b/pkg/cmd/cli/nodeagent/server_test.go index d66fc08eb..f3115923b 100644 --- a/pkg/cmd/cli/nodeagent/server_test.go +++ b/pkg/cmd/cli/nodeagent/server_test.go @@ -17,16 +17,22 @@ package nodeagent import ( "context" + "fmt" "os" "path/filepath" + "strings" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/nodeagent" testutil "github.com/vmware-tanzu/velero/pkg/test" ) @@ -107,3 +113,259 @@ func Test_validatePodVolumesHostPath(t *testing.T) { }) } } + +func Test_getDataPathConcurrentNum(t *testing.T) { + defaultNum := 100001 + globalNum := 6 + nodeName := "node-agent-node" + node1 := builder.ForNode("node-agent-node").Result() + node2 := builder.ForNode("node-agent-node").Labels(map[string]string{ + "host-name": "node-1", + "xxxx": "yyyyy", + }).Result() + + invalidLabelSelector := metav1.LabelSelector{ + MatchLabels: map[string]string{ + "inva/lid": "inva/lid", + }, + } + validLabelSelector1 := metav1.LabelSelector{ + MatchLabels: map[string]string{ + "host-name": "node-1", + }, + } + validLabelSelector2 := metav1.LabelSelector{ + MatchLabels: map[string]string{ + "xxxx": "yyyyy", + }, + } + + tests := []struct { + name string + getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) + setKubeClient bool + kubeClientObj []runtime.Object + expectNum int + expectLog string + }{ + { + name: "failed to get configs", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return nil, 
errors.New("fake-get-error") + }, + expectLog: "Failed to get node agent configs", + expectNum: defaultNum, + }, + { + name: "configs cm not found", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return nil, nil + }, + expectLog: fmt.Sprintf("Node agent configs are not found, use the default number %v", defaultNum), + expectNum: defaultNum, + }, + { + name: "configs cm's data path concurrency is nil", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{}, nil + }, + expectLog: fmt.Sprintf("Node agent configs are not found, use the default number %v", defaultNum), + expectNum: defaultNum, + }, + { + name: "global number is invalid", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: -1, + }, + }, nil + }, + expectLog: fmt.Sprintf("Global number %v is invalid, use the default value %v", -1, defaultNum), + expectNum: defaultNum, + }, + { + name: "global number is valid", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + }, + }, nil + }, + expectNum: globalNum, + }, + { + name: "node is not found", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + Number: 100, + }, + }, + }, + }, nil + }, + setKubeClient: true, + expectLog: fmt.Sprintf("Failed to get node info for %s, use the global number %v", nodeName, globalNum), + expectNum: globalNum, + }, + { + name: "failed to get selector", + getFunc: func(context.Context, string, kubernetes.Interface) 
(*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: invalidLabelSelector, + Number: 100, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node1}, + expectLog: fmt.Sprintf("Failed to parse rule with label selector %s, skip it", invalidLabelSelector.String()), + expectNum: globalNum, + }, + { + name: "rule number is invalid", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: -1, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node1}, + expectLog: fmt.Sprintf("Rule with label selector %s is with an invalid number %v, skip it", validLabelSelector1.String(), -1), + expectNum: globalNum, + }, + { + name: "label doesn't match", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: -1, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node1}, + expectLog: fmt.Sprintf("Per node number for node %s is not found, use the global number %v", nodeName, globalNum), + expectNum: globalNum, + }, + { + name: "match one rule", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 66, + }, + }, + }, + }, 
nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node2}, + expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 66, globalNum, nodeName), + expectNum: 66, + }, + { + name: "match multiple rules", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 66, + }, + { + NodeSelector: validLabelSelector2, + Number: 36, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node2}, + expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 36, globalNum, nodeName), + expectNum: 36, + }, + { + name: "match multiple rules 2", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 36, + }, + { + NodeSelector: validLabelSelector2, + Number: 66, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node2}, + expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 36, globalNum, nodeName), + expectNum: 36, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
+ + logBuffer := "" + + s := &nodeAgentServer{ + nodeName: nodeName, + logger: testutil.NewSingleLogger(&logBuffer), + } + + if test.setKubeClient { + s.kubeClient = fakeKubeClient + } + + getConfigsFunc = test.getFunc + + num := s.getDataPathConcurrentNum(defaultNum) + assert.Equal(t, test.expectNum, num) + if test.expectLog == "" { + assert.Equal(t, "", logBuffer) + } else { + assert.True(t, strings.Contains(logBuffer, test.expectLog)) + } + }) + } +} diff --git a/pkg/nodeagent/node_agent_test.go b/pkg/nodeagent/node_agent_test.go new file mode 100644 index 000000000..9ef8c00b1 --- /dev/null +++ b/pkg/nodeagent/node_agent_test.go @@ -0,0 +1,332 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nodeagent + +import ( + "context" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + clientTesting "k8s.io/client-go/testing" + clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/vmware-tanzu/velero/pkg/builder" +) + +type reactor struct { + verb string + resource string + reactorFunc clientTesting.ReactionFunc +} + +func TestIsRunning(t *testing.T) { + daemonSet := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + } + + tests := []struct { + name string + kubeClientObj []runtime.Object + namespace string + kubeReactors []reactor + expectErr string + }{ + { + name: "ds is not found", + namespace: "fake-ns", + expectErr: "daemonset not found", + }, + { + name: "ds get error", + namespace: "fake-ns", + kubeReactors: []reactor{ + { + verb: "get", + resource: "daemonsets", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-get-error") + }, + }, + }, + expectErr: "fake-get-error", + }, + { + name: "succeed", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + daemonSet, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
+ + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + err := IsRunning(context.TODO(), fakeKubeClient, test.namespace) + if test.expectErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} + +func TestIsRunningInNode(t *testing.T) { + scheme := runtime.NewScheme() + corev1.AddToScheme(scheme) + + nonNodeAgentPod := builder.ForPod("fake-ns", "fake-pod").Result() + nodeAgentPodNotRunning := builder.ForPod("fake-ns", "fake-pod").Labels(map[string]string{"name": "node-agent"}).Result() + nodeAgentPodRunning1 := builder.ForPod("fake-ns", "fake-pod-1").Labels(map[string]string{"name": "node-agent"}).Phase(corev1.PodRunning).Result() + nodeAgentPodRunning2 := builder.ForPod("fake-ns", "fake-pod-2").Labels(map[string]string{"name": "node-agent"}).Phase(corev1.PodRunning).Result() + nodeAgentPodRunning3 := builder.ForPod("fake-ns", "fake-pod-3"). + Labels(map[string]string{"name": "node-agent"}). + Phase(corev1.PodRunning). + NodeName("fake-node"). 
+ Result() + + tests := []struct { + name string + kubeClientObj []runtime.Object + nodeName string + expectErr string + }{ + { + name: "node name is empty", + expectErr: "node name is empty", + }, + { + name: "ds pod not found", + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + nonNodeAgentPod, + }, + expectErr: "daemonset pod not found in running state in node fake-node", + }, + { + name: "ds po are not all running", + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + nodeAgentPodNotRunning, + nodeAgentPodRunning1, + }, + expectErr: "daemonset pod not found in running state in node fake-node", + }, + { + name: "ds pods wrong node name", + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + nodeAgentPodNotRunning, + nodeAgentPodRunning1, + nodeAgentPodRunning2, + }, + expectErr: "daemonset pod not found in running state in node fake-node", + }, + { + name: "succeed", + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + nodeAgentPodNotRunning, + nodeAgentPodRunning1, + nodeAgentPodRunning2, + nodeAgentPodRunning3, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClientBuilder := clientFake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(scheme) + + fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + + err := IsRunningInNode(context.TODO(), "", test.nodeName, fakeClient) + if test.expectErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} + +func TestGetPodSpec(t *testing.T) { + podSpec := corev1.PodSpec{ + NodeName: "fake-node", + } + + daemonSet := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + Spec: appsv1.DaemonSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: podSpec, + }, + }, + } + + tests := []struct { + name string + kubeClientObj []runtime.Object + 
namespace string + expectErr string + expectSpec corev1.PodSpec + }{ + { + name: "ds is not found", + namespace: "fake-ns", + expectErr: "error to get node-agent daemonset: daemonsets.apps \"node-agent\" not found", + }, + { + name: "succeed", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + daemonSet, + }, + expectSpec: podSpec, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + spec, err := GetPodSpec(context.TODO(), fakeKubeClient, test.namespace) + if test.expectErr == "" { + assert.NoError(t, err) + assert.Equal(t, *spec, test.expectSpec) + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} + +func TestGetConfigs(t *testing.T) { + cm := builder.ForConfigMap("fake-ns", "node-agent-configs").Result() + cmWithInvalidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "fake-value").Result() + cmWithInvalidDataFormat := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("data-path-concurrency", "wrong").Result() + cmWithValidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("data-path-concurrency", "{\"globalConfig\": 5}").Result() + + tests := []struct { + name string + kubeClientObj []runtime.Object + namespace string + kubeReactors []reactor + expectResult *DataPathConcurrency + expectErr string + }{ + { + name: "cm is not found", + namespace: "fake-ns", + }, + { + name: "cm get error", + namespace: "fake-ns", + kubeReactors: []reactor{ + { + verb: "get", + resource: "configmaps", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-get-error") + }, + }, + }, + expectErr: "error to get node agent configs node-agent-configs: fake-get-error", + }, + { + name: "cm's data is nil", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + cm, + }, + expectErr: "data is not available in config map 
node-agent-configs", + }, + { + name: "cm's data is not found", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + cmWithInvalidData, + }, + }, + { + name: "cm's data is with invalid format", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + cmWithInvalidDataFormat, + }, + expectErr: "error to unmarshall data path concurrency configs from node-agent-configs: invalid character 'w' looking for beginning of value", + }, + { + name: "success", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + cmWithValidData, + }, + expectResult: &DataPathConcurrency{ + GlobalConfig: 5, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + result, err := GetConfigs(context.TODO(), test.namespace, fakeKubeClient) + if test.expectErr == "" { + assert.NoError(t, err) + + if test.expectResult == nil { + assert.Nil(t, result) + } else { + assert.Equal(t, *test.expectResult, *result.DataPathConcurrency) + } + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} diff --git a/pkg/test/test_logger.go b/pkg/test/test_logger.go index d8095a79d..b890fd5da 100644 --- a/pkg/test/test_logger.go +++ b/pkg/test/test_logger.go @@ -40,7 +40,7 @@ type singleLogRecorder struct { } func (s *singleLogRecorder) Write(p []byte) (n int, err error) { - *s.buffer = string(p[:]) + *s.buffer = *s.buffer + string(p[:]) return len(p), nil } From 1fb0529d9882f89e74bad8c013ccb59669b77f3e Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Mon, 6 Nov 2023 20:32:45 +0800 Subject: [PATCH 4/7] Add DataUpload Result and CSI VolumeSnapshot check for restore PV. 
Signed-off-by: Xun Jiang --- changelogs/unreleased/7061-blackpiglet | 1 + pkg/builder/volume_snapshot_builder.go | 5 + pkg/controller/restore_controller.go | 6 + pkg/controller/restore_controller_test.go | 2 + pkg/restore/request.go | 2 + pkg/restore/restore.go | 94 +++++++- pkg/restore/restore_test.go | 262 +++++++++++++++++++++- pkg/test/resources.go | 11 + 8 files changed, 375 insertions(+), 8 deletions(-) create mode 100644 changelogs/unreleased/7061-blackpiglet diff --git a/changelogs/unreleased/7061-blackpiglet b/changelogs/unreleased/7061-blackpiglet new file mode 100644 index 000000000..ac965ed13 --- /dev/null +++ b/changelogs/unreleased/7061-blackpiglet @@ -0,0 +1 @@ +Add DataUpload Result and CSI VolumeSnapshot check for restore PV. \ No newline at end of file diff --git a/pkg/builder/volume_snapshot_builder.go b/pkg/builder/volume_snapshot_builder.go index 19815c0f0..bbaedd16e 100644 --- a/pkg/builder/volume_snapshot_builder.go +++ b/pkg/builder/volume_snapshot_builder.go @@ -67,3 +67,8 @@ func (v *VolumeSnapshotBuilder) BoundVolumeSnapshotContentName(vscName string) * v.object.Status.BoundVolumeSnapshotContentName = &vscName return v } + +func (v *VolumeSnapshotBuilder) SourcePVC(name string) *VolumeSnapshotBuilder { + v.object.Spec.Source.PersistentVolumeClaimName = &name + return v +} diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index 5d6ed505e..f6b9b39d9 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -515,6 +515,11 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu return errors.Wrap(err, "error fetching volume snapshots metadata") } + csiVolumeSnapshots, err := backupStore.GetCSIVolumeSnapshots(restore.Spec.BackupName) + if err != nil { + return errors.Wrap(err, "fail to fetch CSI VolumeSnapshots metadata") + } + restoreLog.Info("starting restore") var podVolumeBackups []*api.PodVolumeBackup @@ -531,6 +536,7 @@ func (r 
*restoreReconciler) runValidatedRestore(restore *api.Restore, info backu BackupReader: backupFile, ResourceModifiers: resourceModifiers, DisableInformerCache: r.disableInformerCache, + CSIVolumeSnapshots: csiVolumeSnapshots, } restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, pluginManager) diff --git a/pkg/controller/restore_controller_test.go b/pkg/controller/restore_controller_test.go index b7f6fbd25..9437f1d1c 100644 --- a/pkg/controller/restore_controller_test.go +++ b/pkg/controller/restore_controller_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -471,6 +472,7 @@ func TestRestoreReconcile(t *testing.T) { } if test.expectedRestorerCall != nil { backupStore.On("GetBackupContents", test.backup.Name).Return(io.NopCloser(bytes.NewReader([]byte("hello world"))), nil) + backupStore.On("GetCSIVolumeSnapshots", test.backup.Name).Return([]*snapshotv1api.VolumeSnapshot{}, nil) restorer.On("RestoreWithResolvers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(warnings, errors) diff --git a/pkg/restore/request.go b/pkg/restore/request.go index dcc2ef3d6..2a267a5ff 100644 --- a/pkg/restore/request.go +++ b/pkg/restore/request.go @@ -21,6 +21,7 @@ import ( "io" "sort" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime" @@ -60,6 +61,7 @@ type Request struct { itemOperationsList *[]*itemoperation.RestoreOperation ResourceModifiers *resourcemodifiers.ResourceModifiers DisableInformerCache bool + CSIVolumeSnapshots []*snapshotv1api.VolumeSnapshot } type restoredItemStatus struct { diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 
dee68d33f..51d29957b 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -30,6 +30,7 @@ import ( "time" "github.com/google/uuid" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" @@ -299,6 +300,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers( pvsToProvision: sets.NewString(), pvRestorer: pvRestorer, volumeSnapshots: req.VolumeSnapshots, + csiVolumeSnapshots: req.CSIVolumeSnapshots, podVolumeBackups: req.PodVolumeBackups, resourceTerminatingTimeout: kr.resourceTerminatingTimeout, resourceTimeout: kr.resourceTimeout, @@ -348,6 +350,7 @@ type restoreContext struct { pvsToProvision sets.String pvRestorer PVRestorer volumeSnapshots []*volume.Snapshot + csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot podVolumeBackups []*velerov1api.PodVolumeBackup resourceTerminatingTimeout time.Duration resourceTimeout time.Duration @@ -1288,7 +1291,35 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } case hasPodVolumeBackup(obj, ctx): - ctx.log.Infof("Dynamically re-provisioning persistent volume because it has a pod volume backup to be restored.") + ctx.log.WithFields(logrus.Fields{ + "namespace": obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + }).Infof("Dynamically re-provisioning persistent volume because it has a pod volume backup to be restored.") + ctx.pvsToProvision.Insert(name) + + // Return early because we don't want to restore the PV itself, we + // want to dynamically re-provision it. 
+ return warnings, errs, itemExists + + case hasCSIVolumeSnapshot(ctx, obj): + ctx.log.WithFields(logrus.Fields{ + "namespace": obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + }).Infof("Dynamically re-provisioning persistent volume because it has a related CSI VolumeSnapshot.") + ctx.pvsToProvision.Insert(name) + + // Return early because we don't want to restore the PV itself, we + // want to dynamically re-provision it. + return warnings, errs, itemExists + + case hasSnapshotDataUpload(ctx, obj): + ctx.log.WithFields(logrus.Fields{ + "namespace": obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + }).Infof("Dynamically re-provisioning persistent volume because it has a related snapshot DataUpload.") ctx.pvsToProvision.Insert(name) // Return early because we don't want to restore the PV itself, we @@ -1296,7 +1327,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs, itemExists case hasDeleteReclaimPolicy(obj.Object): - ctx.log.Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.") + ctx.log.WithFields(logrus.Fields{ + "namespace": obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + }).Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.") ctx.pvsToProvision.Insert(name) // Return early because we don't want to restore the PV itself, we @@ -1304,7 +1339,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs, itemExists default: - ctx.log.Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.") + ctx.log.WithFields(logrus.Fields{ + "namespace": obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + 
}).Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.") // Check to see if the claimRef.namespace field needs to be remapped, and do so if necessary. _, err = remapClaimRefNS(ctx, obj) @@ -1937,6 +1976,65 @@ func hasSnapshot(pvName string, snapshots []*volume.Snapshot) bool { return false } +// hasCSIVolumeSnapshot reports whether any CSI VolumeSnapshot in the restore context was taken from the PVC this PV is bound to. +func hasCSIVolumeSnapshot(ctx *restoreContext, unstructuredPV *unstructured.Unstructured) bool { + pv := new(v1.PersistentVolume) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredPV.Object, pv); err != nil { + ctx.log.WithError(err).Warnf("Unable to convert PV from unstructured to structured") + return false + } + + // A PV without a claimRef is not bound to any PVC, so no VolumeSnapshot can match it. + if pv.Spec.ClaimRef == nil { + return false + } + + for _, vs := range ctx.csiVolumeSnapshots { + // Source.PersistentVolumeClaimName is optional; skip snapshots that do not name a source PVC. + if vs.Spec.Source.PersistentVolumeClaimName == nil { + continue + } + if pv.Spec.ClaimRef.Name == *vs.Spec.Source.PersistentVolumeClaimName && + pv.Spec.ClaimRef.Namespace == vs.Namespace { + return true + } + } + return false +} + +func hasSnapshotDataUpload(ctx *restoreContext, unstructuredPV *unstructured.Unstructured) bool { + pv := new(v1.PersistentVolume) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredPV.Object, pv); err != nil { + ctx.log.WithError(err).Warnf("Unable to convert PV from unstructured to structured") + return false + } + + if pv.Spec.ClaimRef == nil { + return false + } + + dataUploadResultList := new(v1.ConfigMapList) + err := ctx.kbClient.List(go_context.TODO(), dataUploadResultList, &crclient.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + velerov1api.RestoreUIDLabel: label.GetValidName(string(ctx.restore.GetUID())), + velerov1api.PVCNamespaceNameLabel: label.GetValidName(pv.Spec.ClaimRef.Namespace + "." 
+ pv.Spec.ClaimRef.Name), + velerov1api.ResourceUsageLabel: label.GetValidName(string(velerov1api.VeleroResourceUsageDataUploadResult)), + }), + }) + if err != nil { + ctx.log.WithError(err).Warnf("Fail to list DataUpload result CM.") + return false + } + + if len(dataUploadResultList.Items) != 1 { + ctx.log.WithError(fmt.Errorf("dataupload result number is not expected")). + Warnf("Got %d DataUpload result. Expect one.", len(dataUploadResultList.Items)) + return false + } + + return true +} + func hasPodVolumeBackup(unstructuredPV *unstructured.Unstructured, ctx *restoreContext) bool { if len(ctx.podVolumeBackups) == 0 { return false diff --git a/pkg/restore/restore_test.go b/pkg/restore/restore_test.go index 4ffd76257..643d001af 100644 --- a/pkg/restore/restore_test.go +++ b/pkg/restore/restore_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -2256,6 +2257,7 @@ func (*volumeSnapshotter) DeleteSnapshot(snapshotID string) error { // Verification is done by looking at the contents of the API and the metadata/spec/status of // the items in the API. 
func TestRestorePersistentVolumes(t *testing.T) { + testPVCName := "testPVC" tests := []struct { name string restore *velerov1api.Restore @@ -2265,6 +2267,8 @@ func TestRestorePersistentVolumes(t *testing.T) { volumeSnapshots []*volume.Snapshot volumeSnapshotLocations []*velerov1api.VolumeSnapshotLocation volumeSnapshotterGetter volumeSnapshotterGetter + csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot + dataUploadResult *corev1api.ConfigMap want []*test.APIResource wantError bool wantWarning bool @@ -2923,6 +2927,77 @@ func TestRestorePersistentVolumes(t *testing.T) { ), }, }, + { + name: "when a PV with a reclaim policy of retain has a CSI VolumeSnapshot and does not exist in-cluster, the PV is not restored", + restore: defaultRestore().Result(), + backup: defaultBackup().Result(), + tarball: test.NewTarWriter(t). + AddItems("persistentvolumes", + builder.ForPersistentVolume("pv-1"). + ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain). + ClaimRef("velero", testPVCName). + Result(), + ). 
+ Done(), + apiResources: []*test.APIResource{ + test.PVs(), + test.PVCs(), + }, + csiVolumeSnapshots: []*snapshotv1api.VolumeSnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "test", + }, + Spec: snapshotv1api.VolumeSnapshotSpec{ + Source: snapshotv1api.VolumeSnapshotSource{ + PersistentVolumeClaimName: &testPVCName, + }, + }, + }, + }, + volumeSnapshotLocations: []*velerov1api.VolumeSnapshotLocation{ + builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "default").Provider("provider-1").Result(), + }, + volumeSnapshotterGetter: map[string]vsv1.VolumeSnapshotter{ + "provider-1": &volumeSnapshotter{ + snapshotVolumes: map[string]string{"snapshot-1": "new-volume"}, + }, + }, + want: []*test.APIResource{}, + }, + { + name: "when a PV with a reclaim policy of retain has a DataUpload result CM and does not exist in-cluster, the PV is not restored", + restore: defaultRestore().ObjectMeta(builder.WithUID("fakeUID")).Result(), + backup: defaultBackup().Result(), + tarball: test.NewTarWriter(t). + AddItems("persistentvolumes", + builder.ForPersistentVolume("pv-1"). + ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain). + ClaimRef("velero", testPVCName). + Result(), + ). 
+ Done(), + apiResources: []*test.APIResource{ + test.PVs(), + test.PVCs(), + test.ConfigMaps(), + }, + volumeSnapshotLocations: []*velerov1api.VolumeSnapshotLocation{ + builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "default").Provider("provider-1").Result(), + }, + volumeSnapshotterGetter: map[string]vsv1.VolumeSnapshotter{ + "provider-1": &volumeSnapshotter{ + snapshotVolumes: map[string]string{"snapshot-1": "new-volume"}, + }, + }, + dataUploadResult: builder.ForConfigMap("velero", "test").ObjectMeta(builder.WithLabelsMap(map[string]string{ + velerov1api.RestoreUIDLabel: "fakeUID", + velerov1api.PVCNamespaceNameLabel: "velero/testPVC", + velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult), + })).Result(), + want: []*test.APIResource{}, + }, } for _, tc := range tests { @@ -2939,6 +3014,10 @@ func TestRestorePersistentVolumes(t *testing.T) { require.NoError(t, h.restorer.kbClient.Create(context.Background(), vsl)) } + if tc.dataUploadResult != nil { + require.NoError(t, h.restorer.kbClient.Create(context.TODO(), tc.dataUploadResult)) + } + for _, r := range tc.apiResources { h.AddItems(t, r) } @@ -2955,11 +3034,12 @@ func TestRestorePersistentVolumes(t *testing.T) { } data := &Request{ - Log: h.log, - Restore: tc.restore, - Backup: tc.backup, - VolumeSnapshots: tc.volumeSnapshots, - BackupReader: tc.tarball, + Log: h.log, + Restore: tc.restore, + Backup: tc.backup, + VolumeSnapshots: tc.volumeSnapshots, + BackupReader: tc.tarball, + CSIVolumeSnapshots: tc.csiVolumeSnapshots, } warnings, errs := h.restorer.Restore( data, @@ -3652,3 +3732,175 @@ func TestIsAlreadyExistsError(t *testing.T) { }) } } + +func TestHasCSIVolumeSnapshot(t *testing.T) { + tests := []struct { + name string + vs *snapshotv1api.VolumeSnapshot + obj *unstructured.Unstructured + expectedResult bool + }{ + { + name: "Invalid PV, expect false.", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": 1, + }, + }, + 
expectedResult: false, + }, + { + name: "Cannot find VS, expect false", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + }, + }, + expectedResult: false, + }, + { + name: "Find VS, expect true.", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "velero", + "name": "test", + }, + "spec": map[string]interface{}{ + "claimRef": map[string]interface{}{ + "namespace": "velero", + "name": "test", + }, + }, + }, + }, + vs: builder.ForVolumeSnapshot("velero", "test").SourcePVC("test").Result(), + expectedResult: true, + }, + } + + for _, tc := range tests { + h := newHarness(t) + + ctx := &restoreContext{ + log: h.log, + } + + if tc.vs != nil { + ctx.csiVolumeSnapshots = []*snapshotv1api.VolumeSnapshot{tc.vs} + } + + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expectedResult, hasCSIVolumeSnapshot(ctx, tc.obj)) + }) + } +} + +func TestHasSnapshotDataUpload(t *testing.T) { + tests := []struct { + name string + duResult *corev1api.ConfigMap + obj *unstructured.Unstructured + expectedResult bool + restore *velerov1api.Restore + }{ + { + name: "Invalid PV, expect false.", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": 1, + }, + }, + expectedResult: false, + }, + { + name: "PV without ClaimRef, expect false", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + }, + }, + duResult: builder.ForConfigMap("velero", "test").Result(), + restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(), + expectedResult: false, + }, + { + name: "Cannot find DataUploadResult CM, 
expect false", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + "spec": map[string]interface{}{ + "claimRef": map[string]interface{}{ + "namespace": "velero", + "name": "testPVC", + }, + }, + }, + }, + duResult: builder.ForConfigMap("velero", "test").Result(), + restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(), + expectedResult: false, + }, + { + name: "Find DataUploadResult CM, expect true", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + "spec": map[string]interface{}{ + "claimRef": map[string]interface{}{ + "namespace": "velero", + "name": "testPVC", + }, + }, + }, + }, + duResult: builder.ForConfigMap("velero", "test").ObjectMeta(builder.WithLabelsMap(map[string]string{ + velerov1api.RestoreUIDLabel: "fakeUID", + velerov1api.PVCNamespaceNameLabel: "velero.testPVC", + velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult), + })).Result(), + restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(), + expectedResult: true, + }, + } + + for _, tc := range tests { + h := newHarness(t) + + ctx := &restoreContext{ + log: h.log, + kbClient: h.restorer.kbClient, + restore: tc.restore, + } + + if tc.duResult != nil { + require.NoError(t, ctx.kbClient.Create(context.TODO(), tc.duResult)) + } + + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expectedResult, hasSnapshotDataUpload(ctx, tc.obj)) + }) + } +} diff --git a/pkg/test/resources.go b/pkg/test/resources.go index 7c2fa17f6..709497fca 100644 --- a/pkg/test/resources.go +++ b/pkg/test/resources.go @@ -142,6 +142,17 @@ func ServiceAccounts(items ...metav1.Object) *APIResource { 
} } +func ConfigMaps(items ...metav1.Object) *APIResource { + return &APIResource{ + Group: "", + Version: "v1", + Name: "configmaps", + ShortName: "cm", + Namespaced: true, + Items: items, + } +} + func CRDs(items ...metav1.Object) *APIResource { return &APIResource{ Group: "apiextensions.k8s.io", From 5a10f9090aafeda3f5d027689bfe96bac1d104d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wenkai=20Yin=28=E5=B0=B9=E6=96=87=E5=BC=80=29?= Date: Mon, 6 Nov 2023 16:16:38 +0800 Subject: [PATCH 5/7] Truncate the credential file to avoid the change of secret content messing it up MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Truncate the credential file to avoid the change of secret content messing it up Signed-off-by: Wenkai Yin(尹文开) --- changelogs/unreleased/7072-ywk253100 | 1 + internal/credentials/file_store.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelogs/unreleased/7072-ywk253100 diff --git a/changelogs/unreleased/7072-ywk253100 b/changelogs/unreleased/7072-ywk253100 new file mode 100644 index 000000000..2a6faffe3 --- /dev/null +++ b/changelogs/unreleased/7072-ywk253100 @@ -0,0 +1 @@ +Truncate the credential file to avoid the change of secret content messing it up \ No newline at end of file diff --git a/internal/credentials/file_store.go b/internal/credentials/file_store.go index 1332d4f8d..4b5d25664 100644 --- a/internal/credentials/file_store.go +++ b/internal/credentials/file_store.go @@ -71,7 +71,7 @@ func (n *namespacedFileStore) Path(selector *corev1api.SecretKeySelector) (strin keyFilePath := filepath.Join(n.fsRoot, fmt.Sprintf("%s-%s", selector.Name, selector.Key)) - file, err := n.fs.OpenFile(keyFilePath, os.O_RDWR|os.O_CREATE, 0644) + file, err := n.fs.OpenFile(keyFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return "", errors.Wrap(err, "unable to open credentials file for writing") } From db43200cc846e05847764d4205f95ff3e3072d4a Mon Sep 17 00:00:00 2001 From: 
Lyndon-Li Date: Wed, 8 Nov 2023 11:00:35 +0800 Subject: [PATCH 6/7] configurable data path concurrency: all in one json Signed-off-by: Lyndon-Li --- design/node-agent-concurrency.md | 53 ++++++++++++++++------------ pkg/cmd/cli/nodeagent/server.go | 2 +- pkg/cmd/cli/nodeagent/server_test.go | 4 +-- pkg/nodeagent/node_agent.go | 16 ++++----- pkg/nodeagent/node_agent_test.go | 35 ++++++++++-------- 5 files changed, 61 insertions(+), 49 deletions(-) diff --git a/design/node-agent-concurrency.md b/design/node-agent-concurrency.md index c3d84e69e..71dd33e21 100644 --- a/design/node-agent-concurrency.md +++ b/design/node-agent-concurrency.md @@ -28,10 +28,15 @@ Therefore, in order to gain the optimized performance with the limited resources We introduce a configMap named ```node-agent-configs``` for users to specify the node-agent related configurations. This configMap is not created by Velero, users should create it manually on demand. The configMap should be in the same namespace where Velero is installed. If multiple Velero instances are installed in different namespaces, there should be one configMap in each namespace which applies to node-agent in that namespace only. Node-agent server checks these configurations at startup time and use it to initiate the related VGDP modules. Therefore, users could edit this configMap any time, but in order to make the changes effective, node-agent server needs to be restarted. -The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```data-path-concurrency```. +The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```dataPathConcurrency```. 
-The data structure for ```data-path-concurrency``` is as below: +The data structure for ```node-agent-configs``` is as below: ```go +type Configs struct { + // DataPathConcurrency is the config for data path concurrency per node. + DataPathConcurrency *DataPathConcurrency `json:"dataPathConcurrency,omitempty"` +} + type DataPathConcurrency struct { // GlobalConfig specifies the concurrency number to all nodes for which per-node config is not specified GlobalConfig int `json:"globalConfig,omitempty"` @@ -50,7 +55,7 @@ type RuledConfigs struct { ``` ### Global concurrent number -We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```data-path-concurrency```. +We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```dataPathConcurrency```. The number starts from 1 which means there is no concurrency, only one instance of VGDP is allowed. There is no roof limit. If this number is not specified or not valid, a hard-coded default value will be used, the value is set to 1. @@ -62,7 +67,7 @@ We allow users to specify different concurrent number per node, for example, use The range of Per-node concurrent number is the same with Global concurrent number. Per-node concurrent number is preferable to Global concurrent number, so it will overwrite the Global concurrent number for that node. -Per-node concurrent number is implemented through ```perNodeConfig``` field in ```data-path-concurrency```. +Per-node concurrent number is implemented through ```perNodeConfig``` field in ```dataPathConcurrency```. ```perNodeConfig``` is a list of ```RuledConfigs``` each item of which matches one or more nodes by label selectors and specify the concurrent number for the matched nodes. This means, the nodes are identified by labels. 
@@ -80,30 +85,32 @@ If one node falls into more than one rules, e.g., if node1 also has the label `` A sample of the ```node-agent-configs``` configMap is as below: ```json { - "globalConfig": 2, - "perNodeConfig": [ - { - "nodeSelector": { - "matchLabels": { - "kubernetes.io/hostname": "node1" - } + "dataPathConcurrency": { + "globalConfig": 2, + "perNodeConfig": [ + { + "nodeSelector": { + "matchLabels": { + "kubernetes.io/hostname": "node1" + } + }, + "number": 3 }, - "number": 3 - }, - { - "nodeSelector": { - "matchLabels": { - "beta.kubernetes.io/instance-type": "Standard_B4ms" - } - }, - "number": 5 - } - ] + { + "nodeSelector": { + "matchLabels": { + "beta.kubernetes.io/instance-type": "Standard_B4ms" + } + }, + "number": 5 + } + ] + } } ``` To create the configMap, users need to save something like the above sample to a json file and then run below command: ``` -kubectl create cm node-agent-configs -n velero --from-file=data-path-concurrency=<json file name> +kubectl create cm node-agent-configs -n velero --from-file=<json file name> ``` ### Global data path manager diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 133992006..835b899c3 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -499,7 +499,7 @@ func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int { } if configs == nil || configs.DataPathConcurrency == nil { - s.logger.Infof("Node agent configs are not found, use the default number %v", defaultNum) + s.logger.Infof("Concurrency configs are not found, use the default number %v", defaultNum) return defaultNum } diff --git a/pkg/cmd/cli/nodeagent/server_test.go b/pkg/cmd/cli/nodeagent/server_test.go index f3115923b..4472dfce1 100644 --- a/pkg/cmd/cli/nodeagent/server_test.go +++ b/pkg/cmd/cli/nodeagent/server_test.go @@ -161,7 +161,7 @@ func Test_getDataPathConcurrentNum(t *testing.T) { getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { return nil, nil }, - 
expectLog: fmt.Sprintf("Node agent configs are not found, use the default number %v", defaultNum), + expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum), expectNum: defaultNum, }, { @@ -169,7 +169,7 @@ func Test_getDataPathConcurrentNum(t *testing.T) { getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { return &nodeagent.Configs{}, nil }, - expectLog: fmt.Sprintf("Node agent configs are not found, use the default number %v", defaultNum), + expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum), expectNum: defaultNum, }, { diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 51357766b..ff93ed596 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -61,7 +61,7 @@ type RuledConfigs struct { type Configs struct { // DataPathConcurrency is the config for data path concurrency per node. - DataPathConcurrency *DataPathConcurrency + DataPathConcurrency *DataPathConcurrency `json:"dataPathConcurrency,omitempty"` } // IsRunning checks if the node agent daemonset is running properly. 
If not, return the error found @@ -128,16 +128,16 @@ func GetConfigs(ctx context.Context, namespace string, kubeClient kubernetes.Int return nil, errors.Errorf("data is not available in config map %s", configName) } - jsonString, exist := cm.Data[dataPathConConfigName] - if !exist { - return nil, nil + jsonString := "" + for _, v := range cm.Data { + jsonString = v } - concurrencyConfigs := &DataPathConcurrency{} - err = json.Unmarshal([]byte(jsonString), concurrencyConfigs) + configs := &Configs{} + err = json.Unmarshal([]byte(jsonString), configs) if err != nil { - return nil, errors.Wrapf(err, "error to unmarshall data path concurrency configs from %s", configName) + return nil, errors.Wrapf(err, "error to unmarshall configs from %s", configName) } - return &Configs{DataPathConcurrency: concurrencyConfigs}, nil + return configs, nil } diff --git a/pkg/nodeagent/node_agent_test.go b/pkg/nodeagent/node_agent_test.go index 9ef8c00b1..a18e45b14 100644 --- a/pkg/nodeagent/node_agent_test.go +++ b/pkg/nodeagent/node_agent_test.go @@ -242,16 +242,16 @@ func TestGetPodSpec(t *testing.T) { func TestGetConfigs(t *testing.T) { cm := builder.ForConfigMap("fake-ns", "node-agent-configs").Result() - cmWithInvalidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "fake-value").Result() - cmWithInvalidDataFormat := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("data-path-concurrency", "wrong").Result() - cmWithValidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("data-path-concurrency", "{\"globalConfig\": 5}").Result() + cmWithInvalidDataFormat := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "wrong").Result() + cmWithoutCocurrentData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "{\"someothers\":{\"someother\": 10}}").Result() + cmWithValidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "{\"dataPathConcurrency\":{\"globalConfig\": 
5}}").Result() tests := []struct { name string kubeClientObj []runtime.Object namespace string kubeReactors []reactor - expectResult *DataPathConcurrency + expectResult *Configs expectErr string }{ { @@ -280,20 +280,21 @@ func TestGetConfigs(t *testing.T) { }, expectErr: "data is not available in config map node-agent-configs", }, - { - name: "cm's data is not found", - namespace: "fake-ns", - kubeClientObj: []runtime.Object{ - cmWithInvalidData, - }, - }, { name: "cm's data is with invalid format", namespace: "fake-ns", kubeClientObj: []runtime.Object{ cmWithInvalidDataFormat, }, - expectErr: "error to unmarshall data path concurrency configs from node-agent-configs: invalid character 'w' looking for beginning of value", + expectErr: "error to unmarshall configs from node-agent-configs: invalid character 'w' looking for beginning of value", + }, + { + name: "concurrency configs are not found", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + cmWithoutCocurrentData, + }, + expectResult: &Configs{nil}, }, { name: "success", @@ -301,8 +302,10 @@ func TestGetConfigs(t *testing.T) { kubeClientObj: []runtime.Object{ cmWithValidData, }, - expectResult: &DataPathConcurrency{ - GlobalConfig: 5, + expectResult: &Configs{ + DataPathConcurrency: &DataPathConcurrency{ + GlobalConfig: 5, + }, }, }, } @@ -321,8 +324,10 @@ func TestGetConfigs(t *testing.T) { if test.expectResult == nil { assert.Nil(t, result) + } else if test.expectResult.DataPathConcurrency == nil { + assert.Nil(t, result.DataPathConcurrency) } else { - assert.Equal(t, *test.expectResult, *result.DataPathConcurrency) + assert.Equal(t, *test.expectResult.DataPathConcurrency, *result.DataPathConcurrency) } } else { assert.EqualError(t, err, test.expectErr) From 3fa7d295730e505baf295ebee48271b9a4991358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Verch=C3=A8re?= Date: Thu, 9 Nov 2023 17:45:58 +0100 Subject: [PATCH 7/7] doc: add resourcePolicy for schedule (#7079) MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Rémi Verchère --- site/content/docs/main/api-types/schedule.md | 5 +++++ site/content/docs/v1.12/api-types/schedule.md | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/site/content/docs/main/api-types/schedule.md b/site/content/docs/main/api-types/schedule.md index c2c55f6b7..eb8e8fbd8 100644 --- a/site/content/docs/main/api-types/schedule.md +++ b/site/content/docs/main/api-types/schedule.md @@ -41,6 +41,11 @@ spec: # CSI VolumeSnapshot status turns to ReadyToUse during creation, before # returning error as timeout. The default value is 10 minute. csiSnapshotTimeout: 10m + # resourcePolicy specifies the referenced resource policies that backup should follow + # optional + resourcePolicy: + kind: configmap + name: resource-policy-configmap # Array of namespaces to include in the scheduled backup. If unspecified, all namespaces are included. # Optional. includedNamespaces: diff --git a/site/content/docs/v1.12/api-types/schedule.md b/site/content/docs/v1.12/api-types/schedule.md index c2c55f6b7..eb8e8fbd8 100644 --- a/site/content/docs/v1.12/api-types/schedule.md +++ b/site/content/docs/v1.12/api-types/schedule.md @@ -41,6 +41,11 @@ spec: # CSI VolumeSnapshot status turns to ReadyToUse during creation, before # returning error as timeout. The default value is 10 minute. csiSnapshotTimeout: 10m + # resourcePolicy specifies the referenced resource policies that backup should follow + # optional + resourcePolicy: + kind: configmap + name: resource-policy-configmap # Array of namespaces to include in the scheduled backup. If unspecified, all namespaces are included. # Optional. includedNamespaces: