fix: handle dynamic HTTP proxy settings for discovery client

Unfortunately, gRPC client library doesn't offer control on picking up
HTTP proxy for a connection, and relies on sync.Once protected value
which reads proxy variables from the environment just once.

As environment variables can be set dynamically, there is a chance that
dial might happen before they are set, so they are never respected after
that.

So I had to copy/adapt some part of grpc internal/transport parts to
enable dynamic loading of HTTP proxy, and use new dialer in discovery
client.

Fixes #10136

Also, while testing this with our small "air-gapped" tool I discovered
that our HTTP proxy test implementation is not complete, so this had to
be fixed as well.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
This commit is contained in:
Andrey Smirnov 2025-02-27 18:36:21 +04:00
parent d45eaeb74c
commit ebfdb91b4c
No known key found for this signature in database
GPG Key ID: FE042E3D4085A811
5 changed files with 219 additions and 17 deletions

View File

@ -194,14 +194,16 @@ func runHTTPServer(ctx context.Context, certPEM, keyPEM []byte) error {
}
func handleTunneling(w http.ResponseWriter, r *http.Request) {
dst, err := net.DialTimeout("tcp", r.Host, 10*time.Second)
addr := r.URL.Host
dstConn, err := net.DialTimeout("tcp", addr, 10*time.Second)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
dst := dstConn.(*net.TCPConn)
hijacker, ok := w.(http.Hijacker)
if !ok {
@ -217,15 +219,35 @@ func handleTunneling(w http.ResponseWriter, r *http.Request) {
return
}
go transfer(dst, clientConn)
go transfer(clientConn, dst)
src := clientConn.(*net.TCPConn)
src.Write([]byte("HTTP/1.0 200 Connection established\r\n\r\n")) //nolint:errcheck
log.Printf("HTTP CONNECT: tunneling to %s", addr)
defer dst.Close() //nolint:errcheck
defer src.Close() //nolint:errcheck
var eg errgroup.Group
eg.Go(func() error { return transfer(dst, src, "src -> dst: "+addr) })
eg.Go(func() error { return transfer(src, dst, "dst -> src: "+addr) })
if err = eg.Wait(); err != nil {
log.Printf("HTTP CONNECT: tunneling to %s: failed %v", addr, err)
}
}
func transfer(destination io.WriteCloser, source io.ReadCloser) {
defer destination.Close() //nolint:errcheck
defer source.Close() //nolint:errcheck
func transfer(destination *net.TCPConn, source *net.TCPConn, label string) error {
defer destination.CloseWrite() //nolint:errcheck
defer source.CloseRead() //nolint:errcheck
io.Copy(destination, source) //nolint:errcheck
n, err := io.Copy(destination, source)
if err != nil {
return fmt.Errorf("transfer failed %s (%d bytes copied): %w", label, n, err)
}
return nil
}
func handleHTTP(w http.ResponseWriter, req *http.Request) {

6
go.mod
View File

@ -140,8 +140,8 @@ require (
github.com/safchain/ethtool v0.5.9
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30
github.com/siderolabs/crypto v0.5.1
github.com/siderolabs/discovery-api v0.1.5
github.com/siderolabs/discovery-client v0.1.10
github.com/siderolabs/discovery-api v0.1.6
github.com/siderolabs/discovery-client v0.1.11
github.com/siderolabs/gen v0.8.0
github.com/siderolabs/go-api-signature v0.3.6
github.com/siderolabs/go-blockdevice v0.4.8
@ -191,7 +191,7 @@ require (
golang.org/x/time v0.10.0
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6
google.golang.org/grpc v1.70.0
google.golang.org/protobuf v1.36.4
google.golang.org/protobuf v1.36.5
gopkg.in/yaml.v3 v3.0.1
k8s.io/klog/v2 v2.130.1
kernel.org/pub/linux/libs/security/libcap/cap v1.2.73

12
go.sum
View File

@ -635,10 +635,10 @@ github.com/siderolabs/coredns v1.12.50 h1:uOBWZErtM3pcncGj0XxHru53Xo3SND/qjEVkfH
github.com/siderolabs/coredns v1.12.50/go.mod h1:le+OxWZF+rYtNGuYvxuOAdlrTwJAHKcuW09SPu25w2c=
github.com/siderolabs/crypto v0.5.1 h1:aZEUTZBoP8rH+0TqQAlUgazriPh89MrXf4R+th+m6ps=
github.com/siderolabs/crypto v0.5.1/go.mod h1:7RHC7eUKBx6RLS2lDaNXrQ83zY9iPH/aQSTxk1I4/j4=
github.com/siderolabs/discovery-api v0.1.5 h1:fcHVkLkWla7C5+9IeOGEUQ4N8Yp9R7a/kcKbiay2QKw=
github.com/siderolabs/discovery-api v0.1.5/go.mod h1:b9jOm9T2puYVcRqCAjWxPcHz2qBqDX8I0OZDOyOFHXg=
github.com/siderolabs/discovery-client v0.1.10 h1:bTAvFLiISSzVXyYL1cIgAz8cPYd9ZfvhxwdebgtxARA=
github.com/siderolabs/discovery-client v0.1.10/go.mod h1:Ew1z07eyJwqNwum84IKYH4S649KEKK5WUmRW49HlXS8=
github.com/siderolabs/discovery-api v0.1.6 h1:/LhsF1ytqFEfWwV0UKfUgn90k9fk5+rhYMJ9yeUB2yc=
github.com/siderolabs/discovery-api v0.1.6/go.mod h1:s5CnTyRMGid/vJNSJs8Jw9I4tnKHu/2SGqP2ytTaePQ=
github.com/siderolabs/discovery-client v0.1.11 h1:Au+7QZ+CIB6g4C7ZCC4m5Ai5Uso1g/I3/E4bSUElzF8=
github.com/siderolabs/discovery-client v0.1.11/go.mod h1:Iw5XUphGNNV0m2czHjbj9aLhQvfM8hYEfWCc6fdQ4ko=
github.com/siderolabs/ethtool v0.3.0 h1:98kMFGnkDEikngqtDyk6R/ykMjaGpJ1df/bw9vPLKJg=
github.com/siderolabs/ethtool v0.3.0/go.mod h1:3u47fCDlOQGM7IhXhUB3uPEcjdQxGmK53/45kEg1HDM=
github.com/siderolabs/gen v0.8.0 h1:Pj93+hexkk5hQ7izjJ6YXnEWc8vlzOmDwFz13/VzS7o=
@ -1084,8 +1084,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM=
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

View File

@ -23,7 +23,9 @@ import (
"github.com/siderolabs/gen/optional"
"github.com/siderolabs/gen/xslices"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/siderolabs/talos/pkg/grpc/dialer"
"github.com/siderolabs/talos/pkg/httpdefaults"
"github.com/siderolabs/talos/pkg/machinery/config/machine"
"github.com/siderolabs/talos/pkg/machinery/proto"
@ -244,6 +246,9 @@ func (ctrl *DiscoveryServiceController) Run(ctx context.Context, r controller.Ru
TLSConfig: &tls.Config{
RootCAs: httpdefaults.RootCAs(),
},
DialOptions: []grpc.DialOption{
grpc.WithContextDialer(dialer.DynamicProxyDialer),
},
})
if err != nil {
return fmt.Errorf("error initializing discovery client: %w", err)

175
pkg/grpc/dialer/proxy.go Normal file
View File

@ -0,0 +1,175 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package dialer
import (
"bufio"
"context"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"golang.org/x/net/http/httpproxy"
"google.golang.org/grpc"
)
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
const grpcUA = "grpc-go/" + grpc.Version
// DynamicProxyDialer is a fork of grpc standard dialer which supports dynamic resolving of proxy settings
// on each request (vs. caching it once per process).
//
// DynamicProxyDialer assumes that the address is using 'tcp' network.
func DynamicProxyDialer(ctx context.Context, addr string) (net.Conn, error) {
newAddr := addr
proxyURL, err := mapAddress(addr)
if err != nil {
return nil, err
}
if proxyURL != nil {
newAddr = proxyURL.Host
}
conn, err := NetDialerWithTCPKeepalive().DialContext(ctx, "tcp", newAddr)
if err != nil {
return nil, err
}
if proxyURL == nil {
// proxy is disabled if proxyURL is nil.
return conn, err
}
return doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
}
const proxyAuthHeaderKey = "Proxy-Authorization"
func mapAddress(address string) (*url.URL, error) {
req := &http.Request{
URL: &url.URL{
Scheme: "https",
Host: address,
},
}
return httpproxy.FromEnvironment().ProxyFunc()(req.URL)
}
// To read a response from a net.Conn, http.ReadResponse() takes a bufio.Reader.
// It's possible that this reader reads more than what's need for the response and stores
// those bytes in the buffer.
// bufConn wraps the original net.Conn and the bufio.Reader to make sure we don't lose the
// bytes in the buffer.
type bufConn struct {
net.Conn
r io.Reader
}
func (c *bufConn) Read(b []byte) (int, error) {
return c.r.Read(b)
}
func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL, grpcUA string) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close() //nolint:errcheck
}
}()
req := &http.Request{
Method: http.MethodConnect,
URL: &url.URL{Host: backendAddr},
Header: map[string][]string{"User-Agent": {grpcUA}},
}
if t := proxyURL.User; t != nil {
u := t.Username()
p, _ := t.Password()
req.Header.Add(proxyAuthHeaderKey, "Basic "+basicAuth(u, p))
}
if err := sendHTTPRequest(ctx, req, conn); err != nil {
return nil, fmt.Errorf("failed to write the HTTP request: %v", err)
}
r := bufio.NewReader(conn)
resp, err := http.ReadResponse(r, req)
if err != nil {
return nil, fmt.Errorf("reading server HTTP response: %v", err)
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusOK {
dump, err := httputil.DumpResponse(resp, true)
if err != nil {
return nil, fmt.Errorf("failed to do connect handshake, status code: %s", resp.Status)
}
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
}
// The buffer could contain extra bytes from the target server, so we can't
// discard it. However, in many cases where the server waits for the client
// to send the first message (e.g. when TLS is being used), the buffer will
// be empty, so we can avoid the overhead of reading through this buffer.
if r.Buffered() != 0 {
return &bufConn{Conn: conn, r: r}, nil
}
return conn, nil
}
func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
req = req.WithContext(ctx)
if err := req.Write(conn); err != nil {
return fmt.Errorf("failed to write the HTTP request: %v", err)
}
return nil
}
// NetDialerWithTCPKeepalive returns a net.Dialer that enables TCP keepalives on
// the underlying connection with OS default values for keepalive parameters.
func NetDialerWithTCPKeepalive() *net.Dialer {
return &net.Dialer{
KeepAliveConfig: net.KeepAliveConfig{
Enable: true,
Idle: -1,
Count: -1,
Interval: -1,
},
}
}