mirror of
https://github.com/cloudnativelabs/kube-router.git
synced 2026-05-04 22:26:16 +02:00
When we used DialContext() before it was deprecated, it manually set the default resolver to passthrough. NewClient() appears to not do this, and then several versions of grpc ago, they defaulted to the DNS resolver which broke our DSR implementation with CRI compatible container engines. I'm not 100% sure that there isn't a better way to specify this, but I spent 20 - 30 minutes poking around the gRPC code base and I don't think that I can see an easy way to do this outside of specifying it this way. Furthermore, the project itself seems to advocate for this approach in comments like those on: https://github.com/grpc/grpc-go/issues/1846
111 lines
3.0 KiB
Go
111 lines
3.0 KiB
Go
package cri
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"net"
|
|
"strings"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
|
|
"k8s.io/klog/v2"
|
|
|
|
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
|
)
|
|
|
|
const (
|
|
DefaultConnectionTimeout = 15 * time.Second
|
|
maxMsgSize = 1024 * 1024 * 16 // 16 MB
|
|
)
|
|
|
|
// remoteRuntimeService is a gRPC implementation of RuntimeService.
|
|
type remoteRuntimeService struct {
|
|
timeout time.Duration
|
|
runtimeClient runtimeapi.RuntimeServiceClient
|
|
conn *grpc.ClientConn
|
|
}
|
|
|
|
type containerInfo struct {
|
|
Pid int `json:"pid"`
|
|
}
|
|
|
|
// NewRemoteRuntimeService creates a new RuntimeService.
|
|
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (RuntimeService, error) {
|
|
proto, addr, err := EndpointParser(endpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
klog.V(4).Infof("[RuntimeService] got endpoint %s (proto=%s, path=%s)", endpoint, proto, addr)
|
|
|
|
if proto == "unix" {
|
|
// Every since grpc.DialContext was deprecated, we no longer get the passthrough resolver for free, so we need
|
|
// to add it manually. See: https://github.com/grpc/grpc-go/issues/1846 for more context
|
|
addr = "passthrough:///" + addr
|
|
} else {
|
|
return nil, errors.New("[RuntimeService] only unix socket is currently supported")
|
|
}
|
|
|
|
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithContextDialer(dialer),
|
|
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
|
|
if err != nil {
|
|
klog.Errorf("Connect remote runtime %s failed: %v", addr, err)
|
|
return nil, err
|
|
}
|
|
|
|
return &remoteRuntimeService{
|
|
timeout: connectionTimeout,
|
|
runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
|
|
conn: conn,
|
|
}, nil
|
|
}
|
|
|
|
// ContainerInfo returns verbose info of provided container.
|
|
func (r *remoteRuntimeService) ContainerInfo(id string) (*containerInfo, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
|
|
defer cancel()
|
|
|
|
// Verbose should be set, otherwise we'll get an empty slice. see
|
|
resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{
|
|
ContainerId: id,
|
|
Verbose: true,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
info := containerInfo{}
|
|
|
|
if err := json.Unmarshal([]byte(resp.Info["info"]), &info); err != nil {
|
|
return nil, err
|
|
}
|
|
return &info, nil
|
|
}
|
|
|
|
// Close tears down the *grpc.ClientConn and all underlying connections.
|
|
func (r *remoteRuntimeService) Close() error {
|
|
if err := r.conn.Close(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func dialer(ctx context.Context, addr string) (net.Conn, error) {
|
|
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
|
|
}
|
|
|
|
// EndpointParser returns protocol and path of provided endpoint
|
|
func EndpointParser(endpoint string) (proto string, path string, err error) {
|
|
|
|
result := strings.Split(endpoint, "://")
|
|
|
|
if len(result) < 2 {
|
|
return "", "", errors.New("bad endpoint format. should be 'protocol://path'")
|
|
}
|
|
return result[0], result[1], nil
|
|
}
|