diff --git a/control/tsp/map.go b/control/tsp/map.go index 96531255b..961c5dd57 100644 --- a/control/tsp/map.go +++ b/control/tsp/map.go @@ -249,6 +249,82 @@ func (s *MapSession) Close() error { return s.closeErr } +// SendMapUpdateOpts contains options for [Client.SendMapUpdate]. +type SendMapUpdateOpts struct { + // NodeKey is the node's private key. Required. + NodeKey key.NodePrivate + + // DiscoKey, if non-zero, is the node's disco public key. + // Peers use it to verify disco pings from this node, which is + // what enables direct (non-DERP) paths. + DiscoKey key.DiscoPublic + + // Hostinfo is the host information to send. Optional; + // if nil, a minimal default is used. + Hostinfo *tailcfg.Hostinfo +} + +// SendMapUpdate sends a one-shot, non-streaming MapRequest to push small +// updates (such as the node's endpoints, hostinfo, or disco public key) to the +// coordination server without starting or disturbing a streaming map session. +func (c *Client) SendMapUpdate(ctx context.Context, opts SendMapUpdateOpts) error { + if opts.NodeKey.IsZero() { + return fmt.Errorf("NodeKey is required") + } + + hi := opts.Hostinfo + if hi == nil { + hi = defaultHostinfo() + } + + mapReq := tailcfg.MapRequest{ + Version: tailcfg.CurrentCapabilityVersion, + NodeKey: opts.NodeKey.Public(), + DiscoKey: opts.DiscoKey, + Hostinfo: hi, + Compress: "zstd", + + // A lite update that lets the server persist our state without breaking + // any existing streaming map session. See the [tailcfg.MapResponse] + // OmitPeers docs. + OmitPeers: true, + Stream: false, + ReadOnly: false, + } + + body, err := json.Marshal(mapReq) + if err != nil { + return fmt.Errorf("encoding map request: %w", err) + } + + nc, err := c.noiseClient(ctx) + if err != nil { + return fmt.Errorf("establishing noise connection: %w", err) + } + + url := c.serverURL + "/machine/map" + url = strings.Replace(url, "http:", "https:", 1) + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("creating map request: %w", err) + } + ts2021.AddLBHeader(req, opts.NodeKey.Public()) + + res, err := nc.Do(req) + if err != nil { + return fmt.Errorf("map request: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != 200 { + msg, _ := io.ReadAll(res.Body) + return fmt.Errorf("map request: http %d: %.200s", + res.StatusCode, strings.TrimSpace(string(msg))) + } + io.Copy(io.Discard, res.Body) + return nil +} + // Map sends a map request to the coordination server and returns a MapSession // for reading the framed, zstd-compressed response(s). func (c *Client) Map(ctx context.Context, opts MapOpts) (*MapSession, error) { diff --git a/control/tsp/map_test.go b/control/tsp/map_test.go index 15b32dd36..14b64f39a 100644 --- a/control/tsp/map_test.go +++ b/control/tsp/map_test.go @@ -131,6 +131,135 @@ func TestMapAgainstTestControl(t *testing.T) { } } +// TestSendMapUpdateAgainstTestControl verifies that a [Client.SendMapUpdate] +// call from one node lands on the coordination server and that peer nodes +// subsequently observe the updated DiscoKey via their own streaming map poll. +func TestSendMapUpdateAgainstTestControl(t *testing.T) { + ctrl := &testcontrol.Server{} + ctrl.HTTPTestServer = httptest.NewUnstartedServer(ctrl) + ctrl.HTTPTestServer.Start() + t.Cleanup(ctrl.HTTPTestServer.Close) + baseURL := ctrl.HTTPTestServer.URL + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + serverKey, err := DiscoverServerKey(ctx, baseURL) + if err != nil { + t.Fatalf("DiscoverServerKey: %v", err) + } + + register := func(hostname string) (nodeKey key.NodePrivate, machineKey key.MachinePrivate) { + t.Helper() + nodeKey = key.NewNode() + machineKey = key.NewMachine() + c, err := NewClient(ClientOpts{ + ServerURL: baseURL, + MachineKey: machineKey, + }) + if err != nil { + t.Fatalf("NewClient %s: %v", hostname, err) + } + defer c.Close() + c.SetControlPublicKey(serverKey) + if _, err := c.Register(ctx, RegisterOpts{ + NodeKey: nodeKey, + Hostinfo: &tailcfg.Hostinfo{Hostname: hostname}, + }); err != nil { + t.Fatalf("Register %s: %v", hostname, err) + } + return nodeKey, machineKey + } + + nodeKeyA, machineKeyA := register("a") + nodeKeyB, machineKeyB := register("b") + + // B starts a streaming map poll so we can observe updates about peer A. + clientB, err := NewClient(ClientOpts{ + ServerURL: baseURL, + MachineKey: machineKeyB, + }) + if err != nil { + t.Fatalf("NewClient B: %v", err) + } + defer clientB.Close() + clientB.SetControlPublicKey(serverKey) + + session, err := clientB.Map(ctx, MapOpts{ + NodeKey: nodeKeyB, + Hostinfo: &tailcfg.Hostinfo{Hostname: "b"}, + Stream: true, + }) + if err != nil { + t.Fatalf("Map B: %v", err) + } + defer session.Close() + + nextNonKeepalive := func() *tailcfg.MapResponse { + t.Helper() + for { + resp, err := session.Next() + if err != nil { + t.Fatalf("session.Next: %v", err) + } + if resp.KeepAlive { + continue + } + return resp + } + } + + // Drain B's initial MapResponse. A should be present as a peer with a + // zero DiscoKey (it never pushed one). + first := nextNonKeepalive() + var initialA *tailcfg.Node + for _, p := range first.Peers { + if p.Key == nodeKeyA.Public() { + initialA = p + break + } + } + if initialA == nil { + t.Fatalf("peer A (%v) not in B's first MapResponse", nodeKeyA.Public()) + } + if !initialA.DiscoKey.IsZero() { + t.Fatalf("peer A initial DiscoKey = %v, want zero", initialA.DiscoKey) + } + + // A pushes its disco key via SendMapUpdate. + clientA, err := NewClient(ClientOpts{ + ServerURL: baseURL, + MachineKey: machineKeyA, + }) + if err != nil { + t.Fatalf("NewClient A: %v", err) + } + defer clientA.Close() + clientA.SetControlPublicKey(serverKey) + + wantDisco := key.NewDisco().Public() + if err := clientA.SendMapUpdate(ctx, SendMapUpdateOpts{ + NodeKey: nodeKeyA, + DiscoKey: wantDisco, + Hostinfo: &tailcfg.Hostinfo{Hostname: "a"}, + }); err != nil { + t.Fatalf("SendMapUpdate: %v", err) + } + + // B should now observe A's new DiscoKey in a subsequent MapResponse. + for { + resp := nextNonKeepalive() + for _, p := range resp.Peers { + if p.Key != nodeKeyA.Public() { + continue + } + if p.DiscoKey == wantDisco { + return // success + } + } + } +} + // newTestPipeline builds the same framedReader → zstd → boundedReader → // json.Decoder pipeline that [Client.Map] builds for a live session, but // feeds it from a raw byte slice. Returned jdec can be used with Decode to