// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 package client import ( "bytes" "context" "crypto/tls" "encoding/json" "errors" "fmt" "net/http" "strings" "time" "unicode" "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-retryablehttp" ) var ( // Retry configuration RetryWaitMin = 500 * time.Millisecond RetryWaitMax = 30 * time.Second RetryMax = 10 // Standard errs ErrNamespaceUnset = errors.New(`"namespace" is unset`) ErrPodNameUnset = errors.New(`"podName" is unset`) ErrNotInCluster = errors.New("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined") ) // Client is a minimal Kubernetes client. We rolled our own because the existing // Kubernetes client-go library available externally has a high number of dependencies // and we thought it wasn't worth it for only two API calls. If at some point they break // the client into smaller modules, or if we add quite a few methods to this client, it may // be worthwhile to revisit that decision. type Client struct { logger hclog.Logger config *Config stopCh chan struct{} } // New instantiates a Client. The stopCh is used for exiting retry loops // when closed. func New(logger hclog.Logger) (*Client, error) { config, err := inClusterConfig() if err != nil { return nil, err } return &Client{ logger: logger, config: config, stopCh: make(chan struct{}), }, nil } func (c *Client) Shutdown() { close(c.stopCh) } // GetPod gets a pod from the Kubernetes API. func (c *Client) GetPod(namespace, podName string) (*Pod, error) { endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName) method := http.MethodGet // Validate that we received required parameters. if namespace == "" { return nil, ErrNamespaceUnset } if podName == "" { return nil, ErrPodNameUnset } req, err := http.NewRequest(method, c.config.Host+endpoint, nil) if err != nil { return nil, err } pod := &Pod{} if err := c.do(req, pod); err != nil { return nil, err } return pod, nil } // PatchPod updates the pod's tags to the given ones. // It does so non-destructively, or in other words, without tearing down // the pod. func (c *Client) PatchPod(namespace, podName string, patches ...*Patch) error { endpoint := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", namespace, podName) method := http.MethodPatch // Validate that we received required parameters. if namespace == "" { return ErrNamespaceUnset } if podName == "" { return ErrPodNameUnset } if len(patches) == 0 { // No work to perform. return nil } var jsonPatches []map[string]interface{} for _, patch := range patches { if patch.Operation == Unset { return errors.New("patch operation must be set") } jsonPatches = append(jsonPatches, map[string]interface{}{ "op": patch.Operation, "path": patch.Path, "value": patch.Value, }) } body, err := json.Marshal(jsonPatches) if err != nil { return err } req, err := http.NewRequest(method, c.config.Host+endpoint, bytes.NewReader(body)) if err != nil { return err } req.Header.Set("Content-Type", "application/json-patch+json") return c.do(req, nil) } // do executes the given request, retrying if necessary. func (c *Client) do(req *http.Request, ptrToReturnObj interface{}) error { // Finish setting up a valid request. retryableReq, err := retryablehttp.FromRequest(req) if err != nil { return err } // Build a context that will call the cancelFunc when we receive // a stop from our stopChan. This allows us to exit from our retry // loop during a shutdown, rather than hanging. ctx, cancelFunc := context.WithCancel(context.Background()) go func() { select { case <-ctx.Done(): case <-c.stopCh: cancelFunc() } }() retryableReq.WithContext(ctx) retryableReq.Header.Set("Authorization", "Bearer "+c.config.BearerToken) retryableReq.Header.Set("Accept", "application/json") client := &retryablehttp.Client{ HTTPClient: cleanhttp.DefaultClient(), RetryWaitMin: RetryWaitMin, RetryWaitMax: RetryWaitMax, RetryMax: RetryMax, CheckRetry: c.getCheckRetry(req), Backoff: retryablehttp.DefaultBackoff, } client.HTTPClient.Transport = &http.Transport{ TLSClientConfig: &tls.Config{ RootCAs: c.config.CACertPool, }, } // Execute and retry the request. This client comes with exponential backoff and // jitter already rolled in. resp, err := client.Do(retryableReq) if err != nil { return err } defer func() { if err := resp.Body.Close(); err != nil { if c.logger.IsWarn() { // Failing to close response bodies can present as a memory leak so it's // important to surface it. c.logger.Warn(fmt.Sprintf("unable to close response body: %s", err)) } } }() // If we're not supposed to read out the body, we have nothing further // to do here. if ptrToReturnObj == nil { return nil } // Attempt to read out the body into the given return object. return json.NewDecoder(resp.Body).Decode(ptrToReturnObj) } func (c *Client) getCheckRetry(req *http.Request) retryablehttp.CheckRetry { return func(ctx context.Context, resp *http.Response, err error) (bool, error) { if resp == nil { return true, fmt.Errorf("nil response: %s", req.URL.RequestURI()) } switch resp.StatusCode { case 200, 201, 202, 204: // Success. return false, nil case 401, 403: // Perhaps the token from our bearer token file has been refreshed. config, err := inClusterConfig() if err != nil { return false, err } if config.BearerToken == c.config.BearerToken { // It's the same token. return false, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) } c.config = config // Continue to try again, but return the error too in case the caller would rather read it out. return true, fmt.Errorf("bad status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) case 404: return false, &ErrNotFound{debuggingInfo: sanitizedDebuggingInfo(req, resp.StatusCode)} case 500, 502, 503, 504: // Could be transient. return true, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) } // Unexpected. return false, fmt.Errorf("unexpected status code: %s", sanitizedDebuggingInfo(req, resp.StatusCode)) } } type Pod struct { Metadata *Metadata `json:"metadata,omitempty"` } type Metadata struct { Name string `json:"name,omitempty"` // This map will be nil if no "labels" key was provided. // It will be populated but have a length of zero if the // key was provided, but no values. Labels map[string]string `json:"labels,omitempty"` } type PatchOperation string const ( Unset PatchOperation = "unset" Add = "add" Replace = "replace" ) type Patch struct { Operation PatchOperation Path string Value interface{} } type ErrNotFound struct { debuggingInfo string } func (e *ErrNotFound) Error() string { return e.debuggingInfo } // Sanitize is for "data" being sent to the Kubernetes API. // Data must consist of alphanumeric characters, '-', '_' or '.'. // Any other characters found in the original value will be stripped, // and the surrounding characters will be concatenated. func Sanitize(val string) string { return strings.Map(replaceBadCharsWithDashes, val) } func replaceBadCharsWithDashes(r rune) rune { if unicode.IsLetter(r) { return r } if unicode.IsNumber(r) { return r } switch string(r) { case "-", "_", ".": return r } return '-' } // sanitizedDebuggingInfo provides a returnable string that can be used for debugging. This is intentionally somewhat vague // because we don't want to leak secrets that may be in a request or response body. func sanitizedDebuggingInfo(req *http.Request, respStatus int) string { return fmt.Sprintf("req method: %s, req url: %s, resp statuscode: %d", req.Method, req.URL, respStatus) }