Files
seaweedfs/test/volume_server/grpc/admin_extra_test.go
Chris Lu 10cc06333b cluster: restrict Ping RPC to known peers of the requested type (#9445)
Ping previously dialled whatever host:port the caller asked for. Gate
each server's Ping handler on cluster membership: masters check the
topology, registered cluster nodes, and configured master peers; volume
servers only accept their seed/current masters; filers accept tracked
peer filers, the master-learned volume server set, and configured
masters.

Use address-indexed peer lookups to keep Ping target validation O(1):
- topology maintains a pb.ServerAddress -> *DataNode index alongside
  the dc/rack/node tree, kept in sync from doLinkChildNode and
  UnlinkChildNode plus the ip/port-rewrite branch in
  GetOrCreateDataNode. GetTopology now returns nil on a detached
  subtree instead of panicking, so the linkage hooks can no-op safely.
- vid_map tracks a refcount per volume-server address so
  hasVolumeServer answers without scanning every vid location. The
  add path skips empty-address entries the same way the delete path
  already does, so a zero-value Location cannot leak a permanent
  serverRefCount[""] bucket.
- masters reuse a cached master-address set from MasterClient instead
  of walking the configured peer slice on every request.
- volume servers compare against a pre-built seed-master set and
  protect currentMaster reads/writes with an RWMutex, fixing the
  data race with the heartbeat goroutine. The seed slice is copied
  on construction so external mutation cannot desync it from the
  frozen lookup set.
- cluster.check drops the direct volume-to-volume sweep; volume
  servers no longer carry a peer-volume list, and the note next to
  the dropped probe is reworded to make clear that direct
  volume-to-volume reachability is intentionally not validated by
  this command.

Update the volume-server integration tests that drove Ping through the
new admission gate: success-path coverage now targets the master peer
(the only type a volume server tracks), and the unknown/unreachable
path asserts the InvalidArgument the gate now returns instead of the
old downstream dial error.

Mirror the same admission gate in the Rust volume server crate: a
seed-master HashSet built once at startup plus a tokio RwLock over the
heartbeat-tracked current master, both consulted in is_known_ping_target
on every Ping, with InvalidArgument returned for any target that isn't
a recognised master.
2026-05-12 13:00:52 -07:00

518 lines
18 KiB
Go

package volume_server_grpc_test
import (
"context"
"io"
"net/http"
"strings"
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)
// TestVolumeNeedleStatusForUploadedFile uploads a needle over the HTTP write
// path, then checks VolumeNeedleStatus reports the matching needle id, cookie,
// and a non-zero size.
func TestVolumeNeedleStatusForUploadedFile(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	const (
		volumeID = uint32(21)
		needleID = uint64(778899)
		cookie   = uint32(0xA1B2C3D4)
	)
	framework.AllocateVolume(t, volumeClient, volumeID, "")

	// Seed a single needle through the public upload endpoint.
	fid := framework.NewFileID(volumeID, needleID, cookie)
	httpClient := framework.NewHTTPClient()
	uploadResp := framework.UploadBytes(t, httpClient, harness.VolumeAdminURL(), fid, []byte("needle-status-payload"))
	_ = framework.ReadAllAndClose(t, uploadResp)
	if uploadResp.StatusCode != http.StatusCreated {
		t.Fatalf("upload status: expected 201, got %d", uploadResp.StatusCode)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	statusResp, err := volumeClient.VolumeNeedleStatus(ctx, &volume_server_pb.VolumeNeedleStatusRequest{
		VolumeId: volumeID,
		NeedleId: needleID,
	})
	if err != nil {
		t.Fatalf("VolumeNeedleStatus failed: %v", err)
	}
	if statusResp.GetNeedleId() != needleID {
		t.Fatalf("needle id mismatch: got %d want %d", statusResp.GetNeedleId(), needleID)
	}
	if statusResp.GetCookie() != cookie {
		t.Fatalf("cookie mismatch: got %d want %d", statusResp.GetCookie(), cookie)
	}
	if statusResp.GetSize() == 0 {
		t.Fatalf("expected non-zero needle size")
	}
}
// TestVolumeNeedleStatusIncludesTtlAndLastModified uploads a needle with an
// explicit ttl and timestamp in the query string and verifies both values
// round-trip through VolumeNeedleStatus.
func TestVolumeNeedleStatusIncludesTtlAndLastModified(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	const (
		volumeID = uint32(27)
		needleID = uint64(778901)
		cookie   = uint32(0xA1B2C3D6)
	)
	framework.AllocateVolume(t, volumeClient, volumeID, "")

	// Upload with ttl=7d and a fixed unix timestamp so both fields are
	// observable in the status response below.
	const ttlPayload = "needle-status-ttl-payload"
	fid := framework.NewFileID(volumeID, needleID, cookie)
	httpClient := framework.NewHTTPClient()
	uploadReq := mustNewRequest(t, http.MethodPost, harness.VolumeAdminURL()+"/"+fid+"?ttl=7d&ts=1700000000")
	uploadReq.Body = io.NopCloser(strings.NewReader(ttlPayload))
	uploadReq.ContentLength = int64(len(ttlPayload))
	uploadReq.Header.Set("Content-Type", "application/octet-stream")
	uploadResp := framework.DoRequest(t, httpClient, uploadReq)
	_ = framework.ReadAllAndClose(t, uploadResp)
	if uploadResp.StatusCode != http.StatusCreated {
		t.Fatalf("upload status: expected 201, got %d", uploadResp.StatusCode)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	statusResp, err := volumeClient.VolumeNeedleStatus(ctx, &volume_server_pb.VolumeNeedleStatusRequest{
		VolumeId: volumeID,
		NeedleId: needleID,
	})
	if err != nil {
		t.Fatalf("VolumeNeedleStatus with ttl failed: %v", err)
	}
	// Go's ReadTTL normalizes via fitTtlCount: 7d → 1w (7 days = 1 week)
	if statusResp.GetTtl() != "1w" {
		t.Fatalf("ttl mismatch: got %q want %q", statusResp.GetTtl(), "1w")
	}
	if statusResp.GetLastModified() != 1700000000 {
		t.Fatalf("last modified mismatch: got %d want %d", statusResp.GetLastModified(), 1700000000)
	}
}
// TestVolumeNeedleStatusViaEcShardsWhenNormalVolumeUnmounted verifies that
// needle status can be served from mounted EC shards once the normal volume is
// unmounted, and that a never-written needle still yields a "not found" error
// on that EC-backed path.
func TestVolumeNeedleStatusViaEcShardsWhenNormalVolumeUnmounted(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	const (
		volumeID = uint32(26)
		needleID = uint64(778900)
		cookie   = uint32(0xA1B2C3D5)
	)
	framework.AllocateVolume(t, volumeClient, volumeID, "")

	// Seed one needle so the generated EC shards contain real data.
	httpClient := framework.NewHTTPClient()
	fid := framework.NewFileID(volumeID, needleID, cookie)
	uploadResp := framework.UploadBytes(t, httpClient, harness.VolumeAdminURL(), fid, []byte("needle-status-ec-path-payload"))
	_ = framework.ReadAllAndClose(t, uploadResp)
	if uploadResp.StatusCode != http.StatusCreated {
		t.Fatalf("upload status: expected 201, got %d", uploadResp.StatusCode)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Generate EC shards, mount the ten data shards, then unmount the normal
	// volume so only the EC path can answer the status lookup.
	if _, err := volumeClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
		VolumeId:   volumeID,
		Collection: "",
	}); err != nil {
		t.Fatalf("VolumeEcShardsGenerate failed: %v", err)
	}
	if _, err := volumeClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
		VolumeId:   volumeID,
		Collection: "",
		ShardIds:   []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
	}); err != nil {
		t.Fatalf("VolumeEcShardsMount data shards failed: %v", err)
	}
	if _, err := volumeClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{
		VolumeId: volumeID,
	}); err != nil {
		t.Fatalf("VolumeUnmount failed: %v", err)
	}

	statusResp, err := volumeClient.VolumeNeedleStatus(ctx, &volume_server_pb.VolumeNeedleStatusRequest{
		VolumeId: volumeID,
		NeedleId: needleID,
	})
	if err != nil {
		t.Fatalf("VolumeNeedleStatus via EC shards failed: %v", err)
	}
	if statusResp.GetNeedleId() != needleID {
		t.Fatalf("needle id mismatch: got %d want %d", statusResp.GetNeedleId(), needleID)
	}
	if statusResp.GetCookie() != cookie {
		t.Fatalf("cookie mismatch: got %d want %d", statusResp.GetCookie(), cookie)
	}
	if statusResp.GetSize() == 0 {
		t.Fatalf("expected non-zero needle size from EC-backed needle status")
	}

	// A needle id that was never written must still surface "not found" from
	// the EC lookup.
	_, err = volumeClient.VolumeNeedleStatus(ctx, &volume_server_pb.VolumeNeedleStatusRequest{
		VolumeId: volumeID,
		NeedleId: needleID + 999999,
	})
	if err == nil || !strings.Contains(strings.ToLower(err.Error()), "not found") {
		t.Fatalf("VolumeNeedleStatus via EC shards missing-needle error mismatch: %v", err)
	}
}
// TestVolumeNeedleStatusMissingVolumeAndNeedle checks the two failure modes of
// VolumeNeedleStatus: a volume id that was never allocated, and a needle id
// that was never written to an existing volume.
func TestVolumeNeedleStatusMissingVolumeAndNeedle(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	const volumeID = uint32(25)
	framework.AllocateVolume(t, volumeClient, volumeID, "")

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Unknown volume id.
	_, err := volumeClient.VolumeNeedleStatus(ctx, &volume_server_pb.VolumeNeedleStatusRequest{
		VolumeId: 99925,
		NeedleId: 1,
	})
	if err == nil {
		t.Fatalf("VolumeNeedleStatus should fail for missing volume")
	}
	if !strings.Contains(strings.ToLower(err.Error()), "volume not found") {
		t.Fatalf("VolumeNeedleStatus missing-volume error mismatch: %v", err)
	}

	// Known volume, unknown needle id.
	_, err = volumeClient.VolumeNeedleStatus(ctx, &volume_server_pb.VolumeNeedleStatusRequest{
		VolumeId: volumeID,
		NeedleId: 123456789,
	})
	if err == nil {
		t.Fatalf("VolumeNeedleStatus should fail for missing needle")
	}
	if !strings.Contains(strings.ToLower(err.Error()), "not found") {
		t.Fatalf("VolumeNeedleStatus missing-needle error mismatch: %v", err)
	}
}
func mustNewRequest(t testing.TB, method, url string) *http.Request {
t.Helper()
req, err := http.NewRequest(method, url, nil)
if err != nil {
t.Fatalf("create request %s %s: %v", method, url, err)
}
return req
}
// TestVolumeConfigureInvalidReplication verifies that an unparsable replication
// string is reported through the response's Error field (not as a gRPC
// transport error) and that the message names replication as the cause.
func TestVolumeConfigureInvalidReplication(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	const volumeID = uint32(22)
	framework.AllocateVolume(t, volumeClient, volumeID, "")

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	configureResp, err := volumeClient.VolumeConfigure(ctx, &volume_server_pb.VolumeConfigureRequest{
		VolumeId:    volumeID,
		Replication: "bad-replication",
	})
	if err != nil {
		t.Fatalf("VolumeConfigure returned grpc error: %v", err)
	}
	if configureResp.GetError() == "" {
		t.Fatalf("VolumeConfigure expected response error for invalid replication")
	}
	if !strings.Contains(strings.ToLower(configureResp.GetError()), "replication") {
		t.Fatalf("VolumeConfigure error should mention replication, got: %q", configureResp.GetError())
	}
}
// TestVolumeConfigureSuccessAndMissingRollbackPath covers both the happy
// VolumeConfigure path (valid replication leaves the volume writable) and the
// missing-volume branch, where the remount rollback itself also fails and both
// failure messages appear in the response error.
func TestVolumeConfigureSuccessAndMissingRollbackPath(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	const volumeID = uint32(24)
	framework.AllocateVolume(t, volumeClient, volumeID, "")

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Happy path: a valid replication string configures without error.
	successResp, err := volumeClient.VolumeConfigure(ctx, &volume_server_pb.VolumeConfigureRequest{
		VolumeId:    volumeID,
		Replication: "000",
	})
	if err != nil {
		t.Fatalf("VolumeConfigure success path returned grpc error: %v", err)
	}
	if successResp.GetError() != "" {
		t.Fatalf("VolumeConfigure success path expected empty response error, got: %q", successResp.GetError())
	}
	statusResp, err := volumeClient.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{VolumeId: volumeID})
	if err != nil {
		t.Fatalf("VolumeStatus after successful configure failed: %v", err)
	}
	if statusResp.GetIsReadOnly() {
		t.Fatalf("VolumeStatus after configure expected writable volume")
	}

	// Missing volume: the configure fails, and the remount rollback fails too;
	// both messages are carried in the response error.
	missingResp, err := volumeClient.VolumeConfigure(ctx, &volume_server_pb.VolumeConfigureRequest{
		VolumeId:    99024,
		Replication: "000",
	})
	if err != nil {
		t.Fatalf("VolumeConfigure missing-volume branch should return response error, got grpc error: %v", err)
	}
	if missingResp.GetError() == "" {
		t.Fatalf("VolumeConfigure missing-volume expected non-empty response error")
	}
	lowerMsg := strings.ToLower(missingResp.GetError())
	if !strings.Contains(lowerMsg, "not found on disk") {
		t.Fatalf("VolumeConfigure missing-volume error should mention not found on disk, got: %q", missingResp.GetError())
	}
	if !strings.Contains(lowerMsg, "failed to restore mount") {
		t.Fatalf("VolumeConfigure missing-volume error should include remount rollback failure, got: %q", missingResp.GetError())
	}
}
// TestPingVolumeTargetAndLeaveAffectsHealthz drives a successful Ping through
// the volume server's master peer, then verifies VolumeServerLeave flips the
// /healthz endpoint to 503 within a short polling window.
func TestPingVolumeTargetAndLeaveAffectsHealthz(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Volume servers only track their masters as ping peers, so drive the
	// success path through the master target and rely on the master->volume
	// admission for cross-type coverage.
	pingResp, err := volumeClient.Ping(ctx, &volume_server_pb.PingRequest{
		TargetType: cluster.MasterType,
		Target:     harness.MasterAddress(),
	})
	if err != nil {
		t.Fatalf("Ping master from volume server failed: %v", err)
	}
	if pingResp.GetRemoteTimeNs() == 0 {
		t.Fatalf("expected remote timestamp from ping master")
	}

	if _, err = volumeClient.VolumeServerLeave(ctx, &volume_server_pb.VolumeServerLeaveRequest{}); err != nil {
		t.Fatalf("VolumeServerLeave failed: %v", err)
	}

	// Poll /healthz until it reports 503 or the deadline expires.
	httpClient := framework.NewHTTPClient()
	healthURL := harness.VolumeAdminURL() + "/healthz"
	deadline := time.Now().Add(5 * time.Second)
	for {
		healthResp := framework.DoRequest(t, httpClient, mustNewRequest(t, http.MethodGet, healthURL))
		_ = framework.ReadAllAndClose(t, healthResp)
		if healthResp.StatusCode == http.StatusServiceUnavailable {
			return
		}
		if time.Now().After(deadline) {
			t.Fatalf("expected healthz to return 503 after leave, got %d", healthResp.StatusCode)
		}
		time.Sleep(100 * time.Millisecond)
	}
}
// TestVolumeServerLeaveIsIdempotent issues VolumeServerLeave twice — both must
// succeed — and then confirms /healthz stays at 503 afterwards.
func TestVolumeServerLeaveIsIdempotent(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Leaving twice must not error: the second call is a no-op success.
	if _, err := volumeClient.VolumeServerLeave(ctx, &volume_server_pb.VolumeServerLeaveRequest{}); err != nil {
		t.Fatalf("first VolumeServerLeave failed: %v", err)
	}
	if _, err := volumeClient.VolumeServerLeave(ctx, &volume_server_pb.VolumeServerLeaveRequest{}); err != nil {
		t.Fatalf("second VolumeServerLeave should be idempotent success, got: %v", err)
	}

	// Poll /healthz until it reports 503 or the deadline expires.
	httpClient := framework.NewHTTPClient()
	healthURL := harness.VolumeAdminURL() + "/healthz"
	deadline := time.Now().Add(5 * time.Second)
	for {
		healthResp := framework.DoRequest(t, httpClient, mustNewRequest(t, http.MethodGet, healthURL))
		_ = framework.ReadAllAndClose(t, healthResp)
		if healthResp.StatusCode == http.StatusServiceUnavailable {
			return
		}
		if time.Now().After(deadline) {
			t.Fatalf("expected healthz to stay 503 after repeated leave, got %d", healthResp.StatusCode)
		}
		time.Sleep(100 * time.Millisecond)
	}
}
// TestPingUnknownAndUnreachableTargetPaths exercises the Ping admission gate:
// unknown target types, master addresses outside the known-peer set, and filer
// targets are all rejected with InvalidArgument, while an empty target remains
// a local self-liveness probe.
func TestPingUnknownAndUnreachableTargetPaths(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// The volume server gates Ping on a known peer list. Unknown target types
	// and addresses outside that list are rejected with InvalidArgument
	// before any outbound dial is attempted.
	_, err := volumeClient.Ping(ctx, &volume_server_pb.PingRequest{
		TargetType: "unknown-type",
		Target:     "127.0.0.1:12345",
	})
	if err == nil {
		t.Fatalf("Ping unknown target type should be rejected by admission")
	}
	if code := status.Code(err); code != codes.InvalidArgument {
		t.Fatalf("Ping unknown target type expected InvalidArgument, got %s: %v", code, err)
	}

	// Empty target stays as an unauthenticated self-liveness probe.
	emptyTargetResp, err := volumeClient.Ping(ctx, &volume_server_pb.PingRequest{})
	if err != nil {
		t.Fatalf("Ping empty target should not return grpc error, got: %v", err)
	}
	if emptyTargetResp.GetRemoteTimeNs() != 0 {
		t.Fatalf("Ping empty target expected remote_time_ns=0, got %d", emptyTargetResp.GetRemoteTimeNs())
	}
	if emptyTargetResp.GetStopTimeNs() < emptyTargetResp.GetStartTimeNs() {
		t.Fatalf("Ping empty target expected stop_time_ns >= start_time_ns")
	}

	// 127.0.0.1:1 is not in the seed/current master set, so admission rejects
	// the call before it can reach the network.
	_, err = volumeClient.Ping(ctx, &volume_server_pb.PingRequest{
		TargetType: cluster.MasterType,
		Target:     "127.0.0.1:1",
	})
	if err == nil {
		t.Fatalf("Ping master target should be rejected when not in the known-peer set")
	}
	if code := status.Code(err); code != codes.InvalidArgument {
		t.Fatalf("Ping unknown master expected InvalidArgument, got %s: %v", code, err)
	}

	// Volume servers do not carry a peer-filer list at all; any filer target
	// is rejected at admission regardless of reachability.
	_, err = volumeClient.Ping(ctx, &volume_server_pb.PingRequest{
		TargetType: cluster.FilerType,
		Target:     "127.0.0.1:1",
	})
	if err == nil {
		t.Fatalf("Ping filer target should be rejected by admission")
	}
	if code := status.Code(err); code != codes.InvalidArgument {
		t.Fatalf("Ping filer expected InvalidArgument, got %s: %v", code, err)
	}
}
// TestPingMasterTargetSuccess pings the cluster's master — the only peer type
// a volume server tracks — and sanity-checks the returned timestamps.
func TestPingMasterTargetSuccess(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	harness := framework.StartVolumeCluster(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	pingResp, err := volumeClient.Ping(ctx, &volume_server_pb.PingRequest{
		TargetType: cluster.MasterType,
		Target:     harness.MasterAddress(),
	})
	if err != nil {
		t.Fatalf("Ping master target success path failed: %v", err)
	}
	if pingResp.GetRemoteTimeNs() == 0 {
		t.Fatalf("Ping master target expected non-zero remote time")
	}
	if pingResp.GetStopTimeNs() < pingResp.GetStartTimeNs() {
		t.Fatalf("Ping master target expected stop >= start, got start=%d stop=%d", pingResp.GetStartTimeNs(), pingResp.GetStopTimeNs())
	}
}
// TestPingFilerTargetSuccess starts a cluster that includes a filer and pings
// the master from the volume server, checking timestamps on the response.
func TestPingFilerTargetSuccess(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	// Volume servers do not maintain a peer-filer index, so drive the
	// filer-presence success path through the volume server's master peer:
	// reaching that master in a filer-bearing cluster is sufficient signal
	// that filer joined the cluster without breaking volume-server ping.
	harness := framework.StartSingleVolumeClusterWithFiler(t, matrix.P1())
	grpcConn, volumeClient := framework.DialVolumeServer(t, harness.VolumeGRPCAddress())
	defer grpcConn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	pingResp, err := volumeClient.Ping(ctx, &volume_server_pb.PingRequest{
		TargetType: cluster.MasterType,
		Target:     harness.MasterAddress(),
	})
	if err != nil {
		t.Fatalf("Ping master from filer-bearing cluster failed: %v", err)
	}
	if pingResp.GetRemoteTimeNs() == 0 {
		t.Fatalf("Ping master expected non-zero remote time")
	}
	if pingResp.GetStopTimeNs() < pingResp.GetStartTimeNs() {
		t.Fatalf("Ping master expected stop >= start, got start=%d stop=%d", pingResp.GetStartTimeNs(), pingResp.GetStopTimeNs())
	}
}