redoctober: simplify the NewServer signature

Remove the unexported type used in the NewServer func so that it may be
used outside this package, which allows for integration style tests that
start an instance of a RO server.

The channel based request locking has been replaced with LimitListener,
which prevents simultaneous handling of TCP connections.
This commit is contained in:
Ben Burkert
2016-01-29 22:38:14 +00:00
parent 434f0b61d7
commit 6f8424ad38

View File

@@ -17,10 +17,11 @@ import (
"net"
"net/http"
"os"
"runtime"
"strings"
"time"
"golang.org/x/net/netutil"
"github.com/cloudflare/redoctober/core"
"github.com/coreos/go-systemd/activation"
)
@@ -52,29 +53,33 @@ type userRequest struct {
// called to handle this request)
}
// queueRequest handles a single request receive on the JSON API for
// one of the functions named in the functions map above. It reads the
// request and sends it to the goroutine started in main() below for
// processing and then waits for the response.
func queueRequest(process chan<- userRequest, requestType string, w http.ResponseWriter, r *http.Request) {
// processRequest handles a single request receive on the JSON API for
// one of the functions named in the functions map above.
func processRequest(requestType string, w http.ResponseWriter, r *http.Request) {
header := w.Header()
header.Set("Content-Type", "application/json")
header.Set("Strict-Transport-Security", "max-age=86400; includeSubDomains; preload")
fn, ok := functions[requestType]
if !ok {
http.Error(w, "Unknown request", http.StatusInternalServerError)
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
response := make(chan []byte)
process <- userRequest{rt: requestType, in: body, resp: response}
if resp, ok := <-response; ok {
header := w.Header()
header.Set("Content-Type", "application/json")
header.Set("Strict-Transport-Security", "max-age=86400; includeSubDomains; preload")
w.Write(resp)
} else {
http.Error(w, "Unknown request", http.StatusInternalServerError)
resp, err := fn(body)
if err != nil {
log.Printf("http.main failed: %s: %s", requestType, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write(resp)
}
// NewServer starts an HTTPS server the handles the redoctober JSON
@@ -84,7 +89,7 @@ func queueRequest(process chan<- userRequest, requestType string, w http.Respons
//
// Returns a valid http.Server handling redoctober JSON requests (and
// its associated listener) or an error
func NewServer(process chan<- userRequest, staticPath, addr, caPath string, certPaths, keyPaths []string, useSystemdSocket bool) (*http.Server, net.Listener, error) {
func NewServer(staticPath, addr, caPath string, certPaths, keyPaths []string, useSystemdSocket bool) (*http.Server, net.Listener, error) {
config := &tls.Config{
PreferServerCipherSuites: true,
SessionTicketsDisabled: true,
@@ -151,7 +156,7 @@ func NewServer(process chan<- userRequest, staticPath, addr, caPath string, cert
requestType := current
mux.HandleFunc(requestType, func(w http.ResponseWriter, r *http.Request) {
log.Printf("http.server: endpoint=%s remote=%s", requestType, r.RemoteAddr)
queueRequest(process, requestType, w, r)
processRequest(requestType, w, r)
})
}
@@ -166,6 +171,11 @@ func NewServer(process chan<- userRequest, staticPath, addr, caPath string, cert
TLSConfig: config,
}
// The core package is not safe to be shared across goroutines so
// this supervisor goroutine reads requests from the process
// channel and dispatches them to core for processes.
lstnr = netutil.LimitListener(lstnr, 1)
return &srv, lstnr, nil
}
@@ -235,37 +245,7 @@ func main() {
log.Fatalf(err.Error())
}
runtime.GOMAXPROCS(runtime.NumCPU())
// The core package is not safe to be shared across goroutines so
// this supervisor goroutine reads requests from the process
// channel and dispatches them to core for processes.
process := make(chan userRequest)
go func() {
for {
req := <-process
if f, ok := functions[req.rt]; ok {
r, err := f(req.in)
if err == nil {
req.resp <- r
} else {
log.Printf("http.main failed: %s: %s", req.rt, err)
}
} else {
log.Printf("http.main: request=%s function is not supported", req.rt)
}
// Note that if an error occurs no message is sent down
// the channel and then channel is closed. The
// queueRequest function will see this as indication of an
// error.
close(req.resp)
}
}()
s, l, err := NewServer(process, *staticPath, *addr, *caPath, certPaths, keyPaths, *useSystemdSocket)
s, l, err := NewServer(*staticPath, *addr, *caPath, certPaths, keyPaths, *useSystemdSocket)
if err != nil {
log.Fatalf("Error starting redoctober server: %s\n", err)
}