diff --git a/cmd/talosctl/cmd/mgmt/debug/air-gapped.go b/cmd/talosctl/cmd/mgmt/debug/air-gapped.go index f24fb73a4..e57daf4d2 100644 --- a/cmd/talosctl/cmd/mgmt/debug/air-gapped.go +++ b/cmd/talosctl/cmd/mgmt/debug/air-gapped.go @@ -194,14 +194,16 @@ func runHTTPServer(ctx context.Context, certPEM, keyPEM []byte) error { } func handleTunneling(w http.ResponseWriter, r *http.Request) { - dst, err := net.DialTimeout("tcp", r.Host, 10*time.Second) + addr := r.URL.Host + + dstConn, err := net.DialTimeout("tcp", addr, 10*time.Second) if err != nil { http.Error(w, err.Error(), http.StatusServiceUnavailable) return } - w.WriteHeader(http.StatusOK) + dst := dstConn.(*net.TCPConn) hijacker, ok := w.(http.Hijacker) if !ok { @@ -217,15 +219,35 @@ func handleTunneling(w http.ResponseWriter, r *http.Request) { return } - go transfer(dst, clientConn) - go transfer(clientConn, dst) + src := clientConn.(*net.TCPConn) + + src.Write([]byte("HTTP/1.0 200 Connection established\r\n\r\n")) //nolint:errcheck + + log.Printf("HTTP CONNECT: tunneling to %s", addr) + + defer dst.Close() //nolint:errcheck + defer src.Close() //nolint:errcheck + + var eg errgroup.Group + + eg.Go(func() error { return transfer(dst, src, "src -> dst: "+addr) }) + eg.Go(func() error { return transfer(src, dst, "dst -> src: "+addr) }) + + if err = eg.Wait(); err != nil { + log.Printf("HTTP CONNECT: tunneling to %s: failed %v", addr, err) + } } -func transfer(destination io.WriteCloser, source io.ReadCloser) { - defer destination.Close() //nolint:errcheck - defer source.Close() //nolint:errcheck +func transfer(destination *net.TCPConn, source *net.TCPConn, label string) error { + defer destination.CloseWrite() //nolint:errcheck + defer source.CloseRead() //nolint:errcheck - io.Copy(destination, source) //nolint:errcheck + n, err := io.Copy(destination, source) + if err != nil { + return fmt.Errorf("transfer failed %s (%d bytes copied): %w", label, n, err) + } + + return nil } func handleHTTP(w http.ResponseWriter, req *http.Request) { diff --git a/go.mod b/go.mod index fd54ce0d0..5c575a28a 100644 --- a/go.mod +++ b/go.mod @@ -140,8 +140,8 @@ require ( github.com/safchain/ethtool v0.5.9 github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30 github.com/siderolabs/crypto v0.5.1 - github.com/siderolabs/discovery-api v0.1.5 - github.com/siderolabs/discovery-client v0.1.10 + github.com/siderolabs/discovery-api v0.1.6 + github.com/siderolabs/discovery-client v0.1.11 github.com/siderolabs/gen v0.8.0 github.com/siderolabs/go-api-signature v0.3.6 github.com/siderolabs/go-blockdevice v0.4.8 @@ -191,7 +191,7 @@ require ( golang.org/x/time v0.10.0 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6 google.golang.org/grpc v1.70.0 - google.golang.org/protobuf v1.36.4 + google.golang.org/protobuf v1.36.5 gopkg.in/yaml.v3 v3.0.1 k8s.io/klog/v2 v2.130.1 kernel.org/pub/linux/libs/security/libcap/cap v1.2.73 diff --git a/go.sum b/go.sum index b9fe82a81..44e95802c 100644 --- a/go.sum +++ b/go.sum @@ -635,10 +635,10 @@ github.com/siderolabs/coredns v1.12.50 h1:uOBWZErtM3pcncGj0XxHru53Xo3SND/qjEVkfH github.com/siderolabs/coredns v1.12.50/go.mod h1:le+OxWZF+rYtNGuYvxuOAdlrTwJAHKcuW09SPu25w2c= github.com/siderolabs/crypto v0.5.1 h1:aZEUTZBoP8rH+0TqQAlUgazriPh89MrXf4R+th+m6ps= github.com/siderolabs/crypto v0.5.1/go.mod h1:7RHC7eUKBx6RLS2lDaNXrQ83zY9iPH/aQSTxk1I4/j4= -github.com/siderolabs/discovery-api v0.1.5 h1:fcHVkLkWla7C5+9IeOGEUQ4N8Yp9R7a/kcKbiay2QKw= -github.com/siderolabs/discovery-api v0.1.5/go.mod h1:b9jOm9T2puYVcRqCAjWxPcHz2qBqDX8I0OZDOyOFHXg= -github.com/siderolabs/discovery-client v0.1.10 h1:bTAvFLiISSzVXyYL1cIgAz8cPYd9ZfvhxwdebgtxARA= -github.com/siderolabs/discovery-client v0.1.10/go.mod h1:Ew1z07eyJwqNwum84IKYH4S649KEKK5WUmRW49HlXS8= +github.com/siderolabs/discovery-api v0.1.6 h1:/LhsF1ytqFEfWwV0UKfUgn90k9fk5+rhYMJ9yeUB2yc= +github.com/siderolabs/discovery-api v0.1.6/go.mod h1:s5CnTyRMGid/vJNSJs8Jw9I4tnKHu/2SGqP2ytTaePQ= +github.com/siderolabs/discovery-client v0.1.11 h1:Au+7QZ+CIB6g4C7ZCC4m5Ai5Uso1g/I3/E4bSUElzF8= +github.com/siderolabs/discovery-client v0.1.11/go.mod h1:Iw5XUphGNNV0m2czHjbj9aLhQvfM8hYEfWCc6fdQ4ko= github.com/siderolabs/ethtool v0.3.0 h1:98kMFGnkDEikngqtDyk6R/ykMjaGpJ1df/bw9vPLKJg= github.com/siderolabs/ethtool v0.3.0/go.mod h1:3u47fCDlOQGM7IhXhUB3uPEcjdQxGmK53/45kEg1HDM= github.com/siderolabs/gen v0.8.0 h1:Pj93+hexkk5hQ7izjJ6YXnEWc8vlzOmDwFz13/VzS7o= @@ -1084,8 +1084,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= -google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/internal/app/machined/pkg/controllers/cluster/discovery_service.go b/internal/app/machined/pkg/controllers/cluster/discovery_service.go index 56ac5fdb9..8a02b8ea5 100644 --- a/internal/app/machined/pkg/controllers/cluster/discovery_service.go +++ b/internal/app/machined/pkg/controllers/cluster/discovery_service.go @@ -23,7 +23,9 @@ import ( "github.com/siderolabs/gen/optional" "github.com/siderolabs/gen/xslices" "go.uber.org/zap" + "google.golang.org/grpc" + "github.com/siderolabs/talos/pkg/grpc/dialer" "github.com/siderolabs/talos/pkg/httpdefaults" "github.com/siderolabs/talos/pkg/machinery/config/machine" "github.com/siderolabs/talos/pkg/machinery/proto" @@ -244,6 +246,9 @@ func (ctrl *DiscoveryServiceController) Run(ctx context.Context, r controller.Ru TLSConfig: &tls.Config{ RootCAs: httpdefaults.RootCAs(), }, + DialOptions: []grpc.DialOption{ + grpc.WithContextDialer(dialer.DynamicProxyDialer), + }, }) if err != nil { return fmt.Errorf("error initializing discovery client: %w", err) diff --git a/pkg/grpc/dialer/proxy.go b/pkg/grpc/dialer/proxy.go new file mode 100644 index 000000000..6635fa3ec --- /dev/null +++ b/pkg/grpc/dialer/proxy.go @@ -0,0 +1,175 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package dialer + +import ( + "bufio" + "context" + "encoding/base64" + "fmt" + "io" + "net" + "net/http" + "net/http/httputil" + "net/url" + + "golang.org/x/net/http/httpproxy" + "google.golang.org/grpc" +) + +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +const grpcUA = "grpc-go/" + grpc.Version + +// DynamicProxyDialer is a fork of grpc standard dialer which supports dynamic resolving of proxy settings +// on each request (vs. caching it once per process). +// +// DynamicProxyDialer assumes that the address is using 'tcp' network. +func DynamicProxyDialer(ctx context.Context, addr string) (net.Conn, error) { + newAddr := addr + + proxyURL, err := mapAddress(addr) + if err != nil { + return nil, err + } + + if proxyURL != nil { + newAddr = proxyURL.Host + } + + conn, err := NetDialerWithTCPKeepalive().DialContext(ctx, "tcp", newAddr) + if err != nil { + return nil, err + } + + if proxyURL == nil { + // proxy is disabled if proxyURL is nil. + return conn, err + } + + return doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA) +} + +const proxyAuthHeaderKey = "Proxy-Authorization" + +func mapAddress(address string) (*url.URL, error) { + req := &http.Request{ + URL: &url.URL{ + Scheme: "https", + Host: address, + }, + } + + return httpproxy.FromEnvironment().ProxyFunc()(req.URL) +} + +// To read a response from a net.Conn, http.ReadResponse() takes a bufio.Reader. +// It's possible that this reader reads more than what's need for the response and stores +// those bytes in the buffer. +// bufConn wraps the original net.Conn and the bufio.Reader to make sure we don't lose the +// bytes in the buffer. +type bufConn struct { + net.Conn + r io.Reader +} + +func (c *bufConn) Read(b []byte) (int, error) { + return c.r.Read(b) +} + +func basicAuth(username, password string) string { + auth := username + ":" + password + + return base64.StdEncoding.EncodeToString([]byte(auth)) +} + +func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL, grpcUA string) (_ net.Conn, err error) { + defer func() { + if err != nil { + conn.Close() //nolint:errcheck + } + }() + + req := &http.Request{ + Method: http.MethodConnect, + URL: &url.URL{Host: backendAddr}, + Header: map[string][]string{"User-Agent": {grpcUA}}, + } + + if t := proxyURL.User; t != nil { + u := t.Username() + p, _ := t.Password() + req.Header.Add(proxyAuthHeaderKey, "Basic "+basicAuth(u, p)) + } + + if err := sendHTTPRequest(ctx, req, conn); err != nil { + return nil, fmt.Errorf("failed to write the HTTP request: %v", err) + } + + r := bufio.NewReader(conn) + + resp, err := http.ReadResponse(r, req) + if err != nil { + return nil, fmt.Errorf("reading server HTTP response: %v", err) + } + + defer resp.Body.Close() //nolint:errcheck + + if resp.StatusCode != http.StatusOK { + dump, err := httputil.DumpResponse(resp, true) + if err != nil { + return nil, fmt.Errorf("failed to do connect handshake, status code: %s", resp.Status) + } + + return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump) + } + + // The buffer could contain extra bytes from the target server, so we can't + // discard it. However, in many cases where the server waits for the client + // to send the first message (e.g. when TLS is being used), the buffer will + // be empty, so we can avoid the overhead of reading through this buffer. + if r.Buffered() != 0 { + return &bufConn{Conn: conn, r: r}, nil + } + + return conn, nil +} + +func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error { + req = req.WithContext(ctx) + if err := req.Write(conn); err != nil { + return fmt.Errorf("failed to write the HTTP request: %v", err) + } + + return nil +} + +// NetDialerWithTCPKeepalive returns a net.Dialer that enables TCP keepalives on +// the underlying connection with OS default values for keepalive parameters. +func NetDialerWithTCPKeepalive() *net.Dialer { + return &net.Dialer{ + KeepAliveConfig: net.KeepAliveConfig{ + Enable: true, + Idle: -1, + Count: -1, + Interval: -1, + }, + } +}