kube-router/pkg/cri/remote_runtime.go
Aaron U'Ren 9d7c58b097 fix(dsr): change grpc resolver to passthrough
When we used DialContext() before it was deprecated, it manually set the
default resolver to passthrough. NewClient() appears to not do this, and
then several versions of grpc ago, they defaulted to the DNS resolver
which broke our DSR implementation with CRI compatible container
engines.

I'm not 100% sure that there isn't a better way to specify this, but I
spent 20 - 30 minutes poking around the gRPC code base and I don't think
that I can see an easy way to do this outside of specifying it this way.

Furthermore, the project itself seems to advocate for this approach in
comments like those on: https://github.com/grpc/grpc-go/issues/1846
2024-10-21 15:44:21 -05:00

111 lines
3.0 KiB
Go

package cri
import (
"context"
"encoding/json"
"errors"
"net"
"strings"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)
const (
DefaultConnectionTimeout = 15 * time.Second
maxMsgSize = 1024 * 1024 * 16 // 16 MB
)
// remoteRuntimeService is a gRPC implementation of RuntimeService.
type remoteRuntimeService struct {
timeout time.Duration
runtimeClient runtimeapi.RuntimeServiceClient
conn *grpc.ClientConn
}
type containerInfo struct {
Pid int `json:"pid"`
}
// NewRemoteRuntimeService creates a new RuntimeService.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (RuntimeService, error) {
proto, addr, err := EndpointParser(endpoint)
if err != nil {
return nil, err
}
klog.V(4).Infof("[RuntimeService] got endpoint %s (proto=%s, path=%s)", endpoint, proto, addr)
if proto == "unix" {
// Every since grpc.DialContext was deprecated, we no longer get the passthrough resolver for free, so we need
// to add it manually. See: https://github.com/grpc/grpc-go/issues/1846 for more context
addr = "passthrough:///" + addr
} else {
return nil, errors.New("[RuntimeService] only unix socket is currently supported")
}
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialer),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
klog.Errorf("Connect remote runtime %s failed: %v", addr, err)
return nil, err
}
return &remoteRuntimeService{
timeout: connectionTimeout,
runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
conn: conn,
}, nil
}
// ContainerInfo returns verbose info of provided container.
func (r *remoteRuntimeService) ContainerInfo(id string) (*containerInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()
// Verbose should be set, otherwise we'll get an empty slice. see
resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{
ContainerId: id,
Verbose: true,
})
if err != nil {
return nil, err
}
info := containerInfo{}
if err := json.Unmarshal([]byte(resp.Info["info"]), &info); err != nil {
return nil, err
}
return &info, nil
}
// Close tears down the *grpc.ClientConn and all underlying connections.
func (r *remoteRuntimeService) Close() error {
if err := r.conn.Close(); err != nil {
return err
}
return nil
}
func dialer(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
}
// EndpointParser returns protocol and path of provided endpoint
func EndpointParser(endpoint string) (proto string, path string, err error) {
result := strings.Split(endpoint, "://")
if len(result) < 2 {
return "", "", errors.New("bad endpoint format. should be 'protocol://path'")
}
return result[0], result[1], nil
}