From ee24e423196e8a87e036aa78c8baf74180bbdd1c Mon Sep 17 00:00:00 2001 From: Brad Beam Date: Fri, 25 Oct 2019 19:08:55 +0000 Subject: [PATCH] feat: Add time api to apid This extends apid to cover the time api. Signed-off-by: Brad Beam --- api/api.pb.go | 169 ++++++++++++++++-- api/api.proto | 1 + api/time/time.pb.go | 126 +++++++++---- api/time/time.proto | 14 +- cmd/osctl/cmd/time.go | 39 ++-- cmd/osctl/pkg/client/client.go | 14 +- internal/app/apid/main.go | 6 + .../app/machined/pkg/system/services/ntpd.go | 4 +- internal/app/ntpd/main.go | 2 +- internal/app/ntpd/pkg/reg/reg.go | 10 +- internal/app/ntpd/pkg/reg/reg_test.go | 8 +- pkg/constants/constants.go | 4 +- 12 files changed, 310 insertions(+), 87 deletions(-) diff --git a/api/api.pb.go b/api/api.pb.go index 3ff6abf57..93bf0c67e 100644 --- a/api/api.pb.go +++ b/api/api.pb.go @@ -20,6 +20,7 @@ import ( common "github.com/talos-systems/talos/api/common" machine "github.com/talos-systems/talos/api/machine" os "github.com/talos-systems/talos/api/os" + time "github.com/talos-systems/talos/api/time" constants "github.com/talos-systems/talos/pkg/constants" tls "github.com/talos-systems/talos/pkg/grpc/tls" ) @@ -215,6 +216,15 @@ type VersionReply = machine.VersionReply // VersionInfo from public import machine/machine.proto type VersionInfo = machine.VersionInfo +// TimeRequest from public import time/time.proto +type TimeRequest = time.TimeRequest + +// TimeReply from public import time/time.proto +type TimeReply = time.TimeReply + +// TimeResponse from public import time/time.proto +type TimeResponse = time.TimeResponse + // NodeMetadata from public import common/common.proto type NodeMetadata = common.NodeMetadata @@ -224,16 +234,17 @@ type Empty = empty.Empty func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) } var fileDescriptor_00212fb1f9d3bf1c = []byte{ - // 140 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x2c, 0x8b, 0xb1, 0x0a, 0xc2, 0x30, - 0x14, 0x45, 0xd5, 0x82, 0xa0, 0x6e, 0x8a, 0x4b, 0xdd, 0xc4, 0xd5, 0xbe, 0xc1, 0x3f, 0xf0, 0x0b, - 0x32, 0xbb, 0xa5, 0x21, 0xa6, 0x81, 0xbe, 0xde, 0x87, 0xef, 0x75, 0xe8, 0xdf, 0x0b, 0x4d, 0xa6, - 0xc3, 0x39, 0x97, 0x7b, 0x3c, 0x78, 0xc9, 0x9d, 0xfc, 0x60, 0x38, 0x37, 0x5e, 0x72, 0x7b, 0x82, - 0x12, 0xb4, 0x94, 0xf6, 0xca, 0x3e, 0x0c, 0x79, 0x8a, 0x54, 0x59, 0xf3, 0x25, 0x80, 0x19, 0x13, - 0x15, 0xd4, 0x78, 0x4b, 0x40, 0x1a, 0x23, 0xad, 0xd6, 0xcf, 0x5f, 0x8a, 0x2c, 0xb6, 0x94, 0xf1, - 0xfd, 0xf8, 0xdc, 0x53, 0xb6, 0x61, 0xee, 0xbb, 0x00, 0x26, 0xf3, 0x23, 0xf4, 0xa9, 0x8b, 0x5a, - 0x64, 0x2d, 0x46, 0x5e, 0xb2, 0xdb, 0xb8, 0xad, 0xdb, 0xb9, 0xa6, 0xdf, 0xaf, 0xa7, 0xd7, 0x3f, - 0x00, 0x00, 0xff, 0xff, 0x96, 0x64, 0x43, 0x69, 0x9c, 0x00, 0x00, 0x00, + // 151 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x2c, 0xcb, 0x31, 0xce, 0xc2, 0x30, + 0x0c, 0x05, 0xe0, 0xff, 0xa7, 0x08, 0x09, 0x18, 0x90, 0x40, 0x2c, 0x65, 0x43, 0xac, 0xd4, 0x03, + 0x37, 0xe0, 0x04, 0x99, 0xd9, 0x92, 0x28, 0xa4, 0x96, 0xea, 0xda, 0xc2, 0xee, 0xd0, 0xdb, 0x23, + 0x9a, 0x2c, 0x7e, 0x7a, 0x9f, 0xf5, 0x76, 0x5b, 0x2f, 0xd8, 0xc9, 0x87, 0x8d, 0x8f, 0x8d, 0x17, + 0x6c, 0xf7, 0xac, 0xc0, 0x5a, 0xa4, 0x3d, 0x93, 0x8f, 0x3d, 0x8e, 0x09, 0x6a, 0x56, 0x3e, 0x18, + 0x52, 0x82, 0xdf, 0xa9, 0x70, 0x8a, 0x4c, 0xc4, 0x23, 0x94, 0xa8, 0x78, 0xc9, 0xcc, 0x79, 0x48, + 0xb0, 0xb4, 0x30, 0xbd, 0x21, 0x91, 0xd8, 0x5c, 0x9e, 0xcf, 0xdb, 0xeb, 0x9a, 0xd1, 0xfa, 0x29, + 0x74, 0x91, 0x09, 0xcc, 0x0f, 0xac, 0x77, 0x9d, 0xd5, 0x12, 0x69, 0x69, 0xe0, 0x05, 0xdd, 0x9f, + 0xfb, 0x77, 0x2b, 0xd7, 0xb8, 0x75, 0xd8, 0x2c, 0xb3, 0xc7, 0x37, 0x00, 0x00, 0xff, 0xff, 0xf0, + 0x38, 0xc4, 0xa9, 0xaf, 0x00, 0x00, 0x00, } type ApiProxy struct { @@ -479,6 +490,30 @@ func (p *ApiProxy) Proxy(ctx context.Context, method string, creds credentials.T resp.Response = append(resp.Response, msg.(*machine.VersionReply).Response[0]) } response = resp + case "/time.Time/Time": + // Initialize target clients + clients, err := createTimeClient(targets, creds, proxyMd) + if err != nil { + break + } + resp := &time.TimeReply{} + msgs, err = proxyTimeRunner(clients, in, proxyTime) + for _, msg := range msgs { + resp.Response = append(resp.Response, msg.(*time.TimeReply).Response[0]) + } + response = resp + case "/time.Time/TimeCheck": + // Initialize target clients + clients, err := createTimeClient(targets, creds, proxyMd) + if err != nil { + break + } + resp := &time.TimeReply{} + msgs, err = proxyTimeRunner(clients, in, proxyTimeCheck) + for _, msg := range msgs { + resp.Response = append(resp.Response, msg.(*time.TimeReply).Response[0]) + } + response = resp } @@ -752,6 +787,62 @@ func proxyVersion(client *proxyMachineClient, in interface{}, wg *sync.WaitGroup respCh <- resp } +type runnerTimeFn func(*proxyTimeClient, interface{}, *sync.WaitGroup, chan proto.Message, chan error) + +func proxyTimeRunner(clients []*proxyTimeClient, in interface{}, runner runnerTimeFn) ([]proto.Message, error) { + var ( + errors *go_multierror.Error + wg sync.WaitGroup + ) + respCh := make(chan proto.Message, len(clients)) + errCh := make(chan error, len(clients)) + wg.Add(len(clients)) + for _, client := range clients { + go runner(client, in, &wg, respCh, errCh) + } + wg.Wait() + close(respCh) + close(errCh) + + var response []proto.Message + for resp := range respCh { + response = append(response, resp) + } + for err := range errCh { + errors = go_multierror.Append(errors, err) + } + return response, errors.ErrorOrNil() +} + +type proxyTimeClient struct { + Conn time.TimeClient + Context context.Context + Target string + DialOpts []grpc.DialOption +} + +func proxyTime(client *proxyTimeClient, in interface{}, wg *sync.WaitGroup, respCh chan proto.Message, errCh chan error) { + defer wg.Done() + resp, err := client.Conn.Time(client.Context, in.(*empty.Empty)) + if err != nil { + errCh <- err + return + } + resp.Response[0].Metadata = &NodeMetadata{Hostname: client.Target} + respCh <- resp +} + +func proxyTimeCheck(client *proxyTimeClient, in interface{}, wg *sync.WaitGroup, respCh chan proto.Message, errCh chan error) { + defer wg.Done() + resp, err := client.Conn.TimeCheck(client.Context, in.(*time.TimeRequest)) + if err != nil { + errCh <- err + return + } + resp.Response[0].Metadata = &NodeMetadata{Hostname: client.Target} + respCh <- resp +} + func createOSClient(targets []string, creds credentials.TransportCredentials, proxyMd metadata.MD) ([]*proxyOSClient, error) { var errors *go_multierror.Error clients := make([]*proxyOSClient, 0, len(targets)) @@ -800,14 +891,40 @@ func createMachineClient(targets []string, creds credentials.TransportCredential return clients, errors.ErrorOrNil() } +func createTimeClient(targets []string, creds credentials.TransportCredentials, proxyMd metadata.MD) ([]*proxyTimeClient, error) { + var errors *go_multierror.Error + clients := make([]*proxyTimeClient, 0, len(targets)) + for _, target := range targets { + c := &proxyTimeClient{ + // TODO change the context to be more useful ( ex cancelable ) + Context: metadata.NewOutgoingContext(context.Background(), proxyMd), + Target: target, + } + // TODO: i think we potentially leak a client here, + // we should close the request // cancel the context if it errors + // Explicitly set OSD port + conn, err := grpc.Dial(fmt.Sprintf("%s:%d", target, 50000), grpc.WithTransportCredentials(creds)) + if err != nil { + // TODO: probably worth wrapping err to add some context about the target + errors = go_multierror.Append(errors, err) + continue + } + c.Conn = time.NewTimeClient(conn) + clients = append(clients, c) + } + return clients, errors.ErrorOrNil() +} + type Registrator struct { os.OSClient machine.MachineClient + time.TimeClient } func (r *Registrator) Register(s *grpc.Server) { os.RegisterOSServer(s, r) machine.RegisterMachineServer(s, r) + time.RegisterTimeServer(s, r) } func (r *Registrator) Containers(ctx context.Context, in *os.ContainersRequest) (*os.ContainersReply, error) { @@ -909,6 +1026,14 @@ func (r *Registrator) Version(ctx context.Context, in *empty.Empty) (*machine.Ve return r.MachineClient.Version(ctx, in) } +func (r *Registrator) Time(ctx context.Context, in *empty.Empty) (*time.TimeReply, error) { + return r.TimeClient.Time(ctx, in) +} + +func (r *Registrator) TimeCheck(ctx context.Context, in *time.TimeRequest) (*time.TimeReply, error) { + return r.TimeClient.TimeCheck(ctx, in) +} + type LocalOSClient struct { os.OSClient } @@ -1024,3 +1149,27 @@ func (c *LocalMachineClient) Stop(ctx context.Context, in *machine.StopRequest, func (c *LocalMachineClient) Version(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*machine.VersionReply, error) { return c.MachineClient.Version(ctx, in, opts...) } + +type LocalTimeClient struct { + time.TimeClient +} + +func NewLocalTimeClient() (time.TimeClient, error) { + conn, err := grpc.Dial("unix:"+constants.TimeSocketPath, + grpc.WithInsecure(), + ) + if err != nil { + return nil, err + } + return &LocalTimeClient{ + TimeClient: time.NewTimeClient(conn), + }, nil +} + +func (c *LocalTimeClient) Time(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*time.TimeReply, error) { + return c.TimeClient.Time(ctx, in, opts...) +} + +func (c *LocalTimeClient) TimeCheck(ctx context.Context, in *time.TimeRequest, opts ...grpc.CallOption) (*time.TimeReply, error) { + return c.TimeClient.TimeCheck(ctx, in, opts...) +} diff --git a/api/api.proto b/api/api.proto index 52ceb56d0..2ea0fe5fc 100644 --- a/api/api.proto +++ b/api/api.proto @@ -6,5 +6,6 @@ option go_package = "github.com/talos-systems/talos/api"; import public "os/os.proto"; import public "machine/machine.proto"; +import public "time/time.proto"; import public "common/common.proto"; import public "google/protobuf/empty.proto"; diff --git a/api/time/time.pb.go b/api/time/time.pb.go index e6ce7d5d0..3612f0ec9 100644 --- a/api/time/time.pb.go +++ b/api/time/time.pb.go @@ -12,6 +12,8 @@ import ( empty "github.com/golang/protobuf/ptypes/empty" timestamp "github.com/golang/protobuf/ptypes/timestamp" grpc "google.golang.org/grpc" + + common "github.com/talos-systems/talos/api/common" ) // Reference imports to suppress errors if they are not otherwise used. @@ -71,12 +73,10 @@ func (m *TimeRequest) GetServer() string { // The response message containing the ntp server, time, and offset type TimeReply struct { - Server string `protobuf:"bytes,1,opt,name=server,proto3" json:"server,omitempty"` - Localtime *timestamp.Timestamp `protobuf:"bytes,2,opt,name=localtime,proto3" json:"localtime,omitempty"` - Remotetime *timestamp.Timestamp `protobuf:"bytes,3,opt,name=remotetime,proto3" json:"remotetime,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Response []*TimeResponse `protobuf:"bytes,1,rep,name=response,proto3" json:"response,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *TimeReply) Reset() { *m = TimeReply{} } @@ -108,21 +108,74 @@ func (m *TimeReply) XXX_DiscardUnknown() { var xxx_messageInfo_TimeReply proto.InternalMessageInfo -func (m *TimeReply) GetServer() string { +func (m *TimeReply) GetResponse() []*TimeResponse { + if m != nil { + return m.Response + } + return nil +} + +type TimeResponse struct { + Metadata *common.NodeMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + Server string `protobuf:"bytes,2,opt,name=server,proto3" json:"server,omitempty"` + Localtime *timestamp.Timestamp `protobuf:"bytes,3,opt,name=localtime,proto3" json:"localtime,omitempty"` + Remotetime *timestamp.Timestamp `protobuf:"bytes,4,opt,name=remotetime,proto3" json:"remotetime,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TimeResponse) Reset() { *m = TimeResponse{} } +func (m *TimeResponse) String() string { return proto.CompactTextString(m) } +func (*TimeResponse) ProtoMessage() {} +func (*TimeResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_e7ed1ef5b20ef4ce, []int{2} +} + +func (m *TimeResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TimeResponse.Unmarshal(m, b) +} + +func (m *TimeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TimeResponse.Marshal(b, m, deterministic) +} + +func (m *TimeResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TimeResponse.Merge(m, src) +} + +func (m *TimeResponse) XXX_Size() int { + return xxx_messageInfo_TimeResponse.Size(m) +} + +func (m *TimeResponse) XXX_DiscardUnknown() { + xxx_messageInfo_TimeResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TimeResponse proto.InternalMessageInfo + +func (m *TimeResponse) GetMetadata() *common.NodeMetadata { + if m != nil { + return m.Metadata + } + return nil +} + +func (m *TimeResponse) GetServer() string { if m != nil { return m.Server } return "" } -func (m *TimeReply) GetLocaltime() *timestamp.Timestamp { +func (m *TimeResponse) GetLocaltime() *timestamp.Timestamp { if m != nil { return m.Localtime } return nil } -func (m *TimeReply) GetRemotetime() *timestamp.Timestamp { +func (m *TimeResponse) GetRemotetime() *timestamp.Timestamp { if m != nil { return m.Remotetime } @@ -130,31 +183,36 @@ func (m *TimeReply) GetRemotetime() *timestamp.Timestamp { } func init() { - proto.RegisterType((*TimeRequest)(nil), "timeapi.TimeRequest") - proto.RegisterType((*TimeReply)(nil), "timeapi.TimeReply") + proto.RegisterType((*TimeRequest)(nil), "time.TimeRequest") + proto.RegisterType((*TimeReply)(nil), "time.TimeReply") + proto.RegisterType((*TimeResponse)(nil), "time.TimeResponse") } func init() { proto.RegisterFile("time/time.proto", fileDescriptor_e7ed1ef5b20ef4ce) } var fileDescriptor_e7ed1ef5b20ef4ce = []byte{ - // 272 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0xc1, 0x4b, 0xc3, 0x30, - 0x14, 0xc6, 0x89, 0xca, 0xc6, 0x32, 0x41, 0x08, 0x32, 0x46, 0x3d, 0x38, 0x06, 0xe2, 0x2e, 0x26, - 0x50, 0x11, 0xc4, 0x9b, 0x15, 0xef, 0x52, 0x76, 0xf2, 0x96, 0x96, 0x67, 0x17, 0x4c, 0x48, 0xd6, - 0xbc, 0x0a, 0xfd, 0x53, 0xfc, 0x6f, 0x25, 0x49, 0xc7, 0x64, 0x2a, 0x5e, 0x92, 0xbc, 0x7c, 0xbf, - 0x8f, 0x7c, 0xef, 0x85, 0x9e, 0xa1, 0x32, 0x20, 0xc2, 0xc2, 0x5d, 0x6b, 0xd1, 0xb2, 0x71, 0x38, - 0x4b, 0xa7, 0xb2, 0x8b, 0xc6, 0xda, 0x46, 0x83, 0x88, 0xd7, 0x55, 0xf7, 0x26, 0xc0, 0x38, 0xec, - 0x13, 0x95, 0x5d, 0x1e, 0x8a, 0xc1, 0xe5, 0x51, 0x1a, 0x97, 0x80, 0xe5, 0x15, 0x9d, 0xae, 0x95, - 0x81, 0x12, 0xb6, 0x1d, 0x78, 0x64, 0x33, 0x3a, 0xf2, 0xd0, 0x7e, 0x40, 0x3b, 0x27, 0x0b, 0xb2, - 0x9a, 0x94, 0x43, 0xb5, 0xfc, 0x24, 0x74, 0x92, 0x38, 0xa7, 0xfb, 0xbf, 0x28, 0x76, 0x4f, 0x27, - 0xda, 0xd6, 0x52, 0x87, 0x47, 0xe6, 0x47, 0x0b, 0xb2, 0x9a, 0xe6, 0x19, 0x4f, 0x09, 0xf8, 0x2e, - 0x01, 0x5f, 0xef, 0x12, 0x94, 0x7b, 0x98, 0x3d, 0x50, 0xda, 0x82, 0xb1, 0x08, 0xd1, 0x7a, 0xfc, - 0xaf, 0xf5, 0x1b, 0x9d, 0x6f, 0xe9, 0x49, 0x10, 0x58, 0x3e, 0xec, 0xb3, 0x1f, 0xbe, 0xe7, 0x30, - 0x91, 0x8c, 0xf1, 0x61, 0x64, 0x7c, 0xdf, 0xc9, 0x5d, 0x6a, 0xeb, 0x69, 0x03, 0xf5, 0x3b, 0x3b, - 0x3f, 0x00, 0xe2, 0x48, 0x7e, 0xb3, 0x15, 0x05, 0x3d, 0xad, 0xad, 0x89, 0x02, 0x97, 0x4e, 0x15, - 0xe3, 0x20, 0x3d, 0x3a, 0xf5, 0x42, 0x5e, 0xaf, 0x1b, 0x85, 0x9b, 0xae, 0xe2, 0xb5, 0x35, 0x02, - 0xa5, 0xb6, 0xfe, 0xc6, 0xf7, 0x1e, 0xc1, 0xf8, 0x54, 0x09, 0xe9, 0x54, 0xfc, 0x84, 0x6a, 0x14, - 0xe3, 0xdd, 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0x3a, 0x53, 0xb0, 0xfa, 0xda, 0x01, 0x00, 0x00, + // 332 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xc1, 0x4e, 0x32, 0x31, + 0x14, 0x85, 0x33, 0x3f, 0x84, 0x1f, 0x2e, 0x24, 0xc4, 0x6a, 0x08, 0x19, 0x17, 0x12, 0x12, 0x23, + 0x1b, 0xa7, 0x06, 0x37, 0x46, 0x57, 0x62, 0x5c, 0x6a, 0xcc, 0xc4, 0x95, 0xbb, 0x32, 0x5c, 0xa1, + 0x71, 0xca, 0xad, 0xd3, 0x62, 0xc2, 0x4b, 0xfa, 0x4c, 0xa6, 0xed, 0x88, 0x8d, 0x2c, 0xdc, 0x4c, + 0xa7, 0xf7, 0x9c, 0x33, 0x3d, 0xf9, 0xa6, 0xd0, 0xb7, 0x52, 0x21, 0x77, 0x8f, 0x4c, 0x57, 0x64, + 0x89, 0x35, 0xdd, 0x7b, 0x7a, 0xbc, 0x24, 0x5a, 0x96, 0xc8, 0xfd, 0x6c, 0xbe, 0x79, 0xe5, 0xa8, + 0xb4, 0xdd, 0x06, 0x4b, 0x7a, 0xf2, 0x5b, 0x74, 0x11, 0x63, 0x85, 0xd2, 0xb5, 0xe1, 0xb0, 0x20, + 0xa5, 0x68, 0xcd, 0xc3, 0x12, 0x86, 0xe3, 0x53, 0xe8, 0x3e, 0x4b, 0x85, 0x39, 0xbe, 0x6f, 0xd0, + 0x58, 0x36, 0x80, 0x96, 0xc1, 0xea, 0x03, 0xab, 0x61, 0x32, 0x4a, 0x26, 0x9d, 0xbc, 0xde, 0x8d, + 0x6f, 0xa0, 0x13, 0x6c, 0xba, 0xdc, 0xb2, 0x0c, 0xda, 0x15, 0x1a, 0x4d, 0x6b, 0x83, 0xc3, 0x64, + 0xd4, 0x98, 0x74, 0xa7, 0x2c, 0xf3, 0x5d, 0x83, 0x25, 0x28, 0xf9, 0xce, 0x33, 0xfe, 0x4c, 0xa0, + 0x17, 0x4b, 0xec, 0x02, 0xda, 0x0a, 0xad, 0x58, 0x08, 0x2b, 0xfc, 0x39, 0xdd, 0xe9, 0x51, 0x56, + 0xb7, 0x7a, 0xa4, 0x05, 0x3e, 0xd4, 0x5a, 0xbe, 0x73, 0x45, 0xbd, 0xfe, 0xc5, 0xbd, 0xd8, 0x15, + 0x74, 0x4a, 0x2a, 0x44, 0xe9, 0x8e, 0x1f, 0x36, 0xfc, 0xa7, 0xd2, 0x2c, 0x80, 0xc8, 0xbe, 0x41, + 0xf8, 0x5a, 0x1e, 0x44, 0xfe, 0x63, 0x66, 0xd7, 0x00, 0x15, 0x2a, 0xb2, 0xe8, 0xa3, 0xcd, 0x3f, + 0xa3, 0x91, 0x7b, 0xba, 0x82, 0xa6, 0x13, 0x18, 0xaf, 0xd7, 0xc1, 0x5e, 0xee, 0xde, 0xfd, 0x98, + 0xb4, 0x1f, 0x63, 0x71, 0xe4, 0x78, 0xc0, 0x78, 0xb7, 0xc2, 0xe2, 0x8d, 0x1d, 0xc4, 0xaa, 0xc7, + 0xbf, 0x17, 0x98, 0xcd, 0xa0, 0x57, 0x90, 0x0a, 0x53, 0xa1, 0xe5, 0xec, 0xbf, 0x93, 0x6e, 0xb5, + 0x7c, 0x4a, 0x5e, 0xce, 0x96, 0xd2, 0xae, 0x36, 0x73, 0x07, 0x8e, 0x5b, 0x51, 0x92, 0x39, 0x37, + 0x5b, 0x63, 0x51, 0x99, 0xb0, 0xe3, 0x42, 0x4b, 0x7f, 0x05, 0xe6, 0x2d, 0xdf, 0xea, 0xf2, 0x2b, + 0x00, 0x00, 0xff, 0xff, 0xa7, 0x62, 0x03, 0x24, 0x55, 0x02, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -183,7 +241,7 @@ func NewTimeClient(cc *grpc.ClientConn) TimeClient { func (c *timeClient) Time(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*TimeReply, error) { out := new(TimeReply) - err := c.cc.Invoke(ctx, "/timeapi.Time/Time", in, out, opts...) + err := c.cc.Invoke(ctx, "/time.Time/Time", in, out, opts...) if err != nil { return nil, err } @@ -192,7 +250,7 @@ func (c *timeClient) Time(ctx context.Context, in *empty.Empty, opts ...grpc.Cal func (c *timeClient) TimeCheck(ctx context.Context, in *TimeRequest, opts ...grpc.CallOption) (*TimeReply, error) { out := new(TimeReply) - err := c.cc.Invoke(ctx, "/timeapi.Time/TimeCheck", in, out, opts...) + err := c.cc.Invoke(ctx, "/time.Time/TimeCheck", in, out, opts...) if err != nil { return nil, err } @@ -219,7 +277,7 @@ func _Time_Time_Handler(srv interface{}, ctx context.Context, dec func(interface } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/timeapi.Time/Time", + FullMethod: "/time.Time/Time", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(TimeServer).Time(ctx, req.(*empty.Empty)) @@ -237,7 +295,7 @@ func _Time_TimeCheck_Handler(srv interface{}, ctx context.Context, dec func(inte } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/timeapi.Time/TimeCheck", + FullMethod: "/time.Time/TimeCheck", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(TimeServer).TimeCheck(ctx, req.(*TimeRequest)) @@ -246,7 +304,7 @@ func _Time_TimeCheck_Handler(srv interface{}, ctx context.Context, dec func(inte } var _Time_serviceDesc = grpc.ServiceDesc{ - ServiceName: "timeapi.Time", + ServiceName: "time.Time", HandlerType: (*TimeServer)(nil), Methods: []grpc.MethodDesc{ { diff --git a/api/time/time.proto b/api/time/time.proto index f43afecb9..7a0e0238c 100644 --- a/api/time/time.proto +++ b/api/time/time.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package timeapi; +package time; option go_package = "github.com/talos-systems/talos/api/time"; option java_multiple_files = true; @@ -9,6 +9,7 @@ option java_package = "com.time.api"; import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; +import "common/common.proto"; // The time service definition. service Time { @@ -21,7 +22,12 @@ message TimeRequest { string server = 1; } // The response message containing the ntp server, time, and offset message TimeReply { - string server = 1; - google.protobuf.Timestamp localtime = 2; - google.protobuf.Timestamp remotetime = 3; + repeated TimeResponse response = 1; +} + +message TimeResponse { + common.NodeMetadata metadata = 1; + string server = 2; + google.protobuf.Timestamp localtime = 3; + google.protobuf.Timestamp remotetime = 4; } diff --git a/cmd/osctl/cmd/time.go b/cmd/osctl/cmd/time.go index f39c47b45..c36c26fea 100644 --- a/cmd/osctl/cmd/time.go +++ b/cmd/osctl/cmd/time.go @@ -30,32 +30,41 @@ var timeCmd = &cobra.Command{ helpers.Fatalf("failed to parse check flag: %w", err) } - var output *timeapi.TimeReply + var reply *timeapi.TimeReply if server == "" { - output, err = c.Time(globalCtx) + reply, err = c.Time(globalCtx) if err != nil { helpers.Fatalf("error fetching time: %s", err) } } else { - output, err = c.TimeCheck(globalCtx, server) + reply, err = c.TimeCheck(globalCtx, server) if err != nil { helpers.Fatalf("error fetching time: %s", err) } } - var localtime, remotetime time.Time - localtime, err = ptypes.Timestamp(output.Localtime) - if err != nil { - helpers.Fatalf("error parsing local time: %s", err) - } - remotetime, err = ptypes.Timestamp(output.Remotetime) - if err != nil { - helpers.Fatalf("error parsing remote time: %s", err) - } - w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) - fmt.Fprintln(w, "NTP-SERVER\tLOCAL-TIME\tREMOTE-TIME") - fmt.Fprintf(w, "%s\t%s\t%s\n", output.Server, localtime.String(), remotetime.String()) + fmt.Fprintln(w, "NODE\tNTP-SERVER\tLOCAL-TIME\tREMOTE-TIME") + + var localtime, remotetime time.Time + for _, resp := range reply.Response { + node := "" + + if resp.Metadata != nil { + node = resp.Metadata.Hostname + } + + localtime, err = ptypes.Timestamp(resp.Localtime) + if err != nil { + helpers.Fatalf("error parsing local time: %s", err) + } + remotetime, err = ptypes.Timestamp(resp.Remotetime) + if err != nil { + helpers.Fatalf("error parsing remote time: %s", err) + } + + fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", node, resp.Server, localtime.String(), remotetime.String()) + } helpers.Should(w.Flush()) }) }, diff --git a/cmd/osctl/pkg/client/client.go b/cmd/osctl/pkg/client/client.go index c5ddc3220..8ad9381e7 100644 --- a/cmd/osctl/pkg/client/client.go +++ b/cmd/osctl/pkg/client/client.go @@ -327,20 +327,10 @@ func (c *Client) ServiceRestart(ctx context.Context, id string) (*machineapi.Ser // Time returns the time func (c *Client) Time(ctx context.Context) (*timeapi.TimeReply, error) { - r, err := c.TimeClient.Time(ctx, &empty.Empty{}) - if err != nil { - return nil, err - } - - return r, nil + return c.TimeClient.Time(ctx, &empty.Empty{}) } // TimeCheck returns the time compared to the specified ntp server func (c *Client) TimeCheck(ctx context.Context, server string) (*timeapi.TimeReply, error) { - r, err := c.TimeClient.TimeCheck(ctx, &timeapi.TimeRequest{Server: server}) - if err != nil { - return nil, err - } - - return r, nil + return c.TimeClient.TimeCheck(ctx, &timeapi.TimeRequest{Server: server}) } diff --git a/internal/app/apid/main.go b/internal/app/apid/main.go index 2e4148621..398052293 100644 --- a/internal/app/apid/main.go +++ b/internal/app/apid/main.go @@ -71,12 +71,18 @@ func main() { log.Fatalf("networkd client: %v", err) } + timeClient, err := api.NewLocalTimeClient() + if err != nil { + log.Fatalf("time client: %v", err) + } + protoProxy := api.NewApiProxy(provider) err = factory.ListenAndServe( &api.Registrator{ MachineClient: machineClient, OSClient: osClient, + TimeClient: timeClient, }, factory.Port(constants.OsdPort), factory.ServerOptions( diff --git a/internal/app/machined/pkg/system/services/ntpd.go b/internal/app/machined/pkg/system/services/ntpd.go index 34ed99350..033dbb1f5 100644 --- a/internal/app/machined/pkg/system/services/ntpd.go +++ b/internal/app/machined/pkg/system/services/ntpd.go @@ -68,13 +68,13 @@ func (n *NTPd) Runner(config runtime.Configurator) (runner.Runner, error) { } // Ensure socket dir exists - if err := os.MkdirAll(filepath.Dir(constants.NtpdSocketPath), os.ModeDir); err != nil { + if err := os.MkdirAll(filepath.Dir(constants.TimeSocketPath), os.ModeDir); err != nil { return nil, err } mounts := []specs.Mount{ {Type: "bind", Destination: constants.ConfigPath, Source: constants.ConfigPath, Options: []string{"rbind", "ro"}}, - {Type: "bind", Destination: filepath.Dir(constants.NtpdSocketPath), Source: filepath.Dir(constants.NtpdSocketPath), Options: []string{"rbind", "rw"}}, + {Type: "bind", Destination: filepath.Dir(constants.TimeSocketPath), Source: filepath.Dir(constants.TimeSocketPath), Options: []string{"rbind", "rw"}}, } env := []string{} diff --git a/internal/app/ntpd/main.go b/internal/app/ntpd/main.go index 4becd924a..b927cbb15 100644 --- a/internal/app/ntpd/main.go +++ b/internal/app/ntpd/main.go @@ -78,7 +78,7 @@ func main() { errch <- factory.ListenAndServe( reg.NewRegistrator(n), factory.Network("unix"), - factory.SocketPath(constants.NtpdSocketPath), + factory.SocketPath(constants.TimeSocketPath), ) }() diff --git a/internal/app/ntpd/pkg/reg/reg.go b/internal/app/ntpd/pkg/reg/reg.go index 5bbbbc7f1..bf3119377 100644 --- a/internal/app/ntpd/pkg/reg/reg.go +++ b/internal/app/ntpd/pkg/reg/reg.go @@ -77,9 +77,13 @@ func genProtobufTimeReply(local, remote time.Time, server string) (*timeapi.Time } reply = &timeapi.TimeReply{ - Server: server, - Localtime: localpbts, - Remotetime: remotepbts, + Response: []*timeapi.TimeResponse{ + { + Server: server, + Localtime: localpbts, + Remotetime: remotepbts, + }, + }, } return reply, nil diff --git a/internal/app/ntpd/pkg/reg/reg_test.go b/internal/app/ntpd/pkg/reg/reg_test.go index 751fa8570..7515aed0d 100644 --- a/internal/app/ntpd/pkg/reg/reg_test.go +++ b/internal/app/ntpd/pkg/reg/reg_test.go @@ -55,9 +55,9 @@ func (suite *NtpdSuite) TestTime() { suite.Assert().NoError(err) nClient := timeapi.NewTimeClient(conn) - resp, err := nClient.Time(context.Background(), &empty.Empty{}) + reply, err := nClient.Time(context.Background(), &empty.Empty{}) suite.Assert().NoError(err) - suite.Assert().Equal(resp.Server, testServer) + suite.Assert().Equal(reply.Response[0].Server, testServer) } func (suite *NtpdSuite) TestTimeCheck() { @@ -86,9 +86,9 @@ func (suite *NtpdSuite) TestTimeCheck() { suite.Assert().NoError(err) nClient := timeapi.NewTimeClient(conn) - resp, err := nClient.TimeCheck(context.Background(), &timeapi.TimeRequest{Server: testServer}) + reply, err := nClient.TimeCheck(context.Background(), &timeapi.TimeRequest{Server: testServer}) suite.Assert().NoError(err) - suite.Assert().Equal(resp.Server, testServer) + suite.Assert().Equal(reply.Response[0].Server, testServer) } func fakeNtpdRPC() (net.Listener, error) { diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 0111ebbde..2af7eb941 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -194,8 +194,8 @@ const ( // MachineSocketPath is the path to file socket of machine API. MachineSocketPath = SystemRunPath + "/machined/machine.sock" - // NtpdSocketPath is the path to file socket of time API. - NtpdSocketPath = SystemRunPath + "/ntpd/ntpd.sock" + // TimeSocketPath is the path to file socket of time API. + TimeSocketPath = SystemRunPath + "/ntpd/ntpd.sock" // NetworkdSocketPath is the path to file socket of network API. NetworkdSocketPath = SystemRunPath + "/networkd/networkd.sock"