diff --git a/pkg/donut/bucket.go b/pkg/donut/bucket.go index cd8e2c7c3..afb30505e 100644 --- a/pkg/donut/bucket.go +++ b/pkg/donut/bucket.go @@ -318,15 +318,15 @@ func (b bucket) writeObjectMetadata(objectName string, objMetadata ObjectMetadat if err != nil { return iodine.New(err, nil) } - for _, objMetadataWriter := range objMetadataWriters { - defer objMetadataWriter.Close() - } for _, objMetadataWriter := range objMetadataWriters { jenc := json.NewEncoder(objMetadataWriter) if err := jenc.Encode(&objMetadata); err != nil { return iodine.New(err, nil) } } + for _, objMetadataWriter := range objMetadataWriters { + objMetadataWriter.Close() + } return nil } diff --git a/pkg/donut/disk/disk.go b/pkg/donut/disk/disk.go index ad7ecdbb5..6db2bc962 100644 --- a/pkg/donut/disk/disk.go +++ b/pkg/donut/disk/disk.go @@ -18,10 +18,12 @@ package disk import ( "fmt" + "io/ioutil" "os" "path/filepath" "strconv" "strings" + "sync" "syscall" "github.com/minio/minio/pkg/iodine" @@ -29,10 +31,41 @@ import ( // Disk container for disk parameters type Disk struct { + lock *sync.Mutex path string fsInfo map[string]string } +// File container provided by disk for atomic file writes +type File struct { + *os.File + path string +} + +// Close the file replacing the configured file. +func (f *File) Close() error { + if err := f.File.Close(); err != nil { + return err + } + if err := os.Rename(f.Name(), f.path); err != nil { + return err + } + return nil +} + +// Abort closes the file and removes it instead of replacing the configured +// file. This is useful if after starting to write to the file you decide you +// don't want it anymore. +func (f *File) Abort() error { + if err := f.File.Close(); err != nil { + return err + } + if err := os.Remove(f.Name()); err != nil { + return err + } + return nil +} + // New - instantiate new disk func New(diskPath string) (Disk, error) { if diskPath == "" { @@ -51,6 +84,7 @@ func New(diskPath string) (Disk, error) { return Disk{}, iodine.New(err, nil) } disk := Disk{ + lock: &sync.Mutex{}, path: diskPath, fsInfo: make(map[string]string), } @@ -70,6 +104,9 @@ func (disk Disk) GetPath() string { // GetFSInfo - get disk filesystem and its usage information func (disk Disk) GetFSInfo() map[string]string { + disk.lock.Lock() + defer disk.lock.Unlock() + s := syscall.Statfs_t{} err := syscall.Statfs(disk.path, &s) if err != nil { @@ -84,11 +121,16 @@ func (disk Disk) GetFSInfo() map[string]string { // MakeDir - make a directory inside disk root path func (disk Disk) MakeDir(dirname string) error { + disk.lock.Lock() + defer disk.lock.Unlock() return os.MkdirAll(filepath.Join(disk.path, dirname), 0700) } // ListDir - list a directory inside disk root path, get only directories func (disk Disk) ListDir(dirname string) ([]os.FileInfo, error) { + disk.lock.Lock() + defer disk.lock.Unlock() + dir, err := os.Open(filepath.Join(disk.path, dirname)) if err != nil { return nil, iodine.New(err, nil) @@ -110,6 +152,9 @@ func (disk Disk) ListDir(dirname string) ([]os.FileInfo, error) { // ListFiles - list a directory inside disk root path, get only files func (disk Disk) ListFiles(dirname string) ([]os.FileInfo, error) { + disk.lock.Lock() + defer disk.lock.Unlock() + dir, err := os.Open(filepath.Join(disk.path, dirname)) if err != nil { return nil, iodine.New(err, nil) @@ -129,25 +174,37 @@ func (disk Disk) ListFiles(dirname string) ([]os.FileInfo, error) { return files, nil } -// CreateFile - create a file inside disk root path -func (disk Disk) CreateFile(filename string) (*os.File, error) { +// CreateFile - create a file inside disk root path, replies with custome disk.File which provides atomic writes +func (disk Disk) CreateFile(filename string) (*File, error) { + disk.lock.Lock() + defer disk.lock.Unlock() + if filename == "" { return nil, iodine.New(InvalidArgument{}, nil) } + filePath := filepath.Join(disk.path, filename) // Create directories if they don't exist if err := os.MkdirAll(filepath.Dir(filePath), 0700); err != nil { return nil, iodine.New(err, nil) } - dataFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0600) + f, err := ioutil.TempFile(filepath.Dir(filePath), filepath.Base(filePath)) if err != nil { return nil, iodine.New(err, nil) } - return dataFile, nil + if err := os.Chmod(f.Name(), 0600); err != nil { + os.Remove(f.Name()) + return nil, iodine.New(err, nil) + } + + return &File{File: f, path: filePath}, nil } // OpenFile - read a file inside disk root path func (disk Disk) OpenFile(filename string) (*os.File, error) { + disk.lock.Lock() + defer disk.lock.Unlock() + if filename == "" { return nil, iodine.New(InvalidArgument{}, nil) } diff --git a/pkg/donut/disk/disk_test.go b/pkg/donut/disk/disk_test.go index 18cb5145d..93fc9e87f 100644 --- a/pkg/donut/disk/disk_test.go +++ b/pkg/donut/disk/disk_test.go @@ -61,18 +61,24 @@ func (s *MyDiskSuite) TestDiskCreateDir(c *C) { func (s *MyDiskSuite) TestDiskCreateFile(c *C) { f, err := s.disk.CreateFile("hello1") c.Assert(err, IsNil) - c.Assert(f.Name(), Equals, filepath.Join(s.path, "hello1")) - defer f.Close() + c.Assert(f.Name(), Not(Equals), filepath.Join(s.path, "hello1")) + // close renames the file + f.Close() + + // Openfile should be a success + _, err = s.disk.OpenFile("hello1") + c.Assert(err, IsNil) } func (s *MyDiskSuite) TestDiskOpenFile(c *C) { - f, err := s.disk.CreateFile("hello2") + f1, err := s.disk.CreateFile("hello2") c.Assert(err, IsNil) - c.Assert(f.Name(), Equals, filepath.Join(s.path, "hello2")) - defer f.Close() + c.Assert(f1.Name(), Not(Equals), filepath.Join(s.path, "hello2")) + // close renames the file + f1.Close() - f, err = s.disk.OpenFile("hello2") + f2, err := s.disk.OpenFile("hello2") c.Assert(err, IsNil) - c.Assert(f.Name(), Equals, filepath.Join(s.path, "hello2")) - defer f.Close() + c.Assert(f2.Name(), Equals, filepath.Join(s.path, "hello2")) + defer f2.Close() } diff --git a/pkg/server/nimble/http.go b/pkg/server/nimble/http.go index d1b72f030..4caa2fabe 100644 --- a/pkg/server/nimble/http.go +++ b/pkg/server/nimble/http.go @@ -13,6 +13,7 @@ import ( "os/signal" "sync" "syscall" + "time" "github.com/facebookgo/httpdown" "github.com/minio/minio/pkg/iodine" @@ -23,25 +24,16 @@ var ( ppid = os.Getppid() ) -// An app contains one or more servers and associated configuration. +// An app contains one or more servers and their associated configuration. type app struct { servers []*http.Server - net *nimbleNet listeners []net.Listener sds []httpdown.Server + net *nimbleNet errors chan error } -func newApp(servers []*http.Server) *app { - return &app{ - servers: servers, - net: &nimbleNet{}, - listeners: make([]net.Listener, 0, len(servers)), - sds: make([]httpdown.Server, 0, len(servers)), - errors: make(chan error, 1+(len(servers)*2)), - } -} - +// listen initailize listeners func (a *app) listen() error { for _, s := range a.servers { l, err := a.net.Listen("tcp", s.Addr) @@ -56,17 +48,22 @@ func (a *app) listen() error { return nil } +// serve start serving all listeners func (a *app) serve() { - h := &httpdown.HTTP{} + h := &httpdown.HTTP{ + StopTimeout: 10 * time.Second, + KillTimeout: 1 * time.Second, + } for i, s := range a.servers { a.sds = append(a.sds, h.Serve(s, a.listeners[i])) } } +// wait for http server to signal all requests that have been served func (a *app) wait() { var wg sync.WaitGroup wg.Add(len(a.sds) * 2) // Wait & Stop - go a.signalHandler(&wg) + go a.trapSignal(&wg) for _, s := range a.sds { go func(s httpdown.Server) { defer wg.Done() @@ -78,28 +75,25 @@ func (a *app) wait() { wg.Wait() } -func (a *app) term(wg *sync.WaitGroup) { - for _, s := range a.sds { - go func(s httpdown.Server) { - defer wg.Done() - if err := s.Stop(); err != nil { - a.errors <- iodine.New(err, nil) - } - }(s) - } -} - -func (a *app) signalHandler(wg *sync.WaitGroup) { +// trapSignal wait on listed signals for pre-defined behaviors +func (a *app) trapSignal(wg *sync.WaitGroup) { ch := make(chan os.Signal, 10) signal.Notify(ch, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGHUP) for { sig := <-ch switch sig { case syscall.SIGTERM: - // this ensures a subsequent TERM will trigger standard go behaviour of - // terminating. + // this ensures a subsequent TERM will trigger standard go behaviour of terminating signal.Stop(ch) - a.term(wg) + // roll through all initialized http servers and stop them + for _, s := range a.sds { + go func(s httpdown.Server) { + defer wg.Done() + if err := s.Stop(); err != nil { + a.errors <- iodine.New(err, nil) + } + }(s) + } return case syscall.SIGUSR2: fallthrough @@ -116,7 +110,13 @@ func (a *app) signalHandler(wg *sync.WaitGroup) { // ListenAndServe will serve the given http.Servers and will monitor for signals // allowing for graceful termination (SIGTERM) or restart (SIGUSR2/SIGHUP). func ListenAndServe(servers ...*http.Server) error { - a := newApp(servers) + a := &app{ + servers: servers, + listeners: make([]net.Listener, 0, len(servers)), + sds: make([]httpdown.Server, 0, len(servers)), + net: &nimbleNet{}, + errors: make(chan error, 1+(len(servers)*2)), + } // Acquire Listeners if err := a.listen(); err != nil { @@ -137,6 +137,8 @@ func ListenAndServe(servers ...*http.Server) error { go func() { defer close(waitdone) a.wait() + // communicate by sending not by closing a channel + waitdone <- struct{}{} }() select { diff --git a/pkg/server/nimble/net.go b/pkg/server/nimble/net.go index b360b85b0..eeaabae0d 100644 --- a/pkg/server/nimble/net.go +++ b/pkg/server/nimble/net.go @@ -37,13 +37,14 @@ var originalWD, _ = os.Getwd() // nimbleNet provides the family of Listen functions and maintains the associated // state. Typically you will have only once instance of nimbleNet per application. type nimbleNet struct { - inherited []net.Listener - active []net.Listener - mutex sync.Mutex - inheritOnce sync.Once + inheritedListeners []net.Listener + activeListeners []net.Listener + mutex sync.Mutex + inheritOnce sync.Once } -func (n *nimbleNet) inherit() error { +// inherit - lookg for LISTEN_FDS in environment variables and populate listeners +func (n *nimbleNet) getInheritedListeners() error { var retErr error n.inheritOnce.Do(func() { n.mutex.Lock() @@ -71,7 +72,7 @@ func (n *nimbleNet) inherit() error { retErr = fmt.Errorf("error closing inherited socket fd %d: %s", i, err) return } - n.inherited = append(n.inherited, l) + n.inheritedListeners = append(n.inheritedListeners, l) } }) return iodine.New(retErr, nil) @@ -104,7 +105,7 @@ func (n *nimbleNet) Listen(nett, laddr string) (net.Listener, error) { // be: "tcp", "tcp4" or "tcp6". It returns an inherited net.Listener for the // matching network and address, or creates a new one using net.ListenTCP. func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, error) { - if err := n.inherit(); err != nil { + if err := n.getInheritedListeners(); err != nil { return nil, iodine.New(err, nil) } @@ -112,13 +113,13 @@ func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener defer n.mutex.Unlock() // look for an inherited listener - for i, l := range n.inherited { + for i, l := range n.inheritedListeners { if l == nil { // we nil used inherited listeners continue } if isSameAddr(l.Addr(), laddr) { - n.inherited[i] = nil - n.active = append(n.active, l) + n.inheritedListeners[i] = nil + n.activeListeners = append(n.activeListeners, l) return l.(*net.TCPListener), nil } } @@ -128,7 +129,7 @@ func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener if err != nil { return nil, iodine.New(err, nil) } - n.active = append(n.active, l) + n.activeListeners = append(n.activeListeners, l) return l, nil } @@ -136,7 +137,7 @@ func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener // must be a: "unix" or "unixpacket". It returns an inherited net.Listener for // the matching network and address, or creates a new one using net.ListenUnix. func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener, error) { - if err := n.inherit(); err != nil { + if err := n.getInheritedListeners(); err != nil { return nil, iodine.New(err, nil) } @@ -144,13 +145,13 @@ func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListe defer n.mutex.Unlock() // look for an inherited listener - for i, l := range n.inherited { + for i, l := range n.inheritedListeners { if l == nil { // we nil used inherited listeners continue } if isSameAddr(l.Addr(), laddr) { - n.inherited[i] = nil - n.active = append(n.active, l) + n.inheritedListeners[i] = nil + n.activeListeners = append(n.activeListeners, l) return l.(*net.UnixListener), nil } } @@ -160,16 +161,16 @@ func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListe if err != nil { return nil, iodine.New(err, nil) } - n.active = append(n.active, l) + n.activeListeners = append(n.activeListeners, l) return l, nil } // activeListeners returns a snapshot copy of the active listeners. -func (n *nimbleNet) activeListeners() ([]net.Listener, error) { +func (n *nimbleNet) getActiveListeners() ([]net.Listener, error) { n.mutex.Lock() defer n.mutex.Unlock() - ls := make([]net.Listener, len(n.active)) - copy(ls, n.active) + ls := make([]net.Listener, len(n.activeListeners)) + copy(ls, n.activeListeners) return ls, nil } @@ -200,7 +201,7 @@ func isSameAddr(a1, a2 net.Addr) bool { // deployed binary to be started. It returns the pid of the newly started // process when successful. func (n *nimbleNet) StartProcess() (int, error) { - listeners, err := n.activeListeners() + listeners, err := n.getActiveListeners() if err != nil { return 0, iodine.New(err, nil) }