// Copyright 2020 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package node import ( "compress/gzip" "context" "fmt" "io" "io/ioutil" "net" "net/http" "sort" "strings" "sync" "sync/atomic" "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/rpc" "github.com/rs/cors" ) // httpConfig is the JSON-RPC/HTTP configuration. type httpConfig struct { Modules []string CorsAllowedOrigins []string Vhosts []string } // wsConfig is the JSON-RPC/Websocket configuration type wsConfig struct { Origins []string Modules []string } type rpcHandler struct { http.Handler server *rpc.Server } type httpServer struct { log log.Logger timeouts rpc.HTTPTimeouts mux http.ServeMux // registered handlers go here mu sync.Mutex server *http.Server listener net.Listener // non-nil when server is running // HTTP RPC handler things. httpConfig httpConfig httpHandler atomic.Value // *rpcHandler // WebSocket handler things. wsConfig wsConfig wsHandler atomic.Value // *rpcHandler // These are set by setListenAddr. endpoint string host string port int handlerNames map[string]string } func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *httpServer { h := &httpServer{log: log, timeouts: timeouts, handlerNames: make(map[string]string)} h.httpHandler.Store((*rpcHandler)(nil)) h.wsHandler.Store((*rpcHandler)(nil)) return h } // setListenAddr configures the listening address of the server. // The address can only be set while the server isn't running. func (h *httpServer) setListenAddr(host string, port int) error { h.mu.Lock() defer h.mu.Unlock() if h.listener != nil && (host != h.host || port != h.port) { return fmt.Errorf("HTTP server already running on %s", h.endpoint) } h.host, h.port = host, port h.endpoint = fmt.Sprintf("%s:%d", host, port) return nil } // listenAddr returns the listening address of the server. func (h *httpServer) listenAddr() string { h.mu.Lock() defer h.mu.Unlock() if h.listener != nil { return h.listener.Addr().String() } return h.endpoint } // start starts the HTTP server if it is enabled and not already running. func (h *httpServer) start() error { h.mu.Lock() defer h.mu.Unlock() if h.endpoint == "" || h.listener != nil { return nil // already running or not configured } // Initialize the server. h.server = &http.Server{Handler: h} if h.timeouts != (rpc.HTTPTimeouts{}) { CheckTimeouts(&h.timeouts) h.server.ReadTimeout = h.timeouts.ReadTimeout h.server.WriteTimeout = h.timeouts.WriteTimeout h.server.IdleTimeout = h.timeouts.IdleTimeout } // Start the server. listener, err := net.Listen("tcp", h.endpoint) if err != nil { // If the server fails to start, we need to clear out the RPC and WS // configuration so they can be configured another time. h.disableRPC() h.disableWS() return err } h.listener = listener go h.server.Serve(listener) // nolint:errcheck // if server is websocket only, return after logging if h.wsAllowed() && !h.rpcAllowed() { h.log.Info("WebSocket enabled", "url", fmt.Sprintf("ws://%v", listener.Addr())) return nil } // Log http endpoint. h.log.Info("HTTP server started", "endpoint", listener.Addr(), "cors", strings.Join(h.httpConfig.CorsAllowedOrigins, ","), "vhosts", strings.Join(h.httpConfig.Vhosts, ","), ) // Log all handlers mounted on server. paths := make([]string, len(h.handlerNames)) i := 0 for path := range h.handlerNames { paths[i] = path i++ } sort.Strings(paths) logged := make(map[string]bool, len(paths)) for _, path := range paths { name := h.handlerNames[path] if !logged[name] { log.Info(name+" enabled", "url", "http://"+listener.Addr().String()+path) logged[name] = true } } return nil } func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { rpc := h.httpHandler.Load().(*rpcHandler) if r.RequestURI == "/" { // Serve JSON-RPC on the root path. ws := h.wsHandler.Load().(*rpcHandler) if ws != nil && isWebsocket(r) { ws.ServeHTTP(w, r) return } if rpc != nil { rpc.ServeHTTP(w, r) return } } else if rpc != nil { // Requests to a path below root are handled by the mux, // which has all the handlers registered via Node.RegisterHandler. // These are made available when RPC is enabled. h.mux.ServeHTTP(w, r) return } w.WriteHeader(404) } // stop shuts down the HTTP server. func (h *httpServer) stop() { h.mu.Lock() defer h.mu.Unlock() h.doStop() } func (h *httpServer) doStop() { if h.listener == nil { return // not running } // Shut down the server. httpHandler := h.httpHandler.Load().(*rpcHandler) wsHandler := h.httpHandler.Load().(*rpcHandler) if httpHandler != nil { h.httpHandler.Store((*rpcHandler)(nil)) httpHandler.server.Stop() } if wsHandler != nil { h.wsHandler.Store((*rpcHandler)(nil)) wsHandler.server.Stop() } h.server.Shutdown(context.Background()) //nolint:errcheck h.listener.Close() h.log.Info("HTTP server stopped", "endpoint", h.listener.Addr()) // Clear out everything to allow re-configuring it later. h.host, h.port, h.endpoint = "", 0, "" h.server, h.listener = nil, nil } // enableRPC turns on JSON-RPC over HTTP on the server. func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, allowList rpc.AllowList) error { h.mu.Lock() defer h.mu.Unlock() if h.rpcAllowed() { return fmt.Errorf("JSON-RPC over HTTP is already enabled") } // Create RPC server and handler. srv := rpc.NewServer() srv.SetAllowList(allowList) if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil { return err } h.httpConfig = config h.httpHandler.Store(&rpcHandler{ Handler: NewHTTPHandlerStack(srv, config.CorsAllowedOrigins, config.Vhosts), server: srv, }) return nil } // disableRPC stops the HTTP RPC handler. This is internal, the caller must hold h.mu. func (h *httpServer) disableRPC() bool { handler := h.httpHandler.Load().(*rpcHandler) if handler != nil { h.httpHandler.Store((*rpcHandler)(nil)) handler.server.Stop() } return handler != nil } // enableWS turns on JSON-RPC over WebSocket on the server. func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, allowList rpc.AllowList) error { h.mu.Lock() defer h.mu.Unlock() if h.wsAllowed() { return fmt.Errorf("JSON-RPC over WebSocket is already enabled") } // Create RPC server and handler. srv := rpc.NewServer() srv.SetAllowList(allowList) if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil { return err } h.wsConfig = config h.wsHandler.Store(&rpcHandler{ Handler: srv.WebsocketHandler(config.Origins), server: srv, }) return nil } // stopWS disables JSON-RPC over WebSocket and also stops the server if it only serves WebSocket. func (h *httpServer) stopWS() { h.mu.Lock() defer h.mu.Unlock() if h.disableWS() { if !h.rpcAllowed() { h.doStop() } } } // disableWS disables the WebSocket handler. This is internal, the caller must hold h.mu. func (h *httpServer) disableWS() bool { ws := h.wsHandler.Load().(*rpcHandler) if ws != nil { h.wsHandler.Store((*rpcHandler)(nil)) ws.server.Stop() } return ws != nil } // rpcAllowed returns true when JSON-RPC over HTTP is enabled. func (h *httpServer) rpcAllowed() bool { return h.httpHandler.Load().(*rpcHandler) != nil } // wsAllowed returns true when JSON-RPC over WebSocket is enabled. func (h *httpServer) wsAllowed() bool { return h.wsHandler.Load().(*rpcHandler) != nil } // isWebsocket checks the header of an http request for a websocket upgrade request. func isWebsocket(r *http.Request) bool { return strings.ToLower(r.Header.Get("Upgrade")) == "websocket" && strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") } // NewHTTPHandlerStack returns wrapped http-related handlers func NewHTTPHandlerStack(srv http.Handler, cors []string, vhosts []string) http.Handler { // Wrap the CORS-handler within a host-handler handler := newCorsHandler(srv, cors) handler = newVHostHandler(vhosts, handler) return newGzipHandler(handler) } func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler { // disable CORS support if user has not specified a custom CORS configuration if len(allowedOrigins) == 0 { return srv } c := cors.New(cors.Options{ AllowedOrigins: allowedOrigins, AllowedMethods: []string{http.MethodPost, http.MethodGet}, AllowedHeaders: []string{"*"}, MaxAge: 600, }) return c.Handler(srv) } // virtualHostHandler is a handler which validates the Host-header of incoming requests. // Using virtual hosts can help prevent DNS rebinding attacks, where a 'random' domain name points to // the service ip address (but without CORS headers). By verifying the targeted virtual host, we can // ensure that it's a destination that the node operator has defined. type virtualHostHandler struct { vhosts map[string]struct{} next http.Handler } func newVHostHandler(vhosts []string, next http.Handler) http.Handler { vhostMap := make(map[string]struct{}) for _, allowedHost := range vhosts { vhostMap[strings.ToLower(allowedHost)] = struct{}{} } return &virtualHostHandler{vhostMap, next} } // ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // if r.Host is not set, we can continue serving since a browser would set the Host header if r.Host == "" { h.next.ServeHTTP(w, r) return } host, _, err := net.SplitHostPort(r.Host) if err != nil { // Either invalid (too many colons) or no port specified host = r.Host } if ipAddr := net.ParseIP(host); ipAddr != nil { // It's an IP address, we can serve that h.next.ServeHTTP(w, r) return } // Not an IP address, but a hostname. Need to validate if _, exist := h.vhosts["*"]; exist { h.next.ServeHTTP(w, r) return } if _, exist := h.vhosts[host]; exist { h.next.ServeHTTP(w, r) return } http.Error(w, "invalid host specified", http.StatusForbidden) } var gzPool = sync.Pool{ New: func() interface{} { w := gzip.NewWriter(ioutil.Discard) return w }, } type gzipResponseWriter struct { io.Writer http.ResponseWriter } func (w *gzipResponseWriter) WriteHeader(status int) { w.Header().Del("Content-Length") w.ResponseWriter.WriteHeader(status) } func (w *gzipResponseWriter) Write(b []byte) (int, error) { return w.Writer.Write(b) } func newGzipHandler(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { next.ServeHTTP(w, r) return } w.Header().Set("Content-Encoding", "gzip") gz := gzPool.Get().(*gzip.Writer) defer gzPool.Put(gz) gz.Reset(w) defer gz.Close() next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r) }) } type ipcServer struct { log log.Logger endpoint string mu sync.Mutex listener net.Listener srv *rpc.Server } func newIPCServer(log log.Logger, endpoint string) *ipcServer { return &ipcServer{log: log, endpoint: endpoint} } // Start starts the httpServer's http.Server func (is *ipcServer) start(apis []rpc.API) error { is.mu.Lock() defer is.mu.Unlock() if is.listener != nil { return nil // already running } listener, srv, err := rpc.StartIPCEndpoint(is.endpoint, apis) if err != nil { is.log.Warn("IPC opening failed", "url", is.endpoint, "error", err) return err } is.log.Info("IPC endpoint opened", "url", is.endpoint) is.listener, is.srv = listener, srv return nil } func (is *ipcServer) stop() error { is.mu.Lock() defer is.mu.Unlock() if is.listener == nil { return nil // not running } err := is.listener.Close() is.srv.Stop() is.listener, is.srv = nil, nil is.log.Info("IPC endpoint closed", "url", is.endpoint) return err } // RegisterApisFromWhitelist checks the given modules' availability, generates a whitelist based on the allowed modules, // and then registers all of the APIs exposed by the services. func RegisterApisFromWhitelist(apis []rpc.API, modules []string, srv *rpc.Server, exposeAll bool) error { if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 { log.Error("Unavailable modules in HTTP API list", "unavailable", bad, "available", available) } // Generate the whitelist based on the allowed modules whitelist := make(map[string]bool) for _, module := range modules { whitelist[module] = true } // Register all the APIs exposed by the services for _, api := range apis { if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { if err := srv.RegisterName(api.Namespace, api.Service); err != nil { return err } } } return nil }