feat: drain node on shutdown

Cordon & drain a node when the Shutdown message is received.
Also adds a '--force' option to the shutdown command in case the control
plane is unresponsive.

Signed-off-by: Tim Jones <timniverse@gmail.com>
Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
This commit is contained in:
Tim Jones 2022-01-28 22:54:32 +01:00 committed by Andrey Smirnov
parent 7f0b3aae0a
commit fe40e7b1b3
No known key found for this signature in database
GPG Key ID: 7B26396447AB6DFD
13 changed files with 2010 additions and 1761 deletions

Binary file not shown.

View File

@ -60,7 +60,7 @@ service MachineService {
rpc ServiceRestart(ServiceRestartRequest) returns (ServiceRestartResponse);
rpc ServiceStart(ServiceStartRequest) returns (ServiceStartResponse);
rpc ServiceStop(ServiceStopRequest) returns (ServiceStopResponse);
rpc Shutdown(google.protobuf.Empty) returns (ShutdownResponse);
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse);
rpc Stats(StatsRequest) returns (StatsResponse);
rpc SystemStat(google.protobuf.Empty) returns (SystemStatResponse);
rpc Upgrade(UpgradeRequest) returns (UpgradeResponse);
@ -258,6 +258,11 @@ message Shutdown {
common.Metadata metadata = 1;
}
message ShutdownRequest {
// Force indicates whether node should shutdown without first cordening and draining
bool force = 1;
}
message ShutdownResponse {
repeated Shutdown messages = 1;
}

View File

@ -13,6 +13,10 @@ import (
"github.com/talos-systems/talos/pkg/machinery/client"
)
var shutdownCmdFlags struct {
force bool
}
// shutdownCmd represents the shutdown command.
var shutdownCmd = &cobra.Command{
Use: "shutdown",
@ -21,7 +25,7 @@ var shutdownCmd = &cobra.Command{
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return WithClient(func(ctx context.Context, c *client.Client) error {
if err := c.Shutdown(ctx); err != nil {
if err := c.Shutdown(ctx, client.WithShutdownForce(shutdownCmdFlags.force)); err != nil {
return fmt.Errorf("error executing shutdown: %s", err)
}
@ -31,5 +35,6 @@ var shutdownCmd = &cobra.Command{
}
func init() {
shutdownCmd.Flags().BoolVar(&shutdownCmdFlags.force, "force", false, "if true, force a node to shutdown without a cordon/drain")
addCommand(shutdownCmd)
}

View File

@ -375,7 +375,7 @@ func (s *Server) Bootstrap(ctx context.Context, in *machine.BootstrapRequest) (r
// Shutdown implements the machine.MachineServer interface.
//
//nolint:dupl
func (s *Server) Shutdown(ctx context.Context, in *emptypb.Empty) (reply *machine.ShutdownResponse, err error) {
func (s *Server) Shutdown(ctx context.Context, in *machine.ShutdownRequest) (reply *machine.ShutdownResponse, err error) {
log.Printf("shutdown via API received")
if err = s.checkSupported(runtime.Shutdown); err != nil {

View File

@ -108,7 +108,7 @@ type Sequencer interface {
Install(Runtime) []Phase
Reboot(Runtime) []Phase
Reset(Runtime, ResetOptions) []Phase
Shutdown(Runtime) []Phase
Shutdown(Runtime, *machine.ShutdownRequest) []Phase
StageUpgrade(Runtime, *machine.UpgradeRequest) []Phase
Upgrade(Runtime, *machine.UpgradeRequest) []Phase
}

View File

@ -391,7 +391,16 @@ func (c *Controller) phases(seq runtime.Sequence, data interface{}) ([]runtime.P
case runtime.SequenceInstall:
phases = c.s.Install(c.r)
case runtime.SequenceShutdown:
phases = c.s.Shutdown(c.r)
var (
in *machine.ShutdownRequest
ok bool
)
if in, ok = data.(*machine.ShutdownRequest); !ok {
return nil, runtime.ErrInvalidSequenceData
}
phases = c.s.Shutdown(c.r, in)
case runtime.SequenceReboot:
phases = c.s.Reboot(c.r)
case runtime.SequenceUpgrade:

View File

@ -327,9 +327,12 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph
}
// Shutdown is the shutdown sequence.
func (*Sequencer) Shutdown(r runtime.Runtime) []runtime.Phase {
phases := PhaseList{}.
Append(
func (*Sequencer) Shutdown(r runtime.Runtime, in *machineapi.ShutdownRequest) []runtime.Phase {
phases := PhaseList{}.AppendWhen(
!in.GetForce(),
"drain",
CordonAndDrainNode,
).Append(
"cleanup",
StopAllPods,
).

File diff suppressed because it is too large Load Diff

View File

@ -74,7 +74,7 @@ type MachineServiceClient interface {
ServiceRestart(ctx context.Context, in *ServiceRestartRequest, opts ...grpc.CallOption) (*ServiceRestartResponse, error)
ServiceStart(ctx context.Context, in *ServiceStartRequest, opts ...grpc.CallOption) (*ServiceStartResponse, error)
ServiceStop(ctx context.Context, in *ServiceStopRequest, opts ...grpc.CallOption) (*ServiceStopResponse, error)
Shutdown(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ShutdownResponse, error)
Shutdown(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error)
Stats(ctx context.Context, in *StatsRequest, opts ...grpc.CallOption) (*StatsResponse, error)
SystemStat(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SystemStatResponse, error)
Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (*UpgradeResponse, error)
@ -629,7 +629,7 @@ func (c *machineServiceClient) ServiceStop(ctx context.Context, in *ServiceStopR
return out, nil
}
func (c *machineServiceClient) Shutdown(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ShutdownResponse, error) {
func (c *machineServiceClient) Shutdown(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) {
out := new(ShutdownResponse)
err := c.cc.Invoke(ctx, "/machine.MachineService/Shutdown", in, out, opts...)
if err != nil {
@ -735,7 +735,7 @@ type MachineServiceServer interface {
ServiceRestart(context.Context, *ServiceRestartRequest) (*ServiceRestartResponse, error)
ServiceStart(context.Context, *ServiceStartRequest) (*ServiceStartResponse, error)
ServiceStop(context.Context, *ServiceStopRequest) (*ServiceStopResponse, error)
Shutdown(context.Context, *emptypb.Empty) (*ShutdownResponse, error)
Shutdown(context.Context, *ShutdownRequest) (*ShutdownResponse, error)
Stats(context.Context, *StatsRequest) (*StatsResponse, error)
SystemStat(context.Context, *emptypb.Empty) (*SystemStatResponse, error)
Upgrade(context.Context, *UpgradeRequest) (*UpgradeResponse, error)
@ -884,7 +884,7 @@ func (UnimplementedMachineServiceServer) ServiceStop(context.Context, *ServiceSt
return nil, status.Errorf(codes.Unimplemented, "method ServiceStop not implemented")
}
func (UnimplementedMachineServiceServer) Shutdown(context.Context, *emptypb.Empty) (*ShutdownResponse, error) {
func (UnimplementedMachineServiceServer) Shutdown(context.Context, *ShutdownRequest) (*ShutdownResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Shutdown not implemented")
}
@ -1568,7 +1568,7 @@ func _MachineService_ServiceStop_Handler(srv interface{}, ctx context.Context, d
}
func _MachineService_Shutdown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
in := new(ShutdownRequest)
if err := dec(in); err != nil {
return nil, err
}
@ -1580,7 +1580,7 @@ func _MachineService_Shutdown_Handler(srv interface{}, ctx context.Context, dec
FullMethod: "/machine.MachineService/Shutdown",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MachineServiceServer).Shutdown(ctx, req.(*emptypb.Empty))
return srv.(MachineServiceServer).Shutdown(ctx, req.(*ShutdownRequest))
}
return interceptor(ctx, in, info, handler)
}

View File

@ -1294,6 +1294,49 @@ func (m *Shutdown) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *ShutdownRequest) MarshalVT() (dAtA []byte, err error) {
if m == nil {
return nil, nil
}
size := m.SizeVT()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBufferVT(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ShutdownRequest) MarshalToVT(dAtA []byte) (int, error) {
size := m.SizeVT()
return m.MarshalToSizedBufferVT(dAtA[:size])
}
func (m *ShutdownRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
if m == nil {
return 0, nil
}
i := len(dAtA)
_ = i
var l int
_ = l
if m.unknownFields != nil {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if m.Force {
i--
if m.Force {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *ShutdownResponse) MarshalVT() (dAtA []byte, err error) {
if m == nil {
return nil, nil
@ -8407,6 +8450,21 @@ func (m *Shutdown) SizeVT() (n int) {
return n
}
func (m *ShutdownRequest) SizeVT() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Force {
n += 2
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
return n
}
func (m *ShutdownResponse) SizeVT() (n int) {
if m == nil {
return 0
@ -13845,6 +13903,78 @@ func (m *Shutdown) UnmarshalVT(dAtA []byte) error {
return nil
}
func (m *ShutdownRequest) UnmarshalVT(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ShutdownRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ShutdownRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Force", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Force = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLength
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ShutdownResponse) UnmarshalVT(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0

View File

@ -531,9 +531,25 @@ func (c *Client) Bootstrap(ctx context.Context, req *machineapi.BootstrapRequest
return
}
// ShutdownOption provides shutdown API options.
type ShutdownOption func(*machineapi.ShutdownRequest)
// WithShutdownForce forces the shutdown even if the Kubernetes API is down.
func WithShutdownForce(force bool) ShutdownOption {
return func(req *machineapi.ShutdownRequest) {
req.Force = force
}
}
// Shutdown implements the proto.MachineServiceClient interface.
func (c *Client) Shutdown(ctx context.Context) (err error) {
resp, err := c.MachineClient.Shutdown(ctx, &emptypb.Empty{})
func (c *Client) Shutdown(ctx context.Context, opts ...ShutdownOption) (err error) {
var req machineapi.ShutdownRequest
for _, opt := range opts {
opt(&req)
}
resp, err := c.MachineClient.Shutdown(ctx, &req)
if err == nil {
_, err = FilterMessages(resp, err)

View File

@ -141,6 +141,7 @@ description: Talos gRPC API reference.
- [ServiceStopRequest](#machine.ServiceStopRequest)
- [ServiceStopResponse](#machine.ServiceStopResponse)
- [Shutdown](#machine.Shutdown)
- [ShutdownRequest](#machine.ShutdownRequest)
- [ShutdownResponse](#machine.ShutdownResponse)
- [SoftIRQStat](#machine.SoftIRQStat)
- [Stat](#machine.Stat)
@ -2385,6 +2386,21 @@ The messages message containing the shutdown status.
<a name="machine.ShutdownRequest"></a>
### ShutdownRequest
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| force | [bool](#bool) | | Force indicates whether node should shutdown without first cordening and draining |
<a name="machine.ShutdownResponse"></a>
### ShutdownResponse
@ -2813,7 +2829,7 @@ This method is available only on control plane nodes (which run etcd). |
| ServiceRestart | [ServiceRestartRequest](#machine.ServiceRestartRequest) | [ServiceRestartResponse](#machine.ServiceRestartResponse) | |
| ServiceStart | [ServiceStartRequest](#machine.ServiceStartRequest) | [ServiceStartResponse](#machine.ServiceStartResponse) | |
| ServiceStop | [ServiceStopRequest](#machine.ServiceStopRequest) | [ServiceStopResponse](#machine.ServiceStopResponse) | |
| Shutdown | [.google.protobuf.Empty](#google.protobuf.Empty) | [ShutdownResponse](#machine.ShutdownResponse) | |
| Shutdown | [ShutdownRequest](#machine.ShutdownRequest) | [ShutdownResponse](#machine.ShutdownResponse) | |
| Stats | [StatsRequest](#machine.StatsRequest) | [StatsResponse](#machine.StatsResponse) | |
| SystemStat | [.google.protobuf.Empty](#google.protobuf.Empty) | [SystemStatResponse](#machine.SystemStatResponse) | |
| Upgrade | [UpgradeRequest](#machine.UpgradeRequest) | [UpgradeResponse](#machine.UpgradeResponse) | |

View File

@ -1823,6 +1823,7 @@ talosctl shutdown [flags]
### Options
```
--force if true, force a node to shutdown without a cordon/drain
-h, --help help for shutdown
```