Implement list, clear locks REST API w/ pkg/madmin support (#3491)

* Filter lock info based on bucket, prefix and time since lock was held
* Implement list and clear locks REST API
* madmin: Add list and clear locks API
* locks: Clear locks matching bucket, prefix, relTime.
* Gather lock information across nodes for both list and clear locks admin REST API.
* docs: Add lock API to management APIs
This commit is contained in:
Krishnan Parthasarathi
2017-01-04 13:09:22 +05:30
committed by Harshavardhana
parent cae62ce543
commit c8f57133a4
20 changed files with 1039 additions and 90 deletions

View File

@@ -20,6 +20,7 @@ import (
"net/url"
"path"
"sync"
"time"
)
// localAdminClient - represents admin operation to be executed locally.
@@ -32,11 +33,12 @@ type remoteAdminClient struct {
*AuthRPCClient
}
// stopRestarter - abstracts stop and restart operations for both
// local and remote execution.
type stopRestarter interface {
// adminCmdRunner - abstracts local and remote execution of admin
// commands like service stop and service restart.
type adminCmdRunner interface {
Stop() error
Restart() error
ListLocks(bucket, prefix string, relTime time.Duration) ([]VolumeLockInfo, error)
}
// Stop - Sends a message over channel to the go-routine responsible
@@ -53,24 +55,43 @@ func (lc localAdminClient) Restart() error {
return nil
}
// ListLocks - Fetches lock information from local lock instrumentation.
func (lc localAdminClient) ListLocks(bucket, prefix string, relTime time.Duration) ([]VolumeLockInfo, error) {
return listLocksInfo(bucket, prefix, relTime), nil
}
// Stop - Sends stop command to remote server via RPC.
func (rc remoteAdminClient) Stop() error {
args := AuthRPCArgs{}
reply := AuthRPCReply{}
return rc.Call("Service.Shutdown", &args, &reply)
return rc.Call("Admin.Shutdown", &args, &reply)
}
// Restart - Sends restart command to remote server via RPC.
func (rc remoteAdminClient) Restart() error {
args := AuthRPCArgs{}
reply := AuthRPCReply{}
return rc.Call("Service.Restart", &args, &reply)
return rc.Call("Admin.Restart", &args, &reply)
}
// ListLocks - Sends list locks command to remote server via RPC.
func (rc remoteAdminClient) ListLocks(bucket, prefix string, relTime time.Duration) ([]VolumeLockInfo, error) {
listArgs := ListLocksQuery{
bucket: bucket,
prefix: prefix,
relTime: relTime,
}
var reply ListLocksReply
if err := rc.Call("Admin.ListLocks", &listArgs, &reply); err != nil {
return nil, err
}
return reply.volLocks, nil
}
// adminPeer - represents an entity that implements Stop and Restart methods.
type adminPeer struct {
addr string
svcClnt stopRestarter
addr string
cmdRunner adminCmdRunner
}
// type alias for a collection of adminPeer.
@@ -105,13 +126,13 @@ func makeAdminPeers(eps []*url.URL) adminPeers {
secretKey: serverCred.SecretKey,
serverAddr: ep.Host,
secureConn: isSSL(),
serviceEndpoint: path.Join(reservedBucket, servicePath),
serviceName: "Service",
serviceEndpoint: path.Join(reservedBucket, adminPath),
serviceName: "Admin",
}
servicePeers = append(servicePeers, adminPeer{
addr: ep.Host,
svcClnt: &remoteAdminClient{newAuthRPCClient(cfg)},
addr: ep.Host,
cmdRunner: &remoteAdminClient{newAuthRPCClient(cfg)},
})
seenAddr[ep.Host] = true
}
@@ -129,9 +150,9 @@ func initGlobalAdminPeers(eps []*url.URL) {
func invokeServiceCmd(cp adminPeer, cmd serviceSignal) (err error) {
switch cmd {
case serviceStop:
err = cp.svcClnt.Stop()
err = cp.cmdRunner.Stop()
case serviceRestart:
err = cp.svcClnt.Restart()
err = cp.cmdRunner.Restart()
}
return err
}
@@ -147,9 +168,58 @@ func sendServiceCmd(cps adminPeers, cmd serviceSignal) {
wg.Add(1)
go func(idx int) {
defer wg.Done()
errs[idx] = invokeServiceCmd(remotePeers[idx], cmd)
// we use idx+1 because remotePeers slice is 1 position shifted w.r.t cps
errs[idx+1] = invokeServiceCmd(remotePeers[idx], cmd)
}(i)
}
wg.Wait()
errs[0] = invokeServiceCmd(cps[0], cmd)
}
func listPeerLocksInfo(peers adminPeers, bucket, prefix string, relTime time.Duration) ([]VolumeLockInfo, error) {
// Used to aggregate volume lock information from all nodes.
allLocks := make([][]VolumeLockInfo, len(peers))
errs := make([]error, len(peers))
var wg sync.WaitGroup
localPeer := peers[0]
remotePeers := peers[1:]
for i, remotePeer := range remotePeers {
wg.Add(1)
go func(idx int, remotePeer adminPeer) {
defer wg.Done()
// `remotePeers` is right-shifted by one position relative to `peers`
allLocks[idx], errs[idx] = remotePeer.cmdRunner.ListLocks(bucket, prefix, relTime)
}(i+1, remotePeer)
}
wg.Wait()
allLocks[0], errs[0] = localPeer.cmdRunner.ListLocks(bucket, prefix, relTime)
// Summarizing errors received for ListLocks RPC across all
// nodes. N B the possible unavailability of quorum in errors
// applies only to distributed setup.
errCount, err := reduceErrs(errs, []error{})
if err != nil {
if errCount >= (len(peers)/2 + 1) {
return nil, err
}
return nil, InsufficientReadQuorum{}
}
// Group lock information across nodes by (bucket, object)
// pair. For readability only.
paramLockMap := make(map[nsParam][]VolumeLockInfo)
for _, nodeLocks := range allLocks {
for _, lockInfo := range nodeLocks {
param := nsParam{
volume: lockInfo.Bucket,
path: lockInfo.Object,
}
paramLockMap[param] = append(paramLockMap[param], lockInfo)
}
}
groupedLockInfos := []VolumeLockInfo{}
for _, volLocks := range paramLockMap {
groupedLockInfos = append(groupedLockInfos, volLocks...)
}
return groupedLockInfos, nil
}