diff --git a/weed/worker/client.go b/weed/worker/client.go index 963771a6b..f2b4a37ba 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -142,11 +142,14 @@ out: req.Resp <- nil continue } - if state.streamFailed == nil || state.regWait == nil { + // Capture channel pointers to avoid race condition with reconnect + streamFailedCh := state.streamFailed + regWaitCh := state.regWait + if streamFailedCh == nil || regWaitCh == nil { req.Resp <- fmt.Errorf("stream not ready for registration") continue } - err := c.sendRegistration(req.Worker, state.streamFailed, state.regWait) + err := c.sendRegistration(req.Worker, streamFailedCh, regWaitCh) req.Resp <- err case ActionQueryConnected: respCh := cmd.data.(chan bool) @@ -241,6 +244,11 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { // Send registration via the normal outgoing channel and wait for response via incoming if err := c.sendRegistration(s.lastWorkerInfo, s.streamFailed, s.regWait); err != nil { c.safeCloseChannel(&s.streamExit) + c.safeCloseChannel(&s.streamFailed) + if s.regWait != nil { + close(s.regWait) + s.regWait = nil + } s.streamCancel() s.conn.Close() s.connected = false @@ -510,7 +518,10 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { // Send shutdown signal to stop handlers loop c.safeCloseChannel(&s.streamExit) c.safeCloseChannel(&s.streamFailed) - s.regWait = nil + if s.regWait != nil { + close(s.regWait) + s.regWait = nil + } // Cancel stream context if s.streamCancel != nil { @@ -583,7 +594,7 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData, streamFaile select { case regResp := <-regWait: if regResp == nil { - return fmt.Errorf("registration timeout: registration channel closed") + return fmt.Errorf("registration failed: channel closed unexpectedly") } if regResp.Success { glog.Infof("Worker registered successfully: %s", regResp.Message) @@ -591,9 +602,9 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData, streamFaile } return fmt.Errorf("registration failed: %s", regResp.Message) case <-streamFailed: - return fmt.Errorf("registration timeout: stream closed by server") + return fmt.Errorf("registration failed: stream closed by server") case <-timeout.C: - return fmt.Errorf("registration timeout") + return fmt.Errorf("registration failed: timeout waiting for response") } } }