Mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2026-05-14 05:41:29 +00:00.
* fix(ec): verify full shard set before deleting source volume (#9490)

  Before this change, both the worker EC task and the shell ec.encode command would delete the source .dat as soon as MountEcShards returned, even if distribute/mount had failed partway and left fewer than 14 shards in the cluster. The deletion was logged at V(2), so by the time someone noticed missing data, the only trace was a 0-byte .dat synthesized by disk_location at the next restart.

  - The worker path adds Step 6: poll VolumeEcShardsInfo on every destination, union the bitmaps, and refuse to call deleteOriginalVolume unless all TotalShardsCount distinct shard ids are observed (a sketch of this gate follows the commit log). A failed gate leaves the source readonly so the next detection scan can retry.
  - Shell ec.encode adds the same gate after EcBalance, walking the master topology with collectEcNodeShardsInfo.
  - VolumeDelete RPC success and .dat/.idx unlinks now log at V(0) so any source destruction is traceable in default-verbosity production logs.

  The EC-balance-vs-in-flight-encode race is intentionally left for a follow-up; balance should refuse to move shards for a volume whose encode job is not in the Completed state.

* fix(ec): trim doc comments on the new shard-verification path

  Drop WHAT-describing godoc on freshly added helpers; keep only the WHY notes (the query-error policy in VerifyShardsAcrossServers, the #9490 reference at the call sites).

* fix(ec): drop issue-number anchors from new comments

  Issue references age poorly; the why behind each comment already stands on its own.

* fix(ec): parametrize RequireFullShardSet on totalShards

  Take totalShards as an argument instead of reading the package-level TotalShardsCount constant. The OSS callers continue to pass 14, but the helper is now usable with any DataShards+ParityShards ratio.

* test(plugin_workers): make fake volume server respond to VolumeEcShardsInfo

  The new pre-delete verification gate calls VolumeEcShardsInfo on every destination after mount, and the fake server's UnimplementedVolumeServer returns Unimplemented; the verifier read that as zero shards on every node and aborted source deletion. Build the response from recorded mount requests so the integration test exercises the gate end-to-end.

* fix(rust/volume): log .dat/.idx unlink with size in remove_volume_files

  Mirror the Go-side change in weed/storage/volume_write.go: stat each file before removing and emit an info-level log for .dat/.idx so a destructive call is always traceable. The OSS Rust crate previously unlinked them silently.

* fix(ec/decode): verify regenerated .dat before deleting EC shards

  After mountDecodedVolume succeeds, the previous code immediately unmounted and deleted every EC shard. A silent failure in generate or mount could leave the cluster with neither shards nor a valid normal volume. Probe ReadVolumeFileStatus on the target and refuse to proceed if dat or idx is 0 bytes. Also make the fake volume server's VolumeEcShardsInfo reflect whichever shard files exist on disk (seeded for tests as well as mounted via RPC), so the new gate can be exercised end-to-end.

* fix(ec): address PR review nits in verification + fake server

  - Drop the unused ServerShardInventory.Sizes field.
  - Skip shard ids >= MaxShardCount before the bitmap Set so the ShardBits bound is explicit (Set already no-ops on overflow; this is for clarity).
  - Nil-guard the fake server's VolumeEcShardsInfo so a malformed call doesn't panic the test process.
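
A minimal sketch of the verification gate described above, assuming only that each destination reports the shard ids it has mounted. The function name and map layout here are illustrative stand-ins, not the upstream helpers; the real worker path unions erasure_coding.ShardBits bitmaps from VolumeEcShardsInfo responses:

	package ecgate // illustrative sketch; assumes totalShards <= 32

	import (
		"fmt"
		"math/bits"
	)

	// requireFullShardSet fails unless the union of shard ids reported by
	// all destination servers covers every id in [0, totalShards).
	func requireFullShardSet(reported map[string][]uint32, totalShards int) error {
		var union uint32 // one bit per shard id, mirroring the ShardBits idea
		for _, ids := range reported {
			for _, id := range ids {
				if int(id) < totalShards { // skip out-of-range ids explicitly
					union |= 1 << id
				}
			}
		}
		if got := bits.OnesCount32(union); got != totalShards {
			return fmt.Errorf("observed %d of %d distinct ec shards; refusing to delete source volume", got, totalShards)
		}
		return nil
	}

A failed gate surfaces as an error rather than a deletion, matching the commit's policy of leaving the source volume readonly so a later detection scan can retry.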

507 lines · 15 KiB · Go

package weed_server

import (
	"context"
	"fmt"
	"net"
	"path/filepath"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/status"

	"github.com/seaweedfs/seaweedfs/weed/util/version"

	"github.com/seaweedfs/seaweedfs/weed/storage"

	"github.com/seaweedfs/seaweedfs/weed/cluster"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/stats"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

// checkGrpcAdminAuth verifies the gRPC caller is authorized for destructive
// admin operations by checking the peer address against the guard's whitelist.
//
// IP extraction prefers a typed *net.TCPAddr where available, falling back to
// SplitHostPort on the string form, then to the raw string. The fallback
// chain matters because in-process/passthrough connections used in tests
// surface as unparseable strings like "@"; with an empty whitelist the
// allow-all branch in IsWhiteListed accepts them; with a whitelist they're
// denied as expected.
//
// Failed authorization attempts are logged so an operator running with a
// configured whitelist can spot misconfigured callers and probe attempts.
func (vs *VolumeServer) checkGrpcAdminAuth(ctx context.Context) error {
	if vs.guard == nil {
		return nil
	}
	pr, ok := peer.FromContext(ctx)
	if !ok {
		// Real gRPC connections always populate peer info; if we don't know
		// who the caller is, deny.
		glog.V(0).Infof("gRPC admin auth failed: no peer info")
		return status.Error(codes.PermissionDenied, "no peer info")
	}
	addr := pr.Addr.String()
	var host string
	if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok {
		host = tcpAddr.IP.String()
	} else if h, _, splitErr := net.SplitHostPort(addr); splitErr == nil {
		host = h
	} else {
		host = addr
	}
	if !vs.guard.IsWhiteListed(host) {
		glog.V(0).Infof("gRPC admin auth failed: %s is not whitelisted (remote: %s)", host, addr)
		return status.Errorf(codes.PermissionDenied, "not authorized: %s", host)
	}
	return nil
}
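
// hostFromPeerAddr is an illustrative sketch (a hypothetical helper, not part
// of the upstream file) that isolates the fallback chain documented above so
// each branch is visible on its own: a typed *net.TCPAddr yields the bare IP,
// a parseable "host:port" string yields the host, and anything else (such as
// the "@" reported by in-process test dialers) passes through unchanged for
// the whitelist to judge.
func hostFromPeerAddr(a net.Addr) string {
	if tcpAddr, ok := a.(*net.TCPAddr); ok {
		return tcpAddr.IP.String() // typed path: IPv6-safe, no bracket parsing
	}
	addr := a.String()
	if h, _, err := net.SplitHostPort(addr); err == nil {
		return h // string path: "10.0.0.7:18080" yields "10.0.0.7"
	}
	return addr // raw fallback: "@" stays "@", denied once a whitelist is set
}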

func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {

	resp := &volume_server_pb.DeleteCollectionResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	err := vs.store.DeleteCollection(req.Collection)

	if err != nil {
		glog.Errorf("delete collection %s: %v", req.Collection, err)
	} else {
		glog.V(2).Infof("delete collection %v", req)
	}

	return resp, err

}

func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
	resp := &volume_server_pb.AllocateVolumeResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	if err := vs.CheckMaintenanceMode(); err != nil {
		return resp, err
	}

	err := vs.store.AddVolume(
		needle.VolumeId(req.VolumeId),
		req.Collection,
		vs.needleMapKind,
		req.Replication,
		req.Ttl,
		req.Preallocate,
		needle.Version(req.Version),
		req.MemoryMapMaxSizeMb,
		types.ToDiskType(req.DiskType),
		vs.ldbTimout,
	)

	if err != nil {
		glog.Errorf("assign volume %v: %v", req, err)
	} else {
		glog.V(2).Infof("assign volume %v", req)
	}

	return resp, err

}

func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {

	resp := &volume_server_pb.VolumeMountResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))

	if err != nil {
		glog.Errorf("volume mount %v: %v", req, err)
	} else {
		glog.V(2).Infof("volume mount %v", req)
	}

	return resp, err

}

func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb.VolumeUnmountRequest) (*volume_server_pb.VolumeUnmountResponse, error) {

	resp := &volume_server_pb.VolumeUnmountResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))

	if err != nil {
		glog.Errorf("volume unmount %v: %v", req, err)
	} else {
		glog.V(2).Infof("volume unmount %v", req)
	}

	return resp, err

}

func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
	resp := &volume_server_pb.VolumeDeleteResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	if err := vs.CheckMaintenanceMode(); err != nil {
		return resp, err
	}

	err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), req.OnlyEmpty, req.KeepRemoteData)

	if err != nil {
		glog.Errorf("volume delete %v: %v", req, err)
	} else {
		// V(0) so destructive RPCs are always traceable.
		glog.Infof("volume delete %v", req)
	}

	return resp, err

}

func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
	resp := &volume_server_pb.VolumeConfigureResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	if err := vs.CheckMaintenanceMode(); err != nil {
		return resp, err
	}

	// check replication format
	if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
		resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
		return resp, nil
	}

	// unmount
	if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
		glog.Errorf("volume configure unmount %v: %v", req, err)
		resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
		return resp, nil
	}

	// modify the volume info file
	if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
		glog.Errorf("volume configure %v: %v", req, err)
		resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
		// Try to re-mount to restore the volume state
		if mountErr := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); mountErr != nil {
			glog.Errorf("volume configure failed to restore mount %v: %v", req, mountErr)
			resp.Error += fmt.Sprintf(". Also failed to restore mount: %v", mountErr)
		}
		return resp, nil
	}

	// mount
	if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
		glog.Errorf("volume configure mount %v: %v", req, err)
		resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
		return resp, nil
	}

	return resp, nil

}

func (vs *VolumeServer) makeVolumeReadonly(ctx context.Context, v *storage.Volume, persist bool) error {
	if err := vs.CheckMaintenanceMode(); err != nil {
		return err
	}

	// step 1: stop master from redirecting traffic here
	if err := vs.notifyMasterVolumeReadonly(ctx, v, true); err != nil {
		return err
	}

	// rare case 1.5: a heartbeat between steps 1 and 2 could re-advertise the volume as writable.

	// step 2: mark local volume as readonly
	if err := vs.store.MarkVolumeReadonly(v.Id, persist); err != nil {
		glog.Errorf("mark volume %d readonly: %v", v.Id, err)
		return err
	} else {
		glog.V(2).Infof("volume %d marked readonly", v.Id)
	}

	// step 3: notify the master again, closing the rare case 1.5 window
	if err := vs.notifyMasterVolumeReadonly(ctx, v, true); err != nil {
		return err
	}

	return nil
}
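
// requestPeerReadonly is a hypothetical caller-side sketch (not part of the
// upstream file) showing that a single VolumeMarkReadonly RPC is enough to
// drive the whole notify-mark-notify sequence above on the receiving server;
// Persist is set so the readonly flag survives a restart.
func (vs *VolumeServer) requestPeerReadonly(ctx context.Context, peerAddr pb.ServerAddress, vid uint32) error {
	return pb.WithVolumeServerClient(false, peerAddr, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		_, err := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
			VolumeId: vid,
			Persist:  true, // see makeVolumeReadonly step 2
		})
		return err
	})
}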

func (vs *VolumeServer) makeVolumeWritable(ctx context.Context, v *storage.Volume) error {
	if err := vs.CheckMaintenanceMode(); err != nil {
		return err
	}

	if err := vs.store.MarkVolumeWritable(v.Id); err != nil {
		glog.Errorf("mark volume %d writable: %v", v.Id, err)
		return err
	} else {
		glog.V(2).Infof("volume %d marked writable", v.Id)
	}

	// enable master to redirect traffic here
	if err := vs.notifyMasterVolumeReadonly(ctx, v, false); err != nil {
		return err
	}

	return nil
}

func (vs *VolumeServer) notifyMasterVolumeReadonly(ctx context.Context, v *storage.Volume, isReadOnly bool) error {
	if grpcErr := pb.WithMasterClient(false, vs.GetMaster(ctx), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
		_, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
			Ip:               vs.store.Ip,
			Port:             uint32(vs.store.Port),
			VolumeId:         uint32(v.Id),
			Collection:       v.Collection,
			ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
			Ttl:              v.Ttl.ToUint32(),
			DiskType:         string(v.DiskType()),
			IsReadonly:       isReadOnly,
		})
		if err != nil {
			return fmt.Errorf("set volume %d to read only on master: %v", v.Id, err)
		}
		return nil
	}); grpcErr != nil {
		glog.V(0).Infof("connect to %s: %v", vs.GetMaster(context.Background()), grpcErr)
		return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(context.Background()), grpcErr)
	}
	return nil
}

func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
	resp := &volume_server_pb.VolumeMarkReadonlyResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
	if v == nil {
		return resp, fmt.Errorf("volume %d not found", req.VolumeId)
	}

	if err := vs.makeVolumeReadonly(ctx, v, req.GetPersist()); err != nil {
		return resp, err
	}

	return resp, nil
}

func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
	resp := &volume_server_pb.VolumeMarkWritableResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
	if v == nil {
		return resp, fmt.Errorf("volume %d not found", req.VolumeId)
	}

	if err := vs.makeVolumeWritable(ctx, v); err != nil {
		return resp, err
	}

	return resp, nil
}

func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {

	resp := &volume_server_pb.VolumeStatusResponse{}

	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
	if v == nil {
		return nil, fmt.Errorf("volume %d not found", req.VolumeId)
	}
	if v.DataBackend == nil {
		return nil, fmt.Errorf("volume %d data backend not found", req.VolumeId)
	}

	volumeSize, _, _ := v.DataBackend.GetStat()
	resp.IsReadOnly = v.IsReadOnly()
	resp.VolumeSize = uint64(volumeSize)
	resp.FileCount = v.FileCount()
	resp.FileDeletedCount = v.DeletedCount()

	return resp, nil
}

func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {

	resp := &volume_server_pb.VolumeServerStatusResponse{
		State:        vs.store.State.Proto(),
		MemoryStatus: stats.MemStat(),
		Version:      version.Version(),
		DataCenter:   vs.dataCenter,
		Rack:         vs.rack,
	}

	for _, loc := range vs.store.Locations {
		if dir, e := filepath.Abs(loc.Directory); e == nil {
			resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir))
		}
	}

	return resp, nil

}

func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {

	resp := &volume_server_pb.VolumeServerLeaveResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	vs.StopHeartbeat()

	return resp, nil

}

func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {

	resp := &volume_server_pb.VolumeNeedleStatusResponse{}

	if err := vs.checkGrpcAdminAuth(ctx); err != nil {
		return resp, err
	}

	volumeId := needle.VolumeId(req.VolumeId)

	n := &needle.Needle{
		Id: types.NeedleId(req.NeedleId),
	}

	var count int
	var err error
	hasVolume := vs.store.HasVolume(volumeId)
	if !hasVolume {
		_, hasEcVolume := vs.store.FindEcVolume(volumeId)
		if !hasEcVolume {
			return nil, fmt.Errorf("volume %d not found", req.VolumeId)
		}
		count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil)
	} else {
		count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
	}
	if err != nil {
		return nil, err
	}
	if count < 0 {
		return nil, fmt.Errorf("needle %d not found", n.Id)
	}

	resp.NeedleId = uint64(n.Id)
	resp.Cookie = uint32(n.Cookie)
	resp.Size = uint32(n.Size)
	resp.LastModified = n.LastModified
	resp.Crc = n.Checksum.Value()
	if n.HasTtl() {
		resp.Ttl = n.Ttl.String()
	}
	return resp, nil

}

// isKnownPingTarget reports whether target is a master this volume server
// already knows about. Volume servers do not maintain a peer-volume or
// peer-filer list, so Ping is scoped to the masters they heartbeat with.
// The current-master read is taken under a lock to avoid racing with the
// heartbeat goroutine that rewrites it on leader changes, and the seed
// list is consulted via a pre-built set so the check stays O(1).
func (vs *VolumeServer) isKnownPingTarget(target string, targetType string) bool {
	if targetType != cluster.MasterType {
		return false
	}
	addr := pb.ServerAddress(target)
	key := addr.ToHttpAddress()
	if key == "" {
		return false
	}
	if current := vs.getCurrentMaster(); current != "" && current.ToHttpAddress() == key {
		return true
	}
	_, ok := vs.seedMasterSet[key]
	return ok
}
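
// buildSeedMasterSet is an illustrative sketch (hypothetical, not the upstream
// constructor) of how the O(1) seed set consulted above can be built once at
// startup, normalizing each seed master through ToHttpAddress exactly the way
// the lookup in isKnownPingTarget does; the map's value type is an assumption.
func buildSeedMasterSet(seedMasters []pb.ServerAddress) map[string]struct{} {
	set := make(map[string]struct{}, len(seedMasters))
	for _, m := range seedMasters {
		if key := m.ToHttpAddress(); key != "" {
			// string(key) is a defensive conversion in case ToHttpAddress
			// returns a string-typed alias rather than string itself.
			set[string(key)] = struct{}{}
		}
	}
	return set
}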

func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) {
	resp = &volume_server_pb.PingResponse{
		StartTimeNs: time.Now().UnixNano(),
	}
	// Empty target is a self-liveness probe and stays unauthenticated.
	if req.Target != "" && !vs.isKnownPingTarget(req.Target, req.TargetType) {
		resp.StopTimeNs = time.Now().UnixNano()
		return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType)
	}
	if req.TargetType == cluster.FilerType {
		pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
			pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
			if pingResp != nil {
				resp.RemoteTimeNs = pingResp.StartTimeNs
			}
			return err
		})
	}
	if req.TargetType == cluster.VolumeServerType {
		pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
			pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
			if pingResp != nil {
				resp.RemoteTimeNs = pingResp.StartTimeNs
			}
			return err
		})
	}
	if req.TargetType == cluster.MasterType {
		pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
			pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
			if pingResp != nil {
				resp.RemoteTimeNs = pingResp.StartTimeNs
			}
			return err
		})
	}
	if pingErr != nil {
		pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
	}
	resp.StopTimeNs = time.Now().UnixNano()
	return
}
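
// pingClockEstimates is a hypothetical helper (not part of the upstream file)
// showing what a caller can derive from the three timestamps filled in above:
// round-trip time measured entirely on the pinging server's clock, and an
// NTP-style skew estimate that assumes RemoteTimeNs was captured near the
// midpoint of the round trip. RemoteTimeNs is only populated when the request
// named a non-empty target, so the skew estimate is meaningless for self-pings.
func pingClockEstimates(resp *volume_server_pb.PingResponse) (rtt, skew time.Duration) {
	rtt = time.Duration(resp.StopTimeNs - resp.StartTimeNs)
	midpoint := (resp.StartTimeNs + resp.StopTimeNs) / 2
	skew = time.Duration(resp.RemoteTimeNs - midpoint)
	return rtt, skew
}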