From 0102a64a5f6de2c3fe5d7792c2c5845fc737edff Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Fri, 12 Nov 2021 19:49:39 +0300 Subject: [PATCH] refactor: remove pkg/resources dependencies on wgtypes, netx This finishes the work started in #4469, #4524. The only thing left for #4420 is to move the package in the source tree. Signed-off-by: Andrey Smirnov --- .../machined/pkg/adapters/cluster/cluster.go | 2 +- internal/app/machined/pkg/adapters/k8s/k8s.go | 2 +- .../pkg/adapters/kubespan/identity.go | 72 ++++ .../pkg/adapters}/kubespan/identity_test.go | 5 +- .../pkg/adapters/kubespan/kubespan.go | 6 + .../pkg/adapters/kubespan/peer_status.go | 158 ++++++++ .../adapters}/kubespan/peer_status_test.go | 47 +-- .../pkg/adapters/network/bond_master_spec.go | 69 ++-- .../adapters/network/bond_master_spec_test.go | 33 ++ .../machined/pkg/adapters/network/network.go | 6 + .../pkg/adapters/network/vlan_spec.go | 64 +++ .../pkg/adapters/network/vlan_spec_test.go | 31 ++ .../pkg/adapters/network/wireguard_spec.go | 201 ++++++++++ .../adapters/network/wireguard_spec_test.go | 279 ++++++++++++++ .../app/machined/pkg/adapters/perf/perf.go | 2 +- .../cluster/local_affiliate_test.go | 5 +- .../pkg/controllers/kubespan/identity.go | 5 +- .../pkg/controllers/kubespan/manager.go | 11 +- .../pkg/controllers/kubespan/manager_test.go | 5 +- .../pkg/controllers/network/link_config.go | 3 +- .../pkg/controllers/network/link_spec.go | 13 +- .../pkg/controllers/network/link_spec_test.go | 5 +- .../pkg/controllers/network/link_status.go | 7 +- pkg/resources/.importvet.yaml | 12 +- pkg/resources/kubespan/identity.go | 28 -- pkg/resources/kubespan/peer_status.go | 132 ------- pkg/resources/kubespan/utils.go | 31 -- pkg/resources/network/link.go | 199 ---------- pkg/resources/network/link_test.go | 363 ++---------------- 29 files changed, 985 insertions(+), 811 deletions(-) create mode 100644 internal/app/machined/pkg/adapters/kubespan/identity.go rename {pkg/resources => internal/app/machined/pkg/adapters}/kubespan/identity_test.go (73%) create mode 100644 internal/app/machined/pkg/adapters/kubespan/kubespan.go create mode 100644 internal/app/machined/pkg/adapters/kubespan/peer_status.go rename {pkg/resources => internal/app/machined/pkg/adapters}/kubespan/peer_status_test.go (60%) rename pkg/resources/network/link_linux.go => internal/app/machined/pkg/adapters/network/bond_master_spec.go (83%) create mode 100644 internal/app/machined/pkg/adapters/network/bond_master_spec_test.go create mode 100644 internal/app/machined/pkg/adapters/network/network.go create mode 100644 internal/app/machined/pkg/adapters/network/vlan_spec.go create mode 100644 internal/app/machined/pkg/adapters/network/vlan_spec_test.go create mode 100644 internal/app/machined/pkg/adapters/network/wireguard_spec.go create mode 100644 internal/app/machined/pkg/adapters/network/wireguard_spec_test.go delete mode 100644 pkg/resources/kubespan/utils.go diff --git a/internal/app/machined/pkg/adapters/cluster/cluster.go b/internal/app/machined/pkg/adapters/cluster/cluster.go index b7561cd1f..dbd420e69 100644 --- a/internal/app/machined/pkg/adapters/cluster/cluster.go +++ b/internal/app/machined/pkg/adapters/cluster/cluster.go @@ -2,5 +2,5 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -// Package cluster providers adapters wrapping resources/cluster to provide additional functionality. +// Package cluster implements adapters wrapping resources/cluster to provide additional functionality. package cluster diff --git a/internal/app/machined/pkg/adapters/k8s/k8s.go b/internal/app/machined/pkg/adapters/k8s/k8s.go index a67ed37c5..8e9789a23 100644 --- a/internal/app/machined/pkg/adapters/k8s/k8s.go +++ b/internal/app/machined/pkg/adapters/k8s/k8s.go @@ -2,5 +2,5 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -// Package k8s providers adapters wrapping resources/k8s to provide additional functionality. +// Package k8s implements adapters wrapping resources/k8s to provide additional functionality. package k8s diff --git a/internal/app/machined/pkg/adapters/kubespan/identity.go b/internal/app/machined/pkg/adapters/kubespan/identity.go new file mode 100644 index 000000000..0afb1e772 --- /dev/null +++ b/internal/app/machined/pkg/adapters/kubespan/identity.go @@ -0,0 +1,72 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package kubespan + +import ( + "fmt" + "net" + + "github.com/mdlayher/netx/eui64" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "inet.af/netaddr" + + "github.com/talos-systems/talos/pkg/resources/kubespan" + "github.com/talos-systems/talos/pkg/resources/network" +) + +// IdentitySpec adapter provides identity generation. +// +//nolint:revive,golint +func IdentitySpec(r *kubespan.IdentitySpec) identity { + return identity{ + IdentitySpec: r, + } +} + +type identity struct { + *kubespan.IdentitySpec +} + +// GenerateKey generates new Wireguard key. +func (a identity) GenerateKey() error { + key, err := wgtypes.GeneratePrivateKey() + if err != nil { + return err + } + + a.IdentitySpec.PrivateKey = key.String() + a.IdentitySpec.PublicKey = key.PublicKey().String() + + return nil +} + +// UpdateAddress re-calculates node address based on input data. +func (a identity) UpdateAddress(clusterID string, mac net.HardwareAddr) error { + a.IdentitySpec.Subnet = network.ULAPrefix(clusterID, network.ULAKubeSpan) + + var err error + + a.IdentitySpec.Address, err = wgEUI64(a.IdentitySpec.Subnet, mac) + + return err +} + +func wgEUI64(prefix netaddr.IPPrefix, mac net.HardwareAddr) (out netaddr.IPPrefix, err error) { + if prefix.IsZero() { + return out, fmt.Errorf("cannot calculate IP from zero prefix") + } + + stdIP, err := eui64.ParseMAC(prefix.IPNet().IP, mac) + if err != nil { + return out, fmt.Errorf("failed to parse MAC into EUI-64 address: %w", err) + } + + ip, ok := netaddr.FromStdIP(stdIP) + if !ok { + return out, fmt.Errorf("failed to parse intermediate standard IP %q: %w", stdIP.String(), err) + } + + return netaddr.IPPrefixFrom(ip, ip.BitLen()), nil +} diff --git a/pkg/resources/kubespan/identity_test.go b/internal/app/machined/pkg/adapters/kubespan/identity_test.go similarity index 73% rename from pkg/resources/kubespan/identity_test.go rename to internal/app/machined/pkg/adapters/kubespan/identity_test.go index 8fedc116d..b5d91d07b 100644 --- a/pkg/resources/kubespan/identity_test.go +++ b/internal/app/machined/pkg/adapters/kubespan/identity_test.go @@ -11,13 +11,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + kubespanadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/kubespan" "github.com/talos-systems/talos/pkg/resources/kubespan" ) func TestIdentityGenerateKey(t *testing.T) { var spec kubespan.IdentitySpec - assert.NoError(t, spec.GenerateKey()) + assert.NoError(t, kubespanadapter.IdentitySpec(&spec).GenerateKey()) } func TestIdentityUpdateAddress(t *testing.T) { @@ -26,7 +27,7 @@ func TestIdentityUpdateAddress(t *testing.T) { mac, err := net.ParseMAC("2e:1a:b6:53:81:69") require.NoError(t, err) - assert.NoError(t, spec.UpdateAddress("8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=", mac)) + assert.NoError(t, kubespanadapter.IdentitySpec(&spec).UpdateAddress("8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=", mac)) assert.Equal(t, "fd7f:175a:b97c:5602:2c1a:b6ff:fe53:8169/128", spec.Address.String()) assert.Equal(t, "fd7f:175a:b97c:5602::/64", spec.Subnet.String()) diff --git a/internal/app/machined/pkg/adapters/kubespan/kubespan.go b/internal/app/machined/pkg/adapters/kubespan/kubespan.go new file mode 100644 index 000000000..5b5995cfe --- /dev/null +++ b/internal/app/machined/pkg/adapters/kubespan/kubespan.go @@ -0,0 +1,6 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package kubespan implements adapters wrapping resources/kubespan to provide additional functionality. +package kubespan diff --git a/internal/app/machined/pkg/adapters/kubespan/peer_status.go b/internal/app/machined/pkg/adapters/kubespan/peer_status.go new file mode 100644 index 000000000..ecdd1523f --- /dev/null +++ b/internal/app/machined/pkg/adapters/kubespan/peer_status.go @@ -0,0 +1,158 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package kubespan + +import ( + "time" + + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "inet.af/netaddr" + + "github.com/talos-systems/talos/pkg/resources/kubespan" +) + +// PeerStatusSpec adapter provides Wiregard integration and state management. +// +//nolint:revive,golint +func PeerStatusSpec(r *kubespan.PeerStatusSpec) peerStatus { + return peerStatus{ + PeerStatusSpec: r, + } +} + +type peerStatus struct { + *kubespan.PeerStatusSpec +} + +// PeerDownInterval is the time since last handshake when established peer is considered to be down. +// +// WG whitepaper defines a downed peer as being: +// Handshake Timeout (180s) + Rekey Timeout (5s) + Rekey Attempt Timeout (90s) +// +// This interval is applied when the link is already established. +const PeerDownInterval = (180 + 5 + 90) * time.Second + +// EndpointConnectionTimeout is time to wait for initial handshake when the endpoint is just set. +const EndpointConnectionTimeout = 15 * time.Second + +// CalculateState updates connection state based on other fields values. +// +// Goal: endpoint is ultimately down if we haven't seen handshake for more than peerDownInterval, +// but as the endpoints get updated we want faster feedback, so we start checking more aggressively +// that the handshake happened within endpointConnectionTimeout since last endpoint change. +// +// Timeline: +// +// ----------------------------------------------------------------------> +// ^ ^ ^ +// | | | +// T0 T0+endpointConnectionTimeout T0+peerDownInterval +// +// Where T0 = LastEndpontChange +// +// The question is where is LastHandshakeTimeout vs. those points above: +// +// * if we're past (T0+peerDownInterval), simply check that time since last handshake < peerDownInterval +// * if we're between (T0+endpointConnectionTimeout) and (T0+peerDownInterval), and there's no handshake +// after the endpoint change, assume that the endpoint is down +// * if we're between (T0) and (T0+endpointConnectionTimeout), and there's no handshake since the endpoint change, +// consider the state to be unknown +func (a peerStatus) CalculateState() { + sinceLastHandshake := time.Since(a.PeerStatusSpec.LastHandshakeTime) + sinceEndpointChange := time.Since(a.PeerStatusSpec.LastEndpointChange) + + a.CalculateStateWithDurations(sinceLastHandshake, sinceEndpointChange) +} + +// CalculateStateWithDurations calculates the state based on the time since events. +func (a peerStatus) CalculateStateWithDurations(sinceLastHandshake, sinceEndpointChange time.Duration) { + switch { + case sinceEndpointChange > PeerDownInterval: // past T0+peerDownInterval + // if we got handshake in the last peerDownInterval, endpoint is up + if sinceLastHandshake < PeerDownInterval { + a.PeerStatusSpec.State = kubespan.PeerStateUp + } else { + a.PeerStatusSpec.State = kubespan.PeerStateDown + } + case sinceEndpointChange < EndpointConnectionTimeout: // between (T0) and (T0+endpointConnectionTimeout) + // endpoint got recently updated, consider no handshake as 'unknown' + if a.PeerStatusSpec.LastHandshakeTime.After(a.PeerStatusSpec.LastEndpointChange) { + a.PeerStatusSpec.State = kubespan.PeerStateUp + } else { + a.PeerStatusSpec.State = kubespan.PeerStateUnknown + } + + default: // otherwise, we're between (T0+endpointConnectionTimeout) and (T0+peerDownInterval) + // if we haven't had the handshake yet, consider the endpoint to be down + if a.PeerStatusSpec.LastHandshakeTime.After(a.PeerStatusSpec.LastEndpointChange) { + a.PeerStatusSpec.State = kubespan.PeerStateUp + } else { + a.PeerStatusSpec.State = kubespan.PeerStateDown + } + } + + if a.PeerStatusSpec.State == kubespan.PeerStateDown && a.PeerStatusSpec.LastUsedEndpoint.IsZero() { + // no endpoint, so unknown + a.PeerStatusSpec.State = kubespan.PeerStateUnknown + } +} + +// UpdateFromWireguard updates fields from wgtypes information. +func (a peerStatus) UpdateFromWireguard(peer wgtypes.Peer) { + if peer.Endpoint != nil { + a.PeerStatusSpec.Endpoint, _ = netaddr.FromStdAddr(peer.Endpoint.IP, peer.Endpoint.Port, "") + } else { + a.PeerStatusSpec.Endpoint = netaddr.IPPort{} + } + + a.PeerStatusSpec.LastHandshakeTime = peer.LastHandshakeTime + a.PeerStatusSpec.TransmitBytes = peer.TransmitBytes + a.PeerStatusSpec.ReceiveBytes = peer.ReceiveBytes +} + +// UpdateEndpoint updates the endpoint information and last update timestamp. +func (a peerStatus) UpdateEndpoint(endpoint netaddr.IPPort) { + a.PeerStatusSpec.Endpoint = endpoint + a.PeerStatusSpec.LastUsedEndpoint = endpoint + a.PeerStatusSpec.LastEndpointChange = time.Now() + a.PeerStatusSpec.State = kubespan.PeerStateUnknown +} + +// ShouldChangeEndpoint tells whether endpoint should be updated. +func (a peerStatus) ShouldChangeEndpoint() bool { + return a.PeerStatusSpec.State == kubespan.PeerStateDown || a.PeerStatusSpec.LastUsedEndpoint.IsZero() +} + +// PickNewEndpoint picks new endpoint given the state and list of available endpoints. +// +// If returned newEndpoint is zero value, no new endpoint is available. +func (a peerStatus) PickNewEndpoint(endpoints []netaddr.IPPort) (newEndpoint netaddr.IPPort) { + if len(endpoints) == 0 { + return + } + + if a.PeerStatusSpec.LastUsedEndpoint.IsZero() { + // first time setting the endpoint + newEndpoint = endpoints[0] + } else { + // find the next endpoint after LastUsedEndpoint and use it + idx := -1 + + for i := range endpoints { + if endpoints[i] == a.PeerStatusSpec.LastUsedEndpoint { + idx = i + + break + } + } + + // special case: if the peer has just a single endpoint, we can't rotate + if !(len(endpoints) == 1 && idx == 0 && a.PeerStatusSpec.Endpoint == a.PeerStatusSpec.LastUsedEndpoint) { + newEndpoint = endpoints[(idx+1)%len(endpoints)] + } + } + + return +} diff --git a/pkg/resources/kubespan/peer_status_test.go b/internal/app/machined/pkg/adapters/kubespan/peer_status_test.go similarity index 60% rename from pkg/resources/kubespan/peer_status_test.go rename to internal/app/machined/pkg/adapters/kubespan/peer_status_test.go index acf4f2e2e..1dc61fe33 100644 --- a/pkg/resources/kubespan/peer_status_test.go +++ b/internal/app/machined/pkg/adapters/kubespan/peer_status_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "inet.af/netaddr" + kubespanadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/kubespan" "github.com/talos-systems/talos/pkg/resources/kubespan" ) @@ -19,7 +20,7 @@ func TestPeerStatus_PickNewEndpoint(t *testing.T) { peerStatus := kubespan.PeerStatusSpec{} // no endpoint => no way to pick new one - assert.True(t, peerStatus.PickNewEndpoint(nil).IsZero()) + assert.True(t, kubespanadapter.PeerStatusSpec(&peerStatus).PickNewEndpoint(nil).IsZero()) endpoints := []netaddr.IPPort{ netaddr.MustParseIPPort("10.3.4.5:10500"), @@ -27,36 +28,36 @@ func TestPeerStatus_PickNewEndpoint(t *testing.T) { } // initial choice should be the first endpoint - newEndpoint := peerStatus.PickNewEndpoint(endpoints) + newEndpoint := kubespanadapter.PeerStatusSpec(&peerStatus).PickNewEndpoint(endpoints) assert.Equal(t, endpoints[0], newEndpoint) - peerStatus.UpdateEndpoint(newEndpoint) + kubespanadapter.PeerStatusSpec(&peerStatus).UpdateEndpoint(newEndpoint) // next choice should be 2nd endpoint - newEndpoint = peerStatus.PickNewEndpoint(endpoints) + newEndpoint = kubespanadapter.PeerStatusSpec(&peerStatus).PickNewEndpoint(endpoints) assert.Equal(t, endpoints[1], newEndpoint) - peerStatus.UpdateEndpoint(newEndpoint) + kubespanadapter.PeerStatusSpec(&peerStatus).UpdateEndpoint(newEndpoint) // back to the first endpoint - newEndpoint = peerStatus.PickNewEndpoint(endpoints) + newEndpoint = kubespanadapter.PeerStatusSpec(&peerStatus).PickNewEndpoint(endpoints) assert.Equal(t, endpoints[0], newEndpoint) - peerStatus.UpdateEndpoint(newEndpoint) + kubespanadapter.PeerStatusSpec(&peerStatus).UpdateEndpoint(newEndpoint) // can't rotate a single endpoint - assert.True(t, peerStatus.PickNewEndpoint(endpoints[:1]).IsZero()) + assert.True(t, kubespanadapter.PeerStatusSpec(&peerStatus).PickNewEndpoint(endpoints[:1]).IsZero()) // can rotate if the endpoint is different - newEndpoint = peerStatus.PickNewEndpoint(endpoints[1:]) + newEndpoint = kubespanadapter.PeerStatusSpec(&peerStatus).PickNewEndpoint(endpoints[1:]) assert.Equal(t, endpoints[1], newEndpoint) - peerStatus.UpdateEndpoint(newEndpoint) + kubespanadapter.PeerStatusSpec(&peerStatus).UpdateEndpoint(newEndpoint) // if totally new list of endpoints is given, pick the first one endpoints = []netaddr.IPPort{ netaddr.MustParseIPPort("10.3.4.5:10501"), netaddr.MustParseIPPort("192.168.3.8:458"), } - newEndpoint = peerStatus.PickNewEndpoint(endpoints) + newEndpoint = kubespanadapter.PeerStatusSpec(&peerStatus).PickNewEndpoint(endpoints) assert.Equal(t, endpoints[0], newEndpoint) - peerStatus.UpdateEndpoint(newEndpoint) + kubespanadapter.PeerStatusSpec(&peerStatus).UpdateEndpoint(newEndpoint) } func TestPeerStatus_CalculateState(t *testing.T) { @@ -78,38 +79,38 @@ func TestPeerStatus_CalculateState(t *testing.T) { }, { name: "peer is down", - sinceLastHandshake: 2 * kubespan.PeerDownInterval, - sinceEndpointChange: 2 * kubespan.PeerDownInterval, + sinceLastHandshake: 2 * kubespanadapter.PeerDownInterval, + sinceEndpointChange: 2 * kubespanadapter.PeerDownInterval, expectedState: kubespan.PeerStateDown, }, { name: "fresh peer, no handshake", - sinceLastHandshake: 2 * kubespan.PeerDownInterval, - sinceEndpointChange: kubespan.EndpointConnectionTimeout / 2, + sinceLastHandshake: 2 * kubespanadapter.PeerDownInterval, + sinceEndpointChange: kubespanadapter.EndpointConnectionTimeout / 2, expectedState: kubespan.PeerStateUnknown, }, { name: "fresh peer, with handshake", sinceLastHandshake: 0, - sinceEndpointChange: kubespan.EndpointConnectionTimeout / 2, + sinceEndpointChange: kubespanadapter.EndpointConnectionTimeout / 2, expectedState: kubespan.PeerStateUp, }, { name: "peer after initial timeout, with handshake", sinceLastHandshake: 0, - sinceEndpointChange: kubespan.EndpointConnectionTimeout + 1, + sinceEndpointChange: kubespanadapter.EndpointConnectionTimeout + 1, expectedState: kubespan.PeerStateUp, }, { name: "peer after initial timeout, no handshake", - sinceLastHandshake: 2 * kubespan.EndpointConnectionTimeout, - sinceEndpointChange: kubespan.EndpointConnectionTimeout + 1, + sinceLastHandshake: 2 * kubespanadapter.EndpointConnectionTimeout, + sinceEndpointChange: kubespanadapter.EndpointConnectionTimeout + 1, expectedState: kubespan.PeerStateDown, }, { name: "established peer, up", - sinceLastHandshake: kubespan.PeerDownInterval / 2, - sinceEndpointChange: kubespan.PeerDownInterval + 1, + sinceLastHandshake: kubespanadapter.PeerDownInterval / 2, + sinceEndpointChange: kubespanadapter.PeerDownInterval + 1, expectedState: kubespan.PeerStateUp, }, } { @@ -125,7 +126,7 @@ func TestPeerStatus_CalculateState(t *testing.T) { peerStatus.LastUsedEndpoint = netaddr.MustParseIPPort("192.168.1.1:10000") } - peerStatus.CalculateStateWithDurations(tt.sinceLastHandshake, tt.sinceEndpointChange) + kubespanadapter.PeerStatusSpec(&peerStatus).CalculateStateWithDurations(tt.sinceLastHandshake, tt.sinceEndpointChange) assert.Equal(t, tt.expectedState, peerStatus.State) }) diff --git a/pkg/resources/network/link_linux.go b/internal/app/machined/pkg/adapters/network/bond_master_spec.go similarity index 83% rename from pkg/resources/network/link_linux.go rename to internal/app/machined/pkg/adapters/network/bond_master_spec.go index 8eae8077a..e582e2d9c 100644 --- a/pkg/resources/network/link_linux.go +++ b/internal/app/machined/pkg/adapters/network/bond_master_spec.go @@ -5,50 +5,61 @@ package network import ( - "encoding/binary" - "github.com/mdlayher/netlink" "golang.org/x/sys/unix" "github.com/talos-systems/talos/pkg/machinery/nethelpers" + "github.com/talos-systems/talos/pkg/resources/network" ) -// Encode the VLANSpec into netlink attributes. -func (vlan *VLANSpec) Encode() ([]byte, error) { - encoder := netlink.NewAttributeEncoder() - - encoder.Uint16(unix.IFLA_VLAN_ID, vlan.VID) - - buf := make([]byte, 2) - binary.BigEndian.PutUint16(buf, uint16(vlan.Protocol)) - encoder.Bytes(unix.IFLA_VLAN_PROTOCOL, buf) - - return encoder.Encode() +// BondMasterSpec adapter provides encoding/decoding to netlink structures. +// +//nolint:revive,golint +func BondMasterSpec(r *network.BondMasterSpec) bondMaster { + return bondMaster{ + BondMasterSpec: r, + } } -// Decode the VLANSpec from netlink attributes. -func (vlan *VLANSpec) Decode(data []byte) error { - decoder, err := netlink.NewAttributeDecoder(data) - if err != nil { - return err +type bondMaster struct { + *network.BondMasterSpec +} + +// FillDefaults fills zero values with proper default values. +func (a bondMaster) FillDefaults() { + bond := a.BondMasterSpec + + if bond.ResendIGMP == 0 { + bond.ResendIGMP = 1 } - for decoder.Next() { - switch decoder.Type() { - case unix.IFLA_VLAN_ID: - vlan.VID = decoder.Uint16() - case unix.IFLA_VLAN_PROTOCOL: - vlan.Protocol = nethelpers.VLANProtocol(binary.BigEndian.Uint16(decoder.Bytes())) - } + if bond.LPInterval == 0 { + bond.LPInterval = 1 } - return decoder.Err() + if bond.PacketsPerSlave == 0 { + bond.PacketsPerSlave = 1 + } + + if bond.NumPeerNotif == 0 { + bond.NumPeerNotif = 1 + } + + if bond.Mode != nethelpers.BondModeALB && bond.Mode != nethelpers.BondModeTLB { + bond.TLBDynamicLB = 1 + } + + if bond.Mode == nethelpers.BondMode8023AD { + bond.ADActorSysPrio = 65535 + } } // Encode the BondMasterSpec into netlink attributes. // //nolint:gocyclo -func (bond *BondMasterSpec) Encode() ([]byte, error) { +func (a bondMaster) Encode() ([]byte, error) { + bond := a.BondMasterSpec + encoder := netlink.NewAttributeEncoder() encoder.Uint8(unix.IFLA_BOND_MODE, uint8(bond.Mode)) @@ -121,7 +132,9 @@ func (bond *BondMasterSpec) Encode() ([]byte, error) { // Decode the BondMasterSpec from netlink attributes. // //nolint:gocyclo,cyclop -func (bond *BondMasterSpec) Decode(data []byte) error { +func (a bondMaster) Decode(data []byte) error { + bond := a.BondMasterSpec + decoder, err := netlink.NewAttributeDecoder(data) if err != nil { return err diff --git a/internal/app/machined/pkg/adapters/network/bond_master_spec_test.go b/internal/app/machined/pkg/adapters/network/bond_master_spec_test.go new file mode 100644 index 000000000..79c3faada --- /dev/null +++ b/internal/app/machined/pkg/adapters/network/bond_master_spec_test.go @@ -0,0 +1,33 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + networkadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/network" + "github.com/talos-systems/talos/pkg/machinery/nethelpers" + "github.com/talos-systems/talos/pkg/resources/network" +) + +func TestBondMasterSpec(t *testing.T) { + spec := network.BondMasterSpec{ + Mode: nethelpers.BondModeActiveBackup, + MIIMon: 100, + UpDelay: 200, + DownDelay: 300, + } + + b, err := networkadapter.BondMasterSpec(&spec).Encode() + require.NoError(t, err) + + var decodedSpec network.BondMasterSpec + + require.NoError(t, networkadapter.BondMasterSpec(&decodedSpec).Decode(b)) + + require.Equal(t, spec, decodedSpec) +} diff --git a/internal/app/machined/pkg/adapters/network/network.go b/internal/app/machined/pkg/adapters/network/network.go new file mode 100644 index 000000000..1381663bb --- /dev/null +++ b/internal/app/machined/pkg/adapters/network/network.go @@ -0,0 +1,6 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package network implements adapters wrapping resources/network to provide additional functionality. +package network diff --git a/internal/app/machined/pkg/adapters/network/vlan_spec.go b/internal/app/machined/pkg/adapters/network/vlan_spec.go new file mode 100644 index 000000000..189b26403 --- /dev/null +++ b/internal/app/machined/pkg/adapters/network/vlan_spec.go @@ -0,0 +1,64 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network + +import ( + "encoding/binary" + + "github.com/mdlayher/netlink" + "golang.org/x/sys/unix" + + "github.com/talos-systems/talos/pkg/machinery/nethelpers" + "github.com/talos-systems/talos/pkg/resources/network" +) + +// VLANSpec adapter provides encoding/decoding to netlink structures. +// +//nolint:revive,golint +func VLANSpec(r *network.VLANSpec) vlanSpec { + return vlanSpec{ + VLANSpec: r, + } +} + +type vlanSpec struct { + *network.VLANSpec +} + +// Encode the VLANSpec into netlink attributes. +func (a vlanSpec) Encode() ([]byte, error) { + vlan := a.VLANSpec + + encoder := netlink.NewAttributeEncoder() + + encoder.Uint16(unix.IFLA_VLAN_ID, vlan.VID) + + buf := make([]byte, 2) + binary.BigEndian.PutUint16(buf, uint16(vlan.Protocol)) + encoder.Bytes(unix.IFLA_VLAN_PROTOCOL, buf) + + return encoder.Encode() +} + +// Decode the VLANSpec from netlink attributes. +func (a vlanSpec) Decode(data []byte) error { + vlan := a.VLANSpec + + decoder, err := netlink.NewAttributeDecoder(data) + if err != nil { + return err + } + + for decoder.Next() { + switch decoder.Type() { + case unix.IFLA_VLAN_ID: + vlan.VID = decoder.Uint16() + case unix.IFLA_VLAN_PROTOCOL: + vlan.Protocol = nethelpers.VLANProtocol(binary.BigEndian.Uint16(decoder.Bytes())) + } + } + + return decoder.Err() +} diff --git a/internal/app/machined/pkg/adapters/network/vlan_spec_test.go b/internal/app/machined/pkg/adapters/network/vlan_spec_test.go new file mode 100644 index 000000000..f92b86d81 --- /dev/null +++ b/internal/app/machined/pkg/adapters/network/vlan_spec_test.go @@ -0,0 +1,31 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + networkadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/network" + "github.com/talos-systems/talos/pkg/machinery/nethelpers" + "github.com/talos-systems/talos/pkg/resources/network" +) + +func TestVLANSpec(t *testing.T) { + spec := network.VLANSpec{ + VID: 25, + Protocol: nethelpers.VLANProtocol8021AD, + } + + b, err := networkadapter.VLANSpec(&spec).Encode() + require.NoError(t, err) + + var decodedSpec network.VLANSpec + + require.NoError(t, networkadapter.VLANSpec(&decodedSpec).Decode(b)) + + require.Equal(t, spec, decodedSpec) +} diff --git a/internal/app/machined/pkg/adapters/network/wireguard_spec.go b/internal/app/machined/pkg/adapters/network/wireguard_spec.go new file mode 100644 index 000000000..00f254fab --- /dev/null +++ b/internal/app/machined/pkg/adapters/network/wireguard_spec.go @@ -0,0 +1,201 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network + +import ( + "net" + + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "inet.af/netaddr" + + "github.com/talos-systems/talos/pkg/resources/network" +) + +// WireguardSpec adapter provides encoding/decoding to netlink structures. +// +//nolint:revive,golint +func WireguardSpec(r *network.WireguardSpec) wireguardSpec { + return wireguardSpec{ + WireguardSpec: r, + } +} + +type wireguardSpec struct { + *network.WireguardSpec +} + +// Encode converts WireguardSpec to wgctrl.Config "patch" to adjust the config to match the spec. +// +// Both specs should be sorted. +// +// Encode produces a "diff" as *wgtypes.Config which when applied transitions `existing` configuration into +// configuration `spec`. +// +//nolint:gocyclo,cyclop +func (a wireguardSpec) Encode(existing *network.WireguardSpec) (*wgtypes.Config, error) { + spec := a.WireguardSpec + + cfg := &wgtypes.Config{} + + if existing.PrivateKey != spec.PrivateKey { + key, err := wgtypes.ParseKey(spec.PrivateKey) + if err != nil { + return nil, err + } + + cfg.PrivateKey = &key + } + + if existing.ListenPort != spec.ListenPort { + cfg.ListenPort = &spec.ListenPort + } + + if existing.FirewallMark != spec.FirewallMark { + cfg.FirewallMark = &spec.FirewallMark + } + + // perform a merge of two sorted list of peers producing diff + l, r := 0, 0 + + for l < len(existing.Peers) || r < len(spec.Peers) { + addPeer := func(peer *network.WireguardPeer) error { + pubKey, err := wgtypes.ParseKey(peer.PublicKey) + if err != nil { + return err + } + + var presharedKey *wgtypes.Key + + if peer.PresharedKey != "" { + var parsedKey wgtypes.Key + + parsedKey, err = wgtypes.ParseKey(peer.PresharedKey) + if err != nil { + return err + } + + presharedKey = &parsedKey + } + + var endpoint *net.UDPAddr + + if peer.Endpoint != "" { + endpoint, err = net.ResolveUDPAddr("", peer.Endpoint) + if err != nil { + return err + } + } + + allowedIPs := make([]net.IPNet, len(peer.AllowedIPs)) + + for i := range peer.AllowedIPs { + allowedIPs[i] = *peer.AllowedIPs[i].IPNet() + } + + cfg.Peers = append(cfg.Peers, wgtypes.PeerConfig{ + PublicKey: pubKey, + Endpoint: endpoint, + PresharedKey: presharedKey, + PersistentKeepaliveInterval: &peer.PersistentKeepaliveInterval, + ReplaceAllowedIPs: true, + AllowedIPs: allowedIPs, + }) + + return nil + } + + deletePeer := func(peer *network.WireguardPeer) error { + pubKey, err := wgtypes.ParseKey(peer.PublicKey) + if err != nil { + return err + } + + cfg.Peers = append(cfg.Peers, wgtypes.PeerConfig{ + PublicKey: pubKey, + Remove: true, + }) + + return nil + } + + var left, right *network.WireguardPeer + + if l < len(existing.Peers) { + left = &existing.Peers[l] + } + + if r < len(spec.Peers) { + right = &spec.Peers[r] + } + + switch { + // peer from the "right" (new spec) is missing in "existing" (left), add it + case left == nil || (right != nil && left.PublicKey > right.PublicKey): + if err := addPeer(right); err != nil { + return nil, err + } + + r++ + // peer from the "left" (existing) is missing in new spec (right), so it should be removed + case right == nil || (left != nil && left.PublicKey < right.PublicKey): + // deleting peers from the existing + if err := deletePeer(left); err != nil { + return nil, err + } + + l++ + // peer public keys are equal, so either they are identical or peer should be replaced + case left.PublicKey == right.PublicKey: + if !left.Equal(right) { + // replace peer + if err := addPeer(right); err != nil { + return nil, err + } + } + + l++ + r++ + } + } + + return cfg, nil +} + +// Decode spec from the device state. +func (a wireguardSpec) Decode(dev *wgtypes.Device, isStatus bool) { + spec := a.WireguardSpec + + if isStatus { + spec.PublicKey = dev.PublicKey.String() + } else { + spec.PrivateKey = dev.PrivateKey.String() + } + + spec.ListenPort = dev.ListenPort + spec.FirewallMark = dev.FirewallMark + + spec.Peers = make([]network.WireguardPeer, len(dev.Peers)) + + for i := range spec.Peers { + spec.Peers[i].PublicKey = dev.Peers[i].PublicKey.String() + + if dev.Peers[i].Endpoint != nil { + spec.Peers[i].Endpoint = dev.Peers[i].Endpoint.String() + } + + var zeroKey wgtypes.Key + + if dev.Peers[i].PresharedKey != zeroKey { + spec.Peers[i].PresharedKey = dev.Peers[i].PresharedKey.String() + } + + spec.Peers[i].PersistentKeepaliveInterval = dev.Peers[i].PersistentKeepaliveInterval + spec.Peers[i].AllowedIPs = make([]netaddr.IPPrefix, len(dev.Peers[i].AllowedIPs)) + + for j := range dev.Peers[i].AllowedIPs { + spec.Peers[i].AllowedIPs[j], _ = netaddr.FromStdIPNet(&dev.Peers[i].AllowedIPs[j]) + } + } +} diff --git a/internal/app/machined/pkg/adapters/network/wireguard_spec_test.go b/internal/app/machined/pkg/adapters/network/wireguard_spec_test.go new file mode 100644 index 000000000..799ab4f8d --- /dev/null +++ b/internal/app/machined/pkg/adapters/network/wireguard_spec_test.go @@ -0,0 +1,279 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network_test + +import ( + "net" + "testing" + + "github.com/AlekSi/pointer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "inet.af/netaddr" + + networkadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/network" + "github.com/talos-systems/talos/pkg/resources/network" +) + +func TestWireguardSpecDecode(t *testing.T) { + priv, err := wgtypes.GeneratePrivateKey() + require.NoError(t, err) + + pub1, err := wgtypes.GeneratePrivateKey() + require.NoError(t, err) + + pub2, err := wgtypes.GeneratePrivateKey() + require.NoError(t, err) + + var spec network.WireguardSpec + + // decode in spec mode + networkadapter.WireguardSpec(&spec).Decode(&wgtypes.Device{ + PrivateKey: priv, + ListenPort: 30000, + FirewallMark: 1, + Peers: []wgtypes.Peer{ + { + PublicKey: pub1.PublicKey(), + PresharedKey: priv, + Endpoint: &net.UDPAddr{ + IP: net.ParseIP("10.2.0.3"), + Port: 20000, + }, + AllowedIPs: []net.IPNet{ + { + IP: net.ParseIP("172.24.0.0"), + Mask: net.IPv4Mask(255, 255, 0, 0), + }, + }, + }, + { + PublicKey: pub2.PublicKey(), + AllowedIPs: []net.IPNet{ + { + IP: net.ParseIP("172.25.0.0"), + Mask: net.IPv4Mask(255, 255, 255, 0), + }, + }, + }, + }, + }, false) + + expected := network.WireguardSpec{ + PrivateKey: priv.String(), + ListenPort: 30000, + FirewallMark: 1, + Peers: []network.WireguardPeer{ + { + PublicKey: pub1.PublicKey().String(), + PresharedKey: priv.String(), + Endpoint: "10.2.0.3:20000", + AllowedIPs: []netaddr.IPPrefix{ + netaddr.MustParseIPPrefix("172.24.0.0/16"), + }, + }, + { + PublicKey: pub2.PublicKey().String(), + AllowedIPs: []netaddr.IPPrefix{ + netaddr.MustParseIPPrefix("172.25.0.0/24"), + }, + }, + }, + } + + assert.Equal(t, expected, spec) + assert.True(t, expected.Equal(&spec)) + + // zeroed out listen port is still acceptable on the right side + spec.ListenPort = 0 + assert.True(t, expected.Equal(&spec)) + + // ... but not on the left side + expected.ListenPort = 0 + spec.ListenPort = 30000 + assert.False(t, expected.Equal(&spec)) + + var zeroSpec network.WireguardSpec + + assert.False(t, zeroSpec.Equal(&spec)) +} + +func TestWireguardSpecDecodeStatus(t *testing.T) { + priv, err := wgtypes.GeneratePrivateKey() + require.NoError(t, err) + + var spec network.WireguardSpec + + // decode in status mode + networkadapter.WireguardSpec(&spec).Decode(&wgtypes.Device{ + PrivateKey: priv, + PublicKey: priv.PublicKey(), + ListenPort: 30000, + FirewallMark: 1, + }, true) + + expected := network.WireguardSpec{ + PublicKey: priv.PublicKey().String(), + ListenPort: 30000, + FirewallMark: 1, + Peers: []network.WireguardPeer{}, + } + + assert.Equal(t, expected, spec) +} + +func TestWireguardSpecEncode(t *testing.T) { + priv, err := wgtypes.GeneratePrivateKey() + require.NoError(t, err) + + pub1, err := wgtypes.GeneratePrivateKey() + require.NoError(t, err) + + pub2, err := wgtypes.GeneratePrivateKey() + require.NoError(t, err) + + // make sure pub1 < pub2 + if pub1.PublicKey().String() > pub2.PublicKey().String() { + pub1, pub2 = pub2, pub1 + } + + specV1 := network.WireguardSpec{ + PrivateKey: priv.String(), + ListenPort: 30000, + FirewallMark: 1, + Peers: []network.WireguardPeer{ + { + PublicKey: pub1.PublicKey().String(), + Endpoint: "10.2.0.3:20000", + AllowedIPs: []netaddr.IPPrefix{ + netaddr.MustParseIPPrefix("172.24.0.0/16"), + }, + }, + { + PublicKey: pub2.PublicKey().String(), + AllowedIPs: []netaddr.IPPrefix{ + netaddr.MustParseIPPrefix("172.25.0.0/24"), + }, + }, + }, + } + + specV1.Sort() + + var zero network.WireguardSpec + + networkadapter.WireguardSpec(&zero).Decode(&wgtypes.Device{}, false) + zero.Sort() + + // from zero (empty) config to config with two peers + delta, err := networkadapter.WireguardSpec(&specV1).Encode(&zero) + require.NoError(t, err) + + assert.Equal(t, &wgtypes.Config{ + PrivateKey: &priv, + ListenPort: pointer.ToInt(30000), + FirewallMark: pointer.ToInt(1), + Peers: []wgtypes.PeerConfig{ + { + PublicKey: pub1.PublicKey(), + Endpoint: &net.UDPAddr{ + IP: net.ParseIP("10.2.0.3"), + Port: 20000, + }, + PersistentKeepaliveInterval: pointer.ToDuration(0), + ReplaceAllowedIPs: true, + AllowedIPs: []net.IPNet{ + { + IP: net.ParseIP("172.24.0.0").To4(), + Mask: net.IPv4Mask(255, 255, 0, 0), + }, + }, + }, + { + PublicKey: pub2.PublicKey(), + PersistentKeepaliveInterval: pointer.ToDuration(0), + ReplaceAllowedIPs: true, + AllowedIPs: []net.IPNet{ + { + IP: net.ParseIP("172.25.0.0").To4(), + Mask: net.IPv4Mask(255, 255, 255, 0), + }, + }, + }, + }, + }, delta) + + // noop + delta, err = networkadapter.WireguardSpec(&specV1).Encode(&specV1) + require.NoError(t, err) + + assert.Equal(t, &wgtypes.Config{}, delta) + + // delete peer2 + specV2 := network.WireguardSpec{ + PrivateKey: priv.String(), + ListenPort: 30000, + FirewallMark: 1, + Peers: []network.WireguardPeer{ + { + PublicKey: pub1.PublicKey().String(), + Endpoint: "10.2.0.3:20000", + AllowedIPs: []netaddr.IPPrefix{ + netaddr.MustParseIPPrefix("172.24.0.0/16"), + }, + }, + }, + } + + delta, err = networkadapter.WireguardSpec(&specV2).Encode(&specV1) + require.NoError(t, err) + + assert.Equal(t, &wgtypes.Config{ + Peers: []wgtypes.PeerConfig{ + { + PublicKey: pub2.PublicKey(), + Remove: true, + }, + }, + }, delta) + + // update peer1, firewallMark + specV3 := network.WireguardSpec{ + PrivateKey: priv.String(), + ListenPort: 30000, + FirewallMark: 2, + Peers: []network.WireguardPeer{ + { + PublicKey: pub1.PublicKey().String(), + PresharedKey: priv.String(), + AllowedIPs: []netaddr.IPPrefix{ + netaddr.MustParseIPPrefix("172.24.0.0/16"), + }, + }, + }, + } + + delta, err = networkadapter.WireguardSpec(&specV3).Encode(&specV2) + require.NoError(t, err) + + assert.Equal(t, &wgtypes.Config{ + FirewallMark: pointer.ToInt(2), + Peers: []wgtypes.PeerConfig{ + { + PublicKey: pub1.PublicKey(), + PresharedKey: &priv, + PersistentKeepaliveInterval: pointer.ToDuration(0), + ReplaceAllowedIPs: true, + AllowedIPs: []net.IPNet{ + { + IP: net.ParseIP("172.24.0.0").To4(), + Mask: net.IPv4Mask(255, 255, 0, 0), + }, + }, + }, + }, + }, delta) +} diff --git a/internal/app/machined/pkg/adapters/perf/perf.go b/internal/app/machined/pkg/adapters/perf/perf.go index 1e78914ca..52fab8769 100644 --- a/internal/app/machined/pkg/adapters/perf/perf.go +++ b/internal/app/machined/pkg/adapters/perf/perf.go @@ -2,5 +2,5 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -// Package perf providers adapters wrapping resources/perf to provide additional functionality. +// Package perf implements adapters wrapping resources/perf to provide additional functionality. package perf diff --git a/internal/app/machined/pkg/controllers/cluster/local_affiliate_test.go b/internal/app/machined/pkg/controllers/cluster/local_affiliate_test.go index 7bfa74f2c..59e51c794 100644 --- a/internal/app/machined/pkg/controllers/cluster/local_affiliate_test.go +++ b/internal/app/machined/pkg/controllers/cluster/local_affiliate_test.go @@ -15,6 +15,7 @@ import ( "inet.af/netaddr" clusteradapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/cluster" + kubespanadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/kubespan" clusterctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/cluster" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/resources/cluster" @@ -78,8 +79,8 @@ func (suite *LocalAffiliateSuite) TestGeneration() { suite.Require().NoError(err) ksIdentity := kubespan.NewIdentity(kubespan.NamespaceName, kubespan.LocalIdentity) - suite.Require().NoError(ksIdentity.TypedSpec().GenerateKey()) - suite.Require().NoError(ksIdentity.TypedSpec().UpdateAddress("8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=", mac)) + suite.Require().NoError(kubespanadapter.IdentitySpec(ksIdentity.TypedSpec()).GenerateKey()) + suite.Require().NoError(kubespanadapter.IdentitySpec(ksIdentity.TypedSpec()).UpdateAddress("8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=", mac)) suite.Require().NoError(suite.state.Create(suite.ctx, ksIdentity)) // add KS address to the list of node addresses, it should be ignored in the endpoints diff --git a/internal/app/machined/pkg/controllers/kubespan/identity.go b/internal/app/machined/pkg/controllers/kubespan/identity.go index 2fcd3af0e..fcc738fd8 100644 --- a/internal/app/machined/pkg/controllers/kubespan/identity.go +++ b/internal/app/machined/pkg/controllers/kubespan/identity.go @@ -16,6 +16,7 @@ import ( "github.com/cosi-project/runtime/pkg/state" "go.uber.org/zap" + kubespanadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/kubespan" "github.com/talos-systems/talos/internal/app/machined/pkg/controllers" "github.com/talos-systems/talos/pkg/machinery/constants" "github.com/talos-systems/talos/pkg/resources/config" @@ -107,7 +108,7 @@ func (ctrl *IdentityController) Run(ctx context.Context, r controller.Runtime, l var localIdentity kubespan.IdentitySpec if err = controllers.LoadOrNewFromFile(filepath.Join(ctrl.StatePath, constants.KubeSpanIdentityFilename), &localIdentity, func(v interface{}) error { - return v.(*kubespan.IdentitySpec).GenerateKey() + return kubespanadapter.IdentitySpec(v.(*kubespan.IdentitySpec)).GenerateKey() }); err != nil { return fmt.Errorf("error caching kubespan identity: %w", err) } @@ -115,7 +116,7 @@ func (ctrl *IdentityController) Run(ctx context.Context, r controller.Runtime, l kubespanCfg := cfg.(*kubespan.Config).TypedSpec() mac := firstMAC.(*network.HardwareAddr).TypedSpec() - if err = localIdentity.UpdateAddress(kubespanCfg.ClusterID, net.HardwareAddr(mac.HardwareAddr)); err != nil { + if err = kubespanadapter.IdentitySpec(&localIdentity).UpdateAddress(kubespanCfg.ClusterID, net.HardwareAddr(mac.HardwareAddr)); err != nil { return fmt.Errorf("error updating KubeSpan address: %w", err) } diff --git a/internal/app/machined/pkg/controllers/kubespan/manager.go b/internal/app/machined/pkg/controllers/kubespan/manager.go index 5c0fdd6fe..5cde68f11 100644 --- a/internal/app/machined/pkg/controllers/kubespan/manager.go +++ b/internal/app/machined/pkg/controllers/kubespan/manager.go @@ -20,6 +20,7 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "inet.af/netaddr" + kubespanadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/kubespan" "github.com/talos-systems/talos/pkg/machinery/constants" "github.com/talos-systems/talos/pkg/machinery/nethelpers" "github.com/talos-systems/talos/pkg/resources/config" @@ -286,14 +287,14 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo if wgDevice != nil { // wgDevice might be nil if the link is not created yet for _, peerInfo := range wgDevice.Peers { if peerStatus, ok := peerStatuses[peerInfo.PublicKey.String()]; ok { - peerStatus.UpdateFromWireguard(peerInfo) + kubespanadapter.PeerStatusSpec(peerStatus).UpdateFromWireguard(peerInfo) } } } // calculate peer status connection state for _, peerStatus := range peerStatuses { - peerStatus.CalculateState() + kubespanadapter.PeerStatusSpec(peerStatus).CalculateState() } // build wireguard peer configuration @@ -306,14 +307,14 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo var endpoint string // check if the endpoint should be updated - if peerStatus.ShouldChangeEndpoint() { - newEndpoint := peerStatus.PickNewEndpoint(peerSpec.Endpoints) + if kubespanadapter.PeerStatusSpec(peerStatus).ShouldChangeEndpoint() { + newEndpoint := kubespanadapter.PeerStatusSpec(peerStatus).PickNewEndpoint(peerSpec.Endpoints) if !newEndpoint.IsZero() { logger.Debug("updating endpoint for the peer", zap.String("peer", pubKey), zap.String("label", peerSpec.Label), zap.Stringer("endpoint", newEndpoint)) endpoint = newEndpoint.String() - peerStatus.UpdateEndpoint(newEndpoint) + kubespanadapter.PeerStatusSpec(peerStatus).UpdateEndpoint(newEndpoint) updateSpecs = true } diff --git a/internal/app/machined/pkg/controllers/kubespan/manager_test.go b/internal/app/machined/pkg/controllers/kubespan/manager_test.go index 740e34f83..1fb9c1a9d 100644 --- a/internal/app/machined/pkg/controllers/kubespan/manager_test.go +++ b/internal/app/machined/pkg/controllers/kubespan/manager_test.go @@ -17,6 +17,7 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "inet.af/netaddr" + kubespanadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/kubespan" kubespanctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/kubespan" "github.com/talos-systems/talos/pkg/machinery/constants" "github.com/talos-systems/talos/pkg/machinery/nethelpers" @@ -138,8 +139,8 @@ func (suite *ManagerSuite) TestReconcile() { suite.Require().NoError(err) localIdentity := kubespan.NewIdentity(kubespan.NamespaceName, kubespan.LocalIdentity) - suite.Require().NoError(localIdentity.TypedSpec().GenerateKey()) - suite.Require().NoError(localIdentity.TypedSpec().UpdateAddress("v16UCWpO2iOm82n6F8dGCJ41ZXXBvDrjRDs2su7C_zs=", mac)) + suite.Require().NoError(kubespanadapter.IdentitySpec(localIdentity.TypedSpec()).GenerateKey()) + suite.Require().NoError(kubespanadapter.IdentitySpec(localIdentity.TypedSpec()).UpdateAddress("v16UCWpO2iOm82n6F8dGCJ41ZXXBvDrjRDs2su7C_zs=", mac)) suite.Require().NoError(suite.state.Create(suite.ctx, localIdentity)) // initial setup: link should be created without any peers diff --git a/internal/app/machined/pkg/controllers/network/link_config.go b/internal/app/machined/pkg/controllers/network/link_config.go index a28b336e6..abea0376a 100644 --- a/internal/app/machined/pkg/controllers/network/link_config.go +++ b/internal/app/machined/pkg/controllers/network/link_config.go @@ -17,6 +17,7 @@ import ( "go.uber.org/zap" "inet.af/netaddr" + networkadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/network" talosconfig "github.com/talos-systems/talos/pkg/machinery/config" "github.com/talos-systems/talos/pkg/machinery/nethelpers" "github.com/talos-systems/talos/pkg/resources/config" @@ -442,7 +443,7 @@ func bondMaster(link *network.LinkSpecSpec, bond talosconfig.Bond) error { ADUserPortKey: bond.ADUserPortKey(), PeerNotifyDelay: bond.PeerNotifyDelay(), } - link.BondMaster.FillDefaults() + networkadapter.BondMasterSpec(&link.BondMaster).FillDefaults() return nil } diff --git a/internal/app/machined/pkg/controllers/network/link_spec.go b/internal/app/machined/pkg/controllers/network/link_spec.go index df41bbe0c..18555314e 100644 --- a/internal/app/machined/pkg/controllers/network/link_spec.go +++ b/internal/app/machined/pkg/controllers/network/link_spec.go @@ -17,6 +17,7 @@ import ( "golang.org/x/sys/unix" "golang.zx2c4.com/wireguard/wgctrl" + networkadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/network" "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network/watch" "github.com/talos-systems/talos/pkg/machinery/nethelpers" "github.com/talos-systems/talos/pkg/resources/network" @@ -212,7 +213,7 @@ func (ctrl *LinkSpecController) syncLink(ctx context.Context, r controller.Runti if !replace && link.TypedSpec().Kind == network.LinkKindVLAN { var existingVLAN network.VLANSpec - if err := existingVLAN.Decode(existing.Attributes.Info.Data); err != nil { + if err := networkadapter.VLANSpec(&existingVLAN).Decode(existing.Attributes.Info.Data); err != nil { return fmt.Errorf("error decoding VLAN properties on %q: %w", link.TypedSpec().Name, err) } @@ -264,7 +265,7 @@ func (ctrl *LinkSpecController) syncLink(ctx context.Context, r controller.Runti } if link.TypedSpec().Kind == network.LinkKindVLAN { - data, err = link.TypedSpec().VLAN.Encode() + data, err = networkadapter.VLANSpec(&link.TypedSpec().VLAN).Encode() if err != nil { return fmt.Errorf("error encoding VLAN attributes for link %q: %w", link.TypedSpec().Name, err) } @@ -302,7 +303,7 @@ func (ctrl *LinkSpecController) syncLink(ctx context.Context, r controller.Runti if link.TypedSpec().Kind == network.LinkKindBond { var existingBond network.BondMasterSpec - if err := existingBond.Decode(existing.Attributes.Info.Data); err != nil { + if err := networkadapter.BondMasterSpec(&existingBond).Decode(existing.Attributes.Info.Data); err != nil { return fmt.Errorf("error parsing bond attributes for %q: %w", link.TypedSpec().Name, err) } @@ -312,7 +313,7 @@ func (ctrl *LinkSpecController) syncLink(ctx context.Context, r controller.Runti zap.String("new", fmt.Sprintf("%+v", link.TypedSpec().BondMaster)), ) - data, err := link.TypedSpec().BondMaster.Encode() + data, err := networkadapter.BondMasterSpec(&link.TypedSpec().BondMaster).Encode() if err != nil { return fmt.Errorf("error encoding bond attributes for %q: %w", link.TypedSpec().Name, err) } @@ -374,14 +375,14 @@ func (ctrl *LinkSpecController) syncLink(ctx context.Context, r controller.Runti var existingSpec network.WireguardSpec - existingSpec.Decode(wgDev, false) + networkadapter.WireguardSpec(&existingSpec).Decode(wgDev, false) existingSpec.Sort() link.TypedSpec().Wireguard.Sort() // order here is important: we allow listenPort to be zero in the configuration if !existingSpec.Equal(&link.TypedSpec().Wireguard) { - config, err := link.TypedSpec().Wireguard.Encode(&existingSpec) + config, err := networkadapter.WireguardSpec(&link.TypedSpec().Wireguard).Encode(&existingSpec) if err != nil { return fmt.Errorf("error creating wireguard config patch for %q: %w", link.TypedSpec().Name, err) } diff --git a/internal/app/machined/pkg/controllers/network/link_spec_test.go b/internal/app/machined/pkg/controllers/network/link_spec_test.go index 68f54b817..4816a6ec4 100644 --- a/internal/app/machined/pkg/controllers/network/link_spec_test.go +++ b/internal/app/machined/pkg/controllers/network/link_spec_test.go @@ -24,6 +24,7 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "inet.af/netaddr" + networkadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/network" netctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network" "github.com/talos-systems/talos/pkg/logging" "github.com/talos-systems/talos/pkg/machinery/nethelpers" @@ -331,7 +332,7 @@ func (suite *LinkSpecSuite) TestBond() { }, ConfigLayer: network.ConfigDefault, } - bond.TypedSpec().BondMaster.FillDefaults() + networkadapter.BondMasterSpec(&bond.TypedSpec().BondMaster).FillDefaults() dummy0Name := suite.uniqueDummyInterface() dummy0 := network.NewLinkSpec(network.NamespaceName, dummy0Name) @@ -463,7 +464,7 @@ func (suite *LinkSpecSuite) TestBond8023ad() { }, ConfigLayer: network.ConfigDefault, } - bond.TypedSpec().BondMaster.FillDefaults() + networkadapter.BondMasterSpec(&bond.TypedSpec().BondMaster).FillDefaults() dummies := []resource.Resource{} dummyNames := []string{} diff --git a/internal/app/machined/pkg/controllers/network/link_status.go b/internal/app/machined/pkg/controllers/network/link_status.go index 3f1d59fb2..27ff2be20 100644 --- a/internal/app/machined/pkg/controllers/network/link_status.go +++ b/internal/app/machined/pkg/controllers/network/link_status.go @@ -19,6 +19,7 @@ import ( "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + networkadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/network" "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/network/watch" "github.com/talos-systems/talos/pkg/machinery/nethelpers" "github.com/talos-systems/talos/pkg/resources/network" @@ -214,11 +215,11 @@ func (ctrl *LinkStatusController) reconcile(ctx context.Context, r controller.Ru switch status.Kind { case network.LinkKindVLAN: - if err = status.VLAN.Decode(link.Attributes.Info.Data); err != nil { + if err = networkadapter.VLANSpec(&status.VLAN).Decode(link.Attributes.Info.Data); err != nil { logger.Warn("failure decoding VLAN attributes", zap.Error(err), zap.String("link", link.Attributes.Name)) } case network.LinkKindBond: - if err = status.BondMaster.Decode(link.Attributes.Info.Data); err != nil { + if err = networkadapter.BondMasterSpec(&status.BondMaster).Decode(link.Attributes.Info.Data); err != nil { logger.Warn("failure decoding bond attributes", zap.Error(err), zap.String("link", link.Attributes.Name)) } case network.LinkKindWireguard: @@ -228,7 +229,7 @@ func (ctrl *LinkStatusController) reconcile(ctx context.Context, r controller.Ru if err != nil { logger.Warn("failure getting wireguard attributes", zap.Error(err), zap.String("link", link.Attributes.Name)) } else { - status.Wireguard.Decode(wgDev, true) + networkadapter.WireguardSpec(&status.Wireguard).Decode(wgDev, true) } } diff --git a/pkg/resources/.importvet.yaml b/pkg/resources/.importvet.yaml index f7369bd01..9916a7ef8 100644 --- a/pkg/resources/.importvet.yaml +++ b/pkg/resources/.importvet.yaml @@ -1,7 +1,7 @@ --- # temporary rules to facilitate moving `pkg/resources` into `pkg/machinery`: # - no imports of anything from Talos except for machinery and pkg/resources itself -# - (not enforced yet) external dependencies we don't have to see in the machinery +# - external dependencies we don't have to see in the machinery rules: - regexp: ^github.com/talos-systems/talos action: deny @@ -13,11 +13,9 @@ rules: action: deny - regexp: ^github.com/jxskiss/base62 action: deny - #- regexp: ^github.com/mdlayher/netlink - # action: deny - #- regexp: ^github.com/mdlayher/netx - # action: deny + - regexp: ^github.com/mdlayher/netx + action: deny - regexp: ^github.com/prometheus/procfs action: deny - #- regexp: ^golang.zx2c4.com/wireguard/wgctrl - # action: deny + - regexp: ^golang.zx2c4.com/wireguard/wgctrl + action: deny diff --git a/pkg/resources/kubespan/identity.go b/pkg/resources/kubespan/identity.go index a84bc91ed..578cf1c72 100644 --- a/pkg/resources/kubespan/identity.go +++ b/pkg/resources/kubespan/identity.go @@ -6,14 +6,10 @@ package kubespan import ( "fmt" - "net" "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/resource/meta" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "inet.af/netaddr" - - "github.com/talos-systems/talos/pkg/resources/network" ) // IdentityType is type of Identity resource. @@ -99,27 +95,3 @@ func (r *Identity) ResourceDefinition() meta.ResourceDefinitionSpec { func (r *Identity) TypedSpec() *IdentitySpec { return &r.spec } - -// GenerateKey generates new Wireguard key. -func (spec *IdentitySpec) GenerateKey() error { - key, err := wgtypes.GeneratePrivateKey() - if err != nil { - return err - } - - spec.PrivateKey = key.String() - spec.PublicKey = key.PublicKey().String() - - return nil -} - -// UpdateAddress re-calculates node address based on input data. -func (spec *IdentitySpec) UpdateAddress(clusterID string, mac net.HardwareAddr) error { - spec.Subnet = network.ULAPrefix(clusterID, network.ULAKubeSpan) - - var err error - - spec.Address, err = wgEUI64(spec.Subnet, mac) - - return err -} diff --git a/pkg/resources/kubespan/peer_status.go b/pkg/resources/kubespan/peer_status.go index 8ed22c02e..219317150 100644 --- a/pkg/resources/kubespan/peer_status.go +++ b/pkg/resources/kubespan/peer_status.go @@ -10,7 +10,6 @@ import ( "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/resource/meta" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "inet.af/netaddr" ) @@ -112,134 +111,3 @@ func (r *PeerStatus) ResourceDefinition() meta.ResourceDefinitionSpec { func (r *PeerStatus) TypedSpec() *PeerStatusSpec { return &r.spec } - -// PeerDownInterval is the time since last handshake when established peer is considered to be down. -// -// WG whitepaper defines a downed peer as being: -// Handshake Timeout (180s) + Rekey Timeout (5s) + Rekey Attempt Timeout (90s) -// -// This interval is applied when the link is already established. -const PeerDownInterval = (180 + 5 + 90) * time.Second - -// EndpointConnectionTimeout is time to wait for initial handshake when the endpoint is just set. -const EndpointConnectionTimeout = 15 * time.Second - -// CalculateState updates connection state based on other fields values. -// -// Goal: endpoint is ultimately down if we haven't seen handshake for more than peerDownInterval, -// but as the endpoints get updated we want faster feedback, so we start checking more aggressively -// that the handshake happened within endpointConnectionTimeout since last endpoint change. -// -// Timeline: -// -// ----------------------------------------------------------------------> -// ^ ^ ^ -// | | | -// T0 T0+endpointConnectionTimeout T0+peerDownInterval -// -// Where T0 = LastEndpontChange -// -// The question is where is LastHandshakeTimeout vs. those points above: -// -// * if we're past (T0+peerDownInterval), simply check that time since last handshake < peerDownInterval -// * if we're between (T0+endpointConnectionTimeout) and (T0+peerDownInterval), and there's no handshake -// after the endpoint change, assume that the endpoint is down -// * if we're between (T0) and (T0+endpointConnectionTimeout), and there's no handshake since the endpoint change, -// consider the state to be unknown -func (spec *PeerStatusSpec) CalculateState() { - sinceLastHandshake := time.Since(spec.LastHandshakeTime) - sinceEndpointChange := time.Since(spec.LastEndpointChange) - - spec.CalculateStateWithDurations(sinceLastHandshake, sinceEndpointChange) -} - -// CalculateStateWithDurations calculates the state based on the time since events. -func (spec *PeerStatusSpec) CalculateStateWithDurations(sinceLastHandshake, sinceEndpointChange time.Duration) { - switch { - case sinceEndpointChange > PeerDownInterval: // past T0+peerDownInterval - // if we got handshake in the last peerDownInterval, endpoint is up - if sinceLastHandshake < PeerDownInterval { - spec.State = PeerStateUp - } else { - spec.State = PeerStateDown - } - case sinceEndpointChange < EndpointConnectionTimeout: // between (T0) and (T0+endpointConnectionTimeout) - // endpoint got recently updated, consider no handshake as 'unknown' - if spec.LastHandshakeTime.After(spec.LastEndpointChange) { - spec.State = PeerStateUp - } else { - spec.State = PeerStateUnknown - } - - default: // otherwise, we're between (T0+endpointConnectionTimeout) and (T0+peerDownInterval) - // if we haven't had the handshake yet, consider the endpoint to be down - if spec.LastHandshakeTime.After(spec.LastEndpointChange) { - spec.State = PeerStateUp - } else { - spec.State = PeerStateDown - } - } - - if spec.State == PeerStateDown && spec.LastUsedEndpoint.IsZero() { - // no endpoint, so unknown - spec.State = PeerStateUnknown - } -} - -// UpdateFromWireguard updates fields from wgtypes information. -func (spec *PeerStatusSpec) UpdateFromWireguard(peer wgtypes.Peer) { - if peer.Endpoint != nil { - spec.Endpoint, _ = netaddr.FromStdAddr(peer.Endpoint.IP, peer.Endpoint.Port, "") - } else { - spec.Endpoint = netaddr.IPPort{} - } - - spec.LastHandshakeTime = peer.LastHandshakeTime - spec.TransmitBytes = peer.TransmitBytes - spec.ReceiveBytes = peer.ReceiveBytes -} - -// UpdateEndpoint updates the endpoint information and last update timestamp. -func (spec *PeerStatusSpec) UpdateEndpoint(endpoint netaddr.IPPort) { - spec.Endpoint = endpoint - spec.LastUsedEndpoint = endpoint - spec.LastEndpointChange = time.Now() - spec.State = PeerStateUnknown -} - -// ShouldChangeEndpoint tells whether endpoint should be updated. -func (spec *PeerStatusSpec) ShouldChangeEndpoint() bool { - return spec.State == PeerStateDown || spec.LastUsedEndpoint.IsZero() -} - -// PickNewEndpoint picks new endpoint given the state and list of available endpoints. -// -// If returned newEndpoint is zero value, no new endpoint is available. -func (spec *PeerStatusSpec) PickNewEndpoint(endpoints []netaddr.IPPort) (newEndpoint netaddr.IPPort) { - if len(endpoints) == 0 { - return - } - - if spec.LastUsedEndpoint.IsZero() { - // first time setting the endpoint - newEndpoint = endpoints[0] - } else { - // find the next endpoint after LastUsedEndpoint and use it - idx := -1 - - for i := range endpoints { - if endpoints[i] == spec.LastUsedEndpoint { - idx = i - - break - } - } - - // special case: if the peer has just a single endpoint, we can't rotate - if !(len(endpoints) == 1 && idx == 0 && spec.Endpoint == spec.LastUsedEndpoint) { - newEndpoint = endpoints[(idx+1)%len(endpoints)] - } - } - - return -} diff --git a/pkg/resources/kubespan/utils.go b/pkg/resources/kubespan/utils.go deleted file mode 100644 index c8619177b..000000000 --- a/pkg/resources/kubespan/utils.go +++ /dev/null @@ -1,31 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package kubespan - -import ( - "fmt" - "net" - - "github.com/mdlayher/netx/eui64" - "inet.af/netaddr" -) - -func wgEUI64(prefix netaddr.IPPrefix, mac net.HardwareAddr) (out netaddr.IPPrefix, err error) { - if prefix.IsZero() { - return out, fmt.Errorf("cannot calculate IP from zero prefix") - } - - stdIP, err := eui64.ParseMAC(prefix.IPNet().IP, mac) - if err != nil { - return out, fmt.Errorf("failed to parse MAC into EUI-64 address: %w", err) - } - - ip, ok := netaddr.FromStdIP(stdIP) - if !ok { - return out, fmt.Errorf("failed to parse intermediate standard IP %q: %w", stdIP.String(), err) - } - - return netaddr.IPPrefixFrom(ip, ip.BitLen()), nil -} diff --git a/pkg/resources/network/link.go b/pkg/resources/network/link.go index f89ba72b2..af7bcf0b2 100644 --- a/pkg/resources/network/link.go +++ b/pkg/resources/network/link.go @@ -5,11 +5,9 @@ package network import ( - "net" "sort" "time" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "inet.af/netaddr" "github.com/talos-systems/talos/pkg/machinery/nethelpers" @@ -52,33 +50,6 @@ type BondMasterSpec struct { PeerNotifyDelay uint32 `yaml:"peerNotifyDelay,omitempty"` } -// FillDefaults fills zero values with proper default values. -func (bond *BondMasterSpec) FillDefaults() { - if bond.ResendIGMP == 0 { - bond.ResendIGMP = 1 - } - - if bond.LPInterval == 0 { - bond.LPInterval = 1 - } - - if bond.PacketsPerSlave == 0 { - bond.PacketsPerSlave = 1 - } - - if bond.NumPeerNotif == 0 { - bond.NumPeerNotif = 1 - } - - if bond.Mode != nethelpers.BondModeALB && bond.Mode != nethelpers.BondModeTLB { - bond.TLBDynamicLB = 1 - } - - if bond.Mode == nethelpers.BondMode8023AD { - bond.ADActorSysPrio = 65535 - } -} - // WireguardSpec describes Wireguard settings if Kind == "wireguard". type WireguardSpec struct { // PrivateKey is used to configure the link, present only in the LinkSpec. @@ -201,176 +172,6 @@ func (spec *WireguardSpec) Sort() { } } -// Encode converts WireguardSpec to wgctrl.Config "patch" to adjust the config to match the spec. -// -// Both specs should be sorted. -// -// Encode produces a "diff" as *wgtypes.Config which when applied transitions `existing` configuration into -// configuration `spec`. -// -//nolint:gocyclo,cyclop -func (spec *WireguardSpec) Encode(existing *WireguardSpec) (*wgtypes.Config, error) { - cfg := &wgtypes.Config{} - - if existing.PrivateKey != spec.PrivateKey { - key, err := wgtypes.ParseKey(spec.PrivateKey) - if err != nil { - return nil, err - } - - cfg.PrivateKey = &key - } - - if existing.ListenPort != spec.ListenPort { - cfg.ListenPort = &spec.ListenPort - } - - if existing.FirewallMark != spec.FirewallMark { - cfg.FirewallMark = &spec.FirewallMark - } - - // perform a merge of two sorted list of peers producing diff - l, r := 0, 0 - - for l < len(existing.Peers) || r < len(spec.Peers) { - addPeer := func(peer *WireguardPeer) error { - pubKey, err := wgtypes.ParseKey(peer.PublicKey) - if err != nil { - return err - } - - var presharedKey *wgtypes.Key - - if peer.PresharedKey != "" { - var parsedKey wgtypes.Key - - parsedKey, err = wgtypes.ParseKey(peer.PresharedKey) - if err != nil { - return err - } - - presharedKey = &parsedKey - } - - var endpoint *net.UDPAddr - - if peer.Endpoint != "" { - endpoint, err = net.ResolveUDPAddr("", peer.Endpoint) - if err != nil { - return err - } - } - - allowedIPs := make([]net.IPNet, len(peer.AllowedIPs)) - - for i := range peer.AllowedIPs { - allowedIPs[i] = *peer.AllowedIPs[i].IPNet() - } - - cfg.Peers = append(cfg.Peers, wgtypes.PeerConfig{ - PublicKey: pubKey, - Endpoint: endpoint, - PresharedKey: presharedKey, - PersistentKeepaliveInterval: &peer.PersistentKeepaliveInterval, - ReplaceAllowedIPs: true, - AllowedIPs: allowedIPs, - }) - - return nil - } - - deletePeer := func(peer *WireguardPeer) error { - pubKey, err := wgtypes.ParseKey(peer.PublicKey) - if err != nil { - return err - } - - cfg.Peers = append(cfg.Peers, wgtypes.PeerConfig{ - PublicKey: pubKey, - Remove: true, - }) - - return nil - } - - var left, right *WireguardPeer - - if l < len(existing.Peers) { - left = &existing.Peers[l] - } - - if r < len(spec.Peers) { - right = &spec.Peers[r] - } - - switch { - // peer from the "right" (new spec) is missing in "existing" (left), add it - case left == nil || (right != nil && left.PublicKey > right.PublicKey): - if err := addPeer(right); err != nil { - return nil, err - } - - r++ - // peer from the "left" (existing) is missing in new spec (right), so it should be removed - case right == nil || (left != nil && left.PublicKey < right.PublicKey): - // deleting peers from the existing - if err := deletePeer(left); err != nil { - return nil, err - } - - l++ - // peer public keys are equal, so either they are identical or peer should be replaced - case left.PublicKey == right.PublicKey: - if !left.Equal(right) { - // replace peer - if err := addPeer(right); err != nil { - return nil, err - } - } - - l++ - r++ - } - } - - return cfg, nil -} - -// Decode spec from the device state. -func (spec *WireguardSpec) Decode(dev *wgtypes.Device, isStatus bool) { - if isStatus { - spec.PublicKey = dev.PublicKey.String() - } else { - spec.PrivateKey = dev.PrivateKey.String() - } - - spec.ListenPort = dev.ListenPort - spec.FirewallMark = dev.FirewallMark - - spec.Peers = make([]WireguardPeer, len(dev.Peers)) - - for i := range spec.Peers { - spec.Peers[i].PublicKey = dev.Peers[i].PublicKey.String() - - if dev.Peers[i].Endpoint != nil { - spec.Peers[i].Endpoint = dev.Peers[i].Endpoint.String() - } - - var zeroKey wgtypes.Key - - if dev.Peers[i].PresharedKey != zeroKey { - spec.Peers[i].PresharedKey = dev.Peers[i].PresharedKey.String() - } - - spec.Peers[i].PersistentKeepaliveInterval = dev.Peers[i].PersistentKeepaliveInterval - spec.Peers[i].AllowedIPs = make([]netaddr.IPPrefix, len(dev.Peers[i].AllowedIPs)) - - for j := range dev.Peers[i].AllowedIPs { - spec.Peers[i].AllowedIPs[j], _ = netaddr.FromStdIPNet(&dev.Peers[i].AllowedIPs[j]) - } - } -} - // Merge with other Wireguard spec overwriting non-zero values. func (spec *WireguardSpec) Merge(other WireguardSpec) { if other.ListenPort != 0 { diff --git a/pkg/resources/network/link_test.go b/pkg/resources/network/link_test.go index d0f755a4c..3de308c92 100644 --- a/pkg/resources/network/link_test.go +++ b/pkg/resources/network/link_test.go @@ -5,63 +5,21 @@ package network_test import ( - "net" "testing" "time" - "github.com/AlekSi/pointer" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "inet.af/netaddr" - "github.com/talos-systems/talos/pkg/machinery/nethelpers" "github.com/talos-systems/talos/pkg/resources/network" ) -func TestVLANSpec(t *testing.T) { - spec := network.VLANSpec{ - VID: 25, - Protocol: nethelpers.VLANProtocol8021AD, - } - - b, err := spec.Encode() - require.NoError(t, err) - - var decodedSpec network.VLANSpec - - require.NoError(t, decodedSpec.Decode(b)) - - require.Equal(t, spec, decodedSpec) -} - -func TestBondMasterSpec(t *testing.T) { - spec := network.BondMasterSpec{ - Mode: nethelpers.BondModeActiveBackup, - MIIMon: 100, - UpDelay: 200, - DownDelay: 300, - } - - b, err := spec.Encode() - require.NoError(t, err) - - var decodedSpec network.BondMasterSpec - - require.NoError(t, decodedSpec.Decode(b)) - - require.Equal(t, spec, decodedSpec) -} - func TestWireguardPeer(t *testing.T) { - key1, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - key2, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) + key1 := "2t4fMmV1fBhI6RgoUzHp9BoWLT7oq0C/fOV17f7FqTI=" + key2 := "zHyf80qsjQ1EfiXkjxaLf9K9VZ6YRwcXx8GrpXQ6/yQ=" peer1 := network.WireguardPeer{ - PublicKey: key1.PublicKey().String(), + PublicKey: key1, Endpoint: "127.0.0.1:1000", PersistentKeepaliveInterval: 10 * time.Second, AllowedIPs: []netaddr.IPPrefix{ @@ -71,7 +29,7 @@ func TestWireguardPeer(t *testing.T) { } peer2 := network.WireguardPeer{ - PublicKey: key2.PublicKey().String(), + PublicKey: key2, Endpoint: "127.0.0.1:2000", AllowedIPs: []netaddr.IPPrefix{ netaddr.MustParseIPPrefix("10.2.0.0/15"), @@ -80,7 +38,7 @@ func TestWireguardPeer(t *testing.T) { } peer1_1 := network.WireguardPeer{ - PublicKey: key1.PublicKey().String(), + PublicKey: key1, Endpoint: "127.0.0.1:1000", PersistentKeepaliveInterval: 10 * time.Second, AllowedIPs: []netaddr.IPPrefix{ @@ -90,7 +48,7 @@ func TestWireguardPeer(t *testing.T) { } peer1_2 := network.WireguardPeer{ - PublicKey: key1.PublicKey().String(), + PublicKey: key1, PersistentKeepaliveInterval: 10 * time.Second, AllowedIPs: []netaddr.IPPrefix{ netaddr.MustParseIPPrefix("10.2.0.0/16"), @@ -110,275 +68,10 @@ func TestWireguardSpecZero(t *testing.T) { assert.True(t, zeroSpec.IsZero()) } -func TestWireguardSpecDecode(t *testing.T) { - priv, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - pub1, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - pub2, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - var spec network.WireguardSpec - - // decode in spec mode - spec.Decode(&wgtypes.Device{ - PrivateKey: priv, - ListenPort: 30000, - FirewallMark: 1, - Peers: []wgtypes.Peer{ - { - PublicKey: pub1.PublicKey(), - PresharedKey: priv, - Endpoint: &net.UDPAddr{ - IP: net.ParseIP("10.2.0.3"), - Port: 20000, - }, - AllowedIPs: []net.IPNet{ - { - IP: net.ParseIP("172.24.0.0"), - Mask: net.IPv4Mask(255, 255, 0, 0), - }, - }, - }, - { - PublicKey: pub2.PublicKey(), - AllowedIPs: []net.IPNet{ - { - IP: net.ParseIP("172.25.0.0"), - Mask: net.IPv4Mask(255, 255, 255, 0), - }, - }, - }, - }, - }, false) - - expected := network.WireguardSpec{ - PrivateKey: priv.String(), - ListenPort: 30000, - FirewallMark: 1, - Peers: []network.WireguardPeer{ - { - PublicKey: pub1.PublicKey().String(), - PresharedKey: priv.String(), - Endpoint: "10.2.0.3:20000", - AllowedIPs: []netaddr.IPPrefix{ - netaddr.MustParseIPPrefix("172.24.0.0/16"), - }, - }, - { - PublicKey: pub2.PublicKey().String(), - AllowedIPs: []netaddr.IPPrefix{ - netaddr.MustParseIPPrefix("172.25.0.0/24"), - }, - }, - }, - } - - assert.Equal(t, expected, spec) - assert.True(t, expected.Equal(&spec)) - - // zeroed out listen port is still acceptable on the right side - spec.ListenPort = 0 - assert.True(t, expected.Equal(&spec)) - - // ... but not on the left side - expected.ListenPort = 0 - spec.ListenPort = 30000 - assert.False(t, expected.Equal(&spec)) - - var zeroSpec network.WireguardSpec - - assert.False(t, zeroSpec.Equal(&spec)) -} - -func TestWireguardSpecDecodeStatus(t *testing.T) { - priv, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - var spec network.WireguardSpec - - // decode in status mode - spec.Decode(&wgtypes.Device{ - PrivateKey: priv, - PublicKey: priv.PublicKey(), - ListenPort: 30000, - FirewallMark: 1, - }, true) - - expected := network.WireguardSpec{ - PublicKey: priv.PublicKey().String(), - ListenPort: 30000, - FirewallMark: 1, - Peers: []network.WireguardPeer{}, - } - - assert.Equal(t, expected, spec) -} - -func TestWireguardSpecEncode(t *testing.T) { - priv, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - pub1, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - pub2, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - // make sure pub1 < pub2 - if pub1.PublicKey().String() > pub2.PublicKey().String() { - pub1, pub2 = pub2, pub1 - } - - specV1 := network.WireguardSpec{ - PrivateKey: priv.String(), - ListenPort: 30000, - FirewallMark: 1, - Peers: []network.WireguardPeer{ - { - PublicKey: pub1.PublicKey().String(), - Endpoint: "10.2.0.3:20000", - AllowedIPs: []netaddr.IPPrefix{ - netaddr.MustParseIPPrefix("172.24.0.0/16"), - }, - }, - { - PublicKey: pub2.PublicKey().String(), - AllowedIPs: []netaddr.IPPrefix{ - netaddr.MustParseIPPrefix("172.25.0.0/24"), - }, - }, - }, - } - - specV1.Sort() - - var zero network.WireguardSpec - - zero.Decode(&wgtypes.Device{}, false) - zero.Sort() - - // from zero (empty) config to config with two peers - delta, err := specV1.Encode(&zero) - require.NoError(t, err) - - assert.Equal(t, &wgtypes.Config{ - PrivateKey: &priv, - ListenPort: pointer.ToInt(30000), - FirewallMark: pointer.ToInt(1), - Peers: []wgtypes.PeerConfig{ - { - PublicKey: pub1.PublicKey(), - Endpoint: &net.UDPAddr{ - IP: net.ParseIP("10.2.0.3"), - Port: 20000, - }, - PersistentKeepaliveInterval: pointer.ToDuration(0), - ReplaceAllowedIPs: true, - AllowedIPs: []net.IPNet{ - { - IP: net.ParseIP("172.24.0.0").To4(), - Mask: net.IPv4Mask(255, 255, 0, 0), - }, - }, - }, - { - PublicKey: pub2.PublicKey(), - PersistentKeepaliveInterval: pointer.ToDuration(0), - ReplaceAllowedIPs: true, - AllowedIPs: []net.IPNet{ - { - IP: net.ParseIP("172.25.0.0").To4(), - Mask: net.IPv4Mask(255, 255, 255, 0), - }, - }, - }, - }, - }, delta) - - // noop - delta, err = specV1.Encode(&specV1) - require.NoError(t, err) - - assert.Equal(t, &wgtypes.Config{}, delta) - - // delete peer2 - specV2 := network.WireguardSpec{ - PrivateKey: priv.String(), - ListenPort: 30000, - FirewallMark: 1, - Peers: []network.WireguardPeer{ - { - PublicKey: pub1.PublicKey().String(), - Endpoint: "10.2.0.3:20000", - AllowedIPs: []netaddr.IPPrefix{ - netaddr.MustParseIPPrefix("172.24.0.0/16"), - }, - }, - }, - } - - delta, err = specV2.Encode(&specV1) - require.NoError(t, err) - - assert.Equal(t, &wgtypes.Config{ - Peers: []wgtypes.PeerConfig{ - { - PublicKey: pub2.PublicKey(), - Remove: true, - }, - }, - }, delta) - - // update peer1, firewallMark - specV3 := network.WireguardSpec{ - PrivateKey: priv.String(), - ListenPort: 30000, - FirewallMark: 2, - Peers: []network.WireguardPeer{ - { - PublicKey: pub1.PublicKey().String(), - PresharedKey: priv.String(), - AllowedIPs: []netaddr.IPPrefix{ - netaddr.MustParseIPPrefix("172.24.0.0/16"), - }, - }, - }, - } - - delta, err = specV3.Encode(&specV2) - require.NoError(t, err) - - assert.Equal(t, &wgtypes.Config{ - FirewallMark: pointer.ToInt(2), - Peers: []wgtypes.PeerConfig{ - { - PublicKey: pub1.PublicKey(), - PresharedKey: &priv, - PersistentKeepaliveInterval: pointer.ToDuration(0), - ReplaceAllowedIPs: true, - AllowedIPs: []net.IPNet{ - { - IP: net.ParseIP("172.24.0.0").To4(), - Mask: net.IPv4Mask(255, 255, 0, 0), - }, - }, - }, - }, - }, delta) -} - func TestWireguardSpecMerge(t *testing.T) { - priv, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - pub1, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) - - pub2, err := wgtypes.GeneratePrivateKey() - require.NoError(t, err) + priv := "KIT4Pe7jFbCnH+ZMwsqsIbX2xiTdmemQU9w9sYItqXY=" + pub1 := "VHlgUWcakWcZyrtKI476PJSdoINTc1G5PYO1SEkr4FQ=" + pub2 := "EiBteTHU1Dk3w9CYJtHFaSgkuZBVBZLEa+Y07xu+xno=" for _, tt := range []struct { name string @@ -396,7 +89,7 @@ func TestWireguardSpecMerge(t *testing.T) { ListenPort: 456, Peers: []network.WireguardPeer{ { - PublicKey: pub2.String(), + PublicKey: pub2, Endpoint: "127.0.0.1:3445", }, }, @@ -406,7 +99,7 @@ func TestWireguardSpecMerge(t *testing.T) { ListenPort: 456, Peers: []network.WireguardPeer{ { - PublicKey: pub2.String(), + PublicKey: pub2, Endpoint: "127.0.0.1:3445", }, }, @@ -415,21 +108,21 @@ func TestWireguardSpecMerge(t *testing.T) { { name: "otherzero", spec: network.WireguardSpec{ - PrivateKey: priv.String(), + PrivateKey: priv, FirewallMark: 34, Peers: []network.WireguardPeer{ { - PublicKey: pub1.String(), + PublicKey: pub1, }, }, }, expected: network.WireguardSpec{ - PrivateKey: priv.String(), + PrivateKey: priv, FirewallMark: 34, Peers: []network.WireguardPeer{ { - PublicKey: pub1.String(), + PublicKey: pub1, }, }, }, @@ -437,11 +130,11 @@ func TestWireguardSpecMerge(t *testing.T) { { name: "mixed", spec: network.WireguardSpec{ - PrivateKey: priv.String(), + PrivateKey: priv, FirewallMark: 34, Peers: []network.WireguardPeer{ { - PublicKey: pub1.String(), + PublicKey: pub1, }, }, }, @@ -449,22 +142,22 @@ func TestWireguardSpecMerge(t *testing.T) { ListenPort: 456, Peers: []network.WireguardPeer{ { - PublicKey: pub2.String(), + PublicKey: pub2, Endpoint: "127.0.0.1:3445", }, }, }, expected: network.WireguardSpec{ - PrivateKey: priv.String(), + PrivateKey: priv, FirewallMark: 34, ListenPort: 456, Peers: []network.WireguardPeer{ { - PublicKey: pub1.String(), + PublicKey: pub1, }, { - PublicKey: pub2.String(), + PublicKey: pub2, Endpoint: "127.0.0.1:3445", }, }, @@ -473,11 +166,11 @@ func TestWireguardSpecMerge(t *testing.T) { { name: "peerconflict", spec: network.WireguardSpec{ - PrivateKey: priv.String(), + PrivateKey: priv, FirewallMark: 34, Peers: []network.WireguardPeer{ { - PublicKey: pub1.String(), + PublicKey: pub1, PersistentKeepaliveInterval: time.Second, }, }, @@ -486,27 +179,27 @@ func TestWireguardSpecMerge(t *testing.T) { ListenPort: 456, Peers: []network.WireguardPeer{ { - PublicKey: pub1.String(), + PublicKey: pub1, Endpoint: "127.0.0.1:466", }, { - PublicKey: pub2.String(), + PublicKey: pub2, Endpoint: "127.0.0.1:3445", }, }, }, expected: network.WireguardSpec{ - PrivateKey: priv.String(), + PrivateKey: priv, FirewallMark: 34, ListenPort: 456, Peers: []network.WireguardPeer{ { - PublicKey: pub1.String(), + PublicKey: pub1, PersistentKeepaliveInterval: time.Second, }, { - PublicKey: pub2.String(), + PublicKey: pub2, Endpoint: "127.0.0.1:3445", }, },