From 9cce0baa43eddf3e75b31ffecf3649093a07e7a1 Mon Sep 17 00:00:00 2001 From: Jason O'Donnell <2160810+jasonodonnell@users.noreply.github.com> Date: Sat, 7 Mar 2026 09:32:06 -0500 Subject: [PATCH] tailsync --- ipn/ipnlocal/sync.go | 24 ++++ tailcfg/tailcfg.go | 4 +- tailsync/service.go | 7 + tailsync/tailsyncimpl/service.go | 135 ++++++++++++++++--- tailsync/tailsyncimpl/session.go | 192 ++++++++++++++++++++++++++-- tailsync/tailsyncimpl/syncclient.go | 186 +++++++++++++++++++++++++++ 6 files changed, 523 insertions(+), 25 deletions(-) create mode 100644 tailsync/tailsyncimpl/syncclient.go diff --git a/ipn/ipnlocal/sync.go b/ipn/ipnlocal/sync.go index 0daaa92bb..58bb156cf 100644 --- a/ipn/ipnlocal/sync.go +++ b/ipn/ipnlocal/sync.go @@ -10,6 +10,28 @@ import ( "tailscale.com/tailsync" ) +// SyncSetTransport configures the sync service with the PeerAPI transport. +func (b *LocalBackend) SyncSetTransport() { + svc, ok := b.sys.FileSync.GetOK() + if !ok { + return + } + transport := b.Dialer().PeerAPITransport() + peerURL := func(peerID string) string { + cn := b.currentNode() + for _, p := range cn.Peers() { + if string(p.StableID()) == peerID || p.DisplayName(false) == peerID || p.Name() == peerID { + base := cn.PeerAPIBase(p) + if base != "" { + return base + } + } + } + return "" + } + svc.SetTransport(transport, peerURL) +} + // SyncSharingEnabled reports whether sharing sync roots via Tailsync is // enabled. This is currently based on checking for the sync:share node // attribute. @@ -50,6 +72,8 @@ func (b *LocalBackend) SyncSetSession(session *tailsync.Session) error { if !ok { return tailsync.ErrSyncNotEnabled } + // Ensure transport is configured before starting session. + b.SyncSetTransport() return svc.SetSession(session) } diff --git a/tailcfg/tailcfg.go b/tailcfg/tailcfg.go index c90a66f0f..c71ba728e 100644 --- a/tailcfg/tailcfg.go +++ b/tailcfg/tailcfg.go @@ -1555,10 +1555,10 @@ const ( // PeerCapabilityTailsync grants the ability for a peer to sync files // with this node's exported sync roots. - PeerCapabilityTailsync PeerCapability = "tailscale.com/cap/sync" + PeerCapabilityTailsync PeerCapability = "jasonodonnell.com/cap/tailsync" // PeerCapabilityTailsyncSharer indicates that a peer has the ability to // export sync roots to us. - PeerCapabilityTailsyncSharer PeerCapability = "tailscale.com/cap/sync-sharer" + PeerCapabilityTailsyncSharer PeerCapability = "jasonodonnell.com/cap/tailsync-sharer" // PeerCapabilityKubernetes grants a peer Kubernetes-specific // capabilities, such as the ability to impersonate specific Tailscale diff --git a/tailsync/service.go b/tailsync/service.go index f1981079b..089517e8b 100644 --- a/tailsync/service.go +++ b/tailsync/service.go @@ -9,6 +9,9 @@ import ( "strings" ) +// PeerURLFunc returns the PeerAPI base URL for a given peer ID (stable node ID or hostname). +type PeerURLFunc func(peerID string) string + var ( ErrSyncNotEnabled = errors.New("tailsync not enabled") ErrInvalidRootName = errors.New("root names may only contain the letters a-z, digits 0-9, underscore _, or hyphens -") @@ -41,6 +44,10 @@ type Service interface { // GetSessionStatus returns the status of a named session. GetSessionStatus(name string) (*SessionStatus, error) + // SetTransport configures the HTTP transport and peer URL resolver + // used for reaching remote peers via PeerAPI. + SetTransport(transport http.RoundTripper, peerURL PeerURLFunc) + // ServeHTTPWithPerms handles incoming PeerAPI sync requests. ServeHTTPWithPerms(permissions Permissions, w http.ResponseWriter, r *http.Request) diff --git a/tailsync/tailsyncimpl/service.go b/tailsync/tailsyncimpl/service.go index 83a78a7bc..f5ab30752 100644 --- a/tailsync/tailsyncimpl/service.go +++ b/tailsync/tailsyncimpl/service.go @@ -5,9 +5,12 @@ package tailsyncimpl import ( + "bytes" "encoding/json" "fmt" "io" + "mime" + "mime/multipart" "net/http" "os" "path/filepath" @@ -25,10 +28,12 @@ const ConflictDir = ".tailsync-conflicts" type Service struct { logf logger.Logf - mu sync.RWMutex - roots map[string]*tailsync.Root - sessions map[string]*sessionRunner - closed bool + mu sync.RWMutex + roots map[string]*tailsync.Root + sessions map[string]*sessionRunner + closed bool + transport http.RoundTripper + peerURL tailsync.PeerURLFunc } // NewService creates a new tailsync Service. @@ -43,6 +48,13 @@ func NewService(logf logger.Logf) *Service { } } +func (s *Service) SetTransport(transport http.RoundTripper, peerURL tailsync.PeerURLFunc) { + s.mu.Lock() + defer s.mu.Unlock() + s.transport = transport + s.peerURL = peerURL +} + func (s *Service) SetRoot(root *tailsync.Root) error { name, err := tailsync.NormalizeRootName(root.Name) if err != nil { @@ -131,7 +143,7 @@ func (s *Service) SetSession(session *tailsync.Session) error { existing.stop() } - sr := newSessionRunner(s.logf, session, root) + sr := newSessionRunner(s.logf, session, root, s.transport, s.peerURL) s.sessions[session.Name] = sr go sr.run() @@ -196,6 +208,8 @@ func (s *Service) ServeHTTPWithPerms(permissions tailsync.Permissions, w http.Re s.handleRemotePush(permissions, w, r) case "pull": s.handleRemotePull(permissions, w, r) + case "file": + s.handleRemoteFile(permissions, w, r) default: http.Error(w, "unknown action", http.StatusNotFound) } @@ -247,12 +261,6 @@ func (s *Service) handleRemoteIndex(permissions tailsync.Permissions, w http.Res w.Write(data) } -// pushRequest is the JSON payload for a push. -type pushRequest struct { - RootName string `json:"root"` - Entries []*tailsync.FileEntry `json:"entries"` -} - func (s *Service) handleRemotePush(permissions tailsync.Permissions, w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) @@ -273,14 +281,63 @@ func (s *Service) handleRemotePush(permissions tailsync.Permissions, w http.Resp return } - var req pushRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + contentType := r.Header.Get("Content-Type") + mediaType, params, err := mime.ParseMediaType(contentType) + if err != nil || !strings.HasPrefix(mediaType, "multipart/") { + http.Error(w, "expected multipart content", http.StatusBadRequest) return } + mr := multipart.NewReader(r.Body, params["boundary"]) + + // First part: JSON metadata. + metaPart, err := mr.NextPart() + if err != nil { + http.Error(w, "read metadata part: "+err.Error(), http.StatusBadRequest) + return + } + var entries []*tailsync.FileEntry + if err := json.NewDecoder(metaPart).Decode(&entries); err != nil { + http.Error(w, "decode metadata: "+err.Error(), http.StatusBadRequest) + return + } + metaPart.Close() + + // Build a map of paths that need file data. + needsData := make(map[string]bool) + for _, entry := range entries { + if !entry.Deleted && !entry.IsSymlink { + needsData[entry.Path] = true + } + } + + // Read file data parts. + fileData := make(map[string][]byte) + for len(needsData) > 0 { + part, err := mr.NextPart() + if err == io.EOF { + break + } + if err != nil { + s.logf("tailsync: push: read file part: %v", err) + break + } + path := part.Header.Get("X-Tailsync-Path") + if path == "" { + path = part.FileName() + } + data, err := io.ReadAll(part) + part.Close() + if err != nil { + s.logf("tailsync: push: read file data for %s: %v", path, err) + continue + } + fileData[path] = data + delete(needsData, path) + } + applied := 0 - for _, entry := range req.Entries { + for _, entry := range entries { if entry.Deleted { absPath := filepath.Join(root.Path, entry.Path) if err := os.Remove(absPath); err != nil && !os.IsNotExist(err) { @@ -289,6 +346,24 @@ func (s *Service) handleRemotePush(permissions tailsync.Permissions, w http.Resp applied++ continue } + if entry.IsSymlink { + applied++ + continue + } + data, ok := fileData[entry.Path] + if !ok { + s.logf("tailsync: push: missing file data for %s", entry.Path) + continue + } + absPath := filepath.Join(root.Path, entry.Path) + mode := entry.Mode + if mode == 0 { + mode = 0o644 + } + if err := fileWriter(absPath, bytes.NewReader(data), mode); err != nil { + s.logf("tailsync: push: write %s: %v", entry.Path, err) + continue + } applied++ } @@ -327,6 +402,36 @@ func (s *Service) handleRemotePull(permissions tailsync.Permissions, w http.Resp json.NewEncoder(w).Encode(entries) } +func (s *Service) handleRemoteFile(permissions tailsync.Permissions, w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + rootName := r.URL.Query().Get("root") + if permissions.For(rootName) == tailsync.PermissionNone { + http.Error(w, "not found", http.StatusNotFound) + return + } + relPath := r.URL.Query().Get("path") + if relPath == "" { + http.Error(w, "missing path", http.StatusBadRequest) + return + } + s.mu.RLock() + root, ok := s.roots[rootName] + s.mu.RUnlock() + if !ok { + http.Error(w, "root not found", http.StatusNotFound) + return + } + absPath := filepath.Join(root.Path, filepath.FromSlash(relPath)) + if !strings.HasPrefix(absPath, root.Path) { + http.Error(w, "invalid path", http.StatusBadRequest) + return + } + http.ServeFile(w, r, absPath) +} + func (s *Service) findSessionForRoot(rootName string) *sessionRunner { for _, sr := range s.sessions { if sr.session.LocalRoot == rootName { diff --git a/tailsync/tailsyncimpl/session.go b/tailsync/tailsyncimpl/session.go index 7297aed6f..487ac1ed0 100644 --- a/tailsync/tailsyncimpl/session.go +++ b/tailsync/tailsyncimpl/session.go @@ -6,6 +6,7 @@ package tailsyncimpl import ( "context" "fmt" + "net/http" "os" "path/filepath" "sync" @@ -25,6 +26,10 @@ type sessionRunner struct { root *tailsync.Root idx *index.Index + transport http.RoundTripper + peerURL tailsync.PeerURLFunc + client *syncClient + mu sync.RWMutex state tailsync.SessionState conflicts []tailsync.ConflictInfo @@ -37,16 +42,18 @@ type sessionRunner struct { done chan struct{} } -func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root) *sessionRunner { +func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root, transport http.RoundTripper, peerURL tailsync.PeerURLFunc) *sessionRunner { _, cancel := context.WithCancel(context.Background()) return &sessionRunner{ - logf: logf, - session: session, - root: root, - idx: index.New(logf), - state: tailsync.SessionStateIdle, - cancel: cancel, - done: make(chan struct{}), + logf: logf, + session: session, + root: root, + idx: index.New(logf), + transport: transport, + peerURL: peerURL, + state: tailsync.SessionStateIdle, + cancel: cancel, + done: make(chan struct{}), } } @@ -88,6 +95,14 @@ func (sr *sessionRunner) run() { sr.setState(tailsync.SessionStateIdle) sr.logf("tailsync: session %s: initial index built with %d files", sr.session.Name, sr.idx.Len()) + // Set up sync client if transport is available. + var lastPushedSeq uint64 + if sr.transport != nil && sr.peerURL != nil { + sr.client = newSyncClient(sr.logf, sr.transport, sr.peerURL, sr.session.PeerID, sr.session.RemoteRoot) + sr.initialReconcile() + lastPushedSeq = sr.idx.LocalSeq() + } + // Process events from watcher. ctx, cancel := context.WithCancel(context.Background()) sr.mu.Lock() @@ -97,6 +112,9 @@ func (sr *sessionRunner) run() { tombstoneTicker := time.NewTicker(1 * time.Hour) defer tombstoneTicker.Stop() + pullTicker := time.NewTicker(5 * time.Second) + defer pullTicker.Stop() + for { select { case <-ctx.Done(): @@ -106,6 +124,13 @@ func (sr *sessionRunner) run() { return } sr.handleEvents(events) + if sr.client != nil { + lastPushedSeq = sr.pushChanges(lastPushedSeq) + } + case <-pullTicker.C: + if sr.client != nil { + sr.pullRemoteChanges() + } case <-tombstoneTicker.C: if n := sr.idx.PurgeTombstones(); n > 0 { sr.logf("tailsync: session %s: purged %d tombstones", sr.session.Name, n) @@ -243,6 +268,157 @@ func (sr *sessionRunner) fullRescan() { } } +func (sr *sessionRunner) pushChanges(lastPushedSeq uint64) uint64 { + entries := sr.idx.ChangedSince(lastPushedSeq) + if len(entries) == 0 { + return lastPushedSeq + } + + if sr.session.Mode == tailsync.ModePull { + return sr.idx.LocalSeq() + } + + applied, err := sr.client.pushFiles(entries, sr.root.Path) + if err != nil { + sr.logf("tailsync: session %s: push error: %v", sr.session.Name, err) + return lastPushedSeq + } + + sr.mu.Lock() + sr.bytesSent += countBytes(entries) + sr.mu.Unlock() + + sr.logf("tailsync: session %s: pushed %d files", sr.session.Name, applied) + return sr.idx.LocalSeq() +} + +func (sr *sessionRunner) pullRemoteChanges() { + if sr.session.Mode == tailsync.ModePush { + return + } + + remoteSeq := sr.idx.RemoteSeq() + entries, err := sr.client.pullChanges(remoteSeq) + if err != nil { + sr.logf("tailsync: session %s: pull error: %v", sr.session.Name, err) + return + } + + for _, entry := range entries { + if entry.Deleted { + absPath := filepath.Join(sr.root.Path, entry.Path) + os.Remove(absPath) + sr.idx.ApplyRemote(entry) + continue + } + + body, _, err := sr.client.pullFile(entry.Path) + if err != nil { + sr.logf("tailsync: session %s: pull file %s error: %v", sr.session.Name, entry.Path, err) + continue + } + + absPath := filepath.Join(sr.root.Path, entry.Path) + mode := entry.Mode + if mode == 0 { + mode = 0o644 + } + err = fileWriter(absPath, body, mode) + body.Close() + if err != nil { + sr.logf("tailsync: session %s: write %s error: %v", sr.session.Name, entry.Path, err) + continue + } + + sr.idx.ApplyRemote(entry) + + sr.mu.Lock() + sr.bytesRecv += entry.Size + sr.mu.Unlock() + + if entry.Sequence > remoteSeq { + remoteSeq = entry.Sequence + } + } + + sr.idx.SetRemoteSeq(remoteSeq) + + if len(entries) > 0 { + sr.logf("tailsync: session %s: pulled %d files", sr.session.Name, len(entries)) + } +} + +func (sr *sessionRunner) initialReconcile() { + remoteEntries, remoteSeq, err := sr.client.getRemoteIndex() + if err != nil { + sr.logf("tailsync: session %s: initial reconcile: remote index error: %v (will use local-only)", sr.session.Name, err) + return + } + + sr.idx.SetRemoteSeq(remoteSeq) + + // Pull files we're missing from remote. + if sr.session.Mode != tailsync.ModePush { + for path, remoteEntry := range remoteEntries { + if remoteEntry.Deleted { + continue + } + localEntry := sr.idx.Get(path) + if localEntry == nil || localEntry.Hash != remoteEntry.Hash { + body, _, err := sr.client.pullFile(path) + if err != nil { + sr.logf("tailsync: session %s: initial pull %s: %v", sr.session.Name, path, err) + continue + } + absPath := filepath.Join(sr.root.Path, path) + mode := remoteEntry.Mode + if mode == 0 { + mode = 0o644 + } + err = fileWriter(absPath, body, mode) + body.Close() + if err != nil { + sr.logf("tailsync: session %s: initial write %s: %v", sr.session.Name, path, err) + continue + } + sr.idx.ApplyRemote(remoteEntry) + } + } + } + + // Push files remote is missing. + if sr.session.Mode != tailsync.ModePull { + var toPush []*tailsync.FileEntry + for path, localEntry := range sr.idx.Entries() { + if localEntry.Deleted { + continue + } + remoteEntry, exists := remoteEntries[path] + if !exists || remoteEntry.Hash != localEntry.Hash { + toPush = append(toPush, localEntry) + } + } + if len(toPush) > 0 { + applied, err := sr.client.pushFiles(toPush, sr.root.Path) + if err != nil { + sr.logf("tailsync: session %s: initial push error: %v", sr.session.Name, err) + } else { + sr.logf("tailsync: session %s: initial push: %d files", sr.session.Name, applied) + } + } + } +} + +func countBytes(entries []*tailsync.FileEntry) int64 { + var total int64 + for _, e := range entries { + if !e.Deleted { + total += e.Size + } + } + return total +} + func (sr *sessionRunner) status() *tailsync.SessionStatus { sr.mu.RLock() defer sr.mu.RUnlock() diff --git a/tailsync/tailsyncimpl/syncclient.go b/tailsync/tailsyncimpl/syncclient.go new file mode 100644 index 000000000..6b8c9d08a --- /dev/null +++ b/tailsync/tailsyncimpl/syncclient.go @@ -0,0 +1,186 @@ +// Copyright (c) Tailscale Inc & contributors +// SPDX-License-Identifier: BSD-3-Clause + +package tailsyncimpl + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/textproto" + "os" + "path/filepath" + "strconv" + + "tailscale.com/tailsync" + "tailscale.com/types/logger" +) + +// syncClient handles network communication with a remote peer's tailsync PeerAPI. +type syncClient struct { + logf logger.Logf + transport http.RoundTripper + peerURL tailsync.PeerURLFunc + peerID string + rootName string +} + +func newSyncClient(logf logger.Logf, transport http.RoundTripper, peerURL tailsync.PeerURLFunc, peerID, rootName string) *syncClient { + return &syncClient{ + logf: logf, + transport: transport, + peerURL: peerURL, + peerID: peerID, + rootName: rootName, + } +} + +func (c *syncClient) baseURL() string { + return c.peerURL(c.peerID) + "/v0/sync" +} + +func (c *syncClient) httpClient() *http.Client { + return &http.Client{Transport: c.transport} +} + +// pushFiles sends changed files to the remote peer. +// entries are the metadata, rootPath is the local root to read files from. +func (c *syncClient) pushFiles(entries []*tailsync.FileEntry, rootPath string) (int, error) { + if len(entries) == 0 { + return 0, nil + } + + // Build multipart body: first part is JSON metadata, subsequent parts are file data. + var buf bytes.Buffer + mw := multipart.NewWriter(&buf) + + // Write metadata part. + metaHeader := make(textproto.MIMEHeader) + metaHeader.Set("Content-Type", "application/json") + metaHeader.Set("Content-Disposition", `form-data; name="metadata"`) + metaPart, err := mw.CreatePart(metaHeader) + if err != nil { + return 0, fmt.Errorf("create metadata part: %w", err) + } + if err := json.NewEncoder(metaPart).Encode(entries); err != nil { + return 0, fmt.Errorf("encode metadata: %w", err) + } + + // Write file data parts for non-deleted entries. + for _, entry := range entries { + if entry.Deleted || entry.IsSymlink { + continue + } + fileHeader := make(textproto.MIMEHeader) + fileHeader.Set("Content-Type", "application/octet-stream") + fileHeader.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename=%q`, entry.Path)) + fileHeader.Set("X-Tailsync-Path", entry.Path) + fileHeader.Set("X-Tailsync-Size", strconv.FormatInt(entry.Size, 10)) + fileHeader.Set("X-Tailsync-Mode", strconv.FormatUint(uint64(entry.Mode), 8)) + + filePart, err := mw.CreatePart(fileHeader) + if err != nil { + return 0, fmt.Errorf("create file part for %s: %w", entry.Path, err) + } + + absPath := filepath.Join(rootPath, entry.Path) + f, err := os.Open(absPath) + if err != nil { + c.logf("tailsync: push: skip %s: %v", entry.Path, err) + continue + } + _, err = io.Copy(filePart, f) + f.Close() + if err != nil { + return 0, fmt.Errorf("copy file %s: %w", entry.Path, err) + } + } + + mw.Close() + + url := c.baseURL() + "/push?root=" + c.rootName + req, err := http.NewRequest("POST", url, &buf) + if err != nil { + return 0, fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", mw.FormDataContentType()) + + resp, err := c.httpClient().Do(req) + if err != nil { + return 0, fmt.Errorf("push request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return 0, fmt.Errorf("push failed: %s: %s", resp.Status, string(body)) + } + + var result struct { + Applied int `json:"applied"` + } + json.NewDecoder(resp.Body).Decode(&result) + return result.Applied, nil +} + +// pullChanges fetches changed entries from the remote since the given sequence. +func (c *syncClient) pullChanges(sinceSeq uint64) ([]*tailsync.FileEntry, error) { + url := fmt.Sprintf("%s/pull?root=%s&since=%d", c.baseURL(), c.rootName, sinceSeq) + resp, err := c.httpClient().Get(url) + if err != nil { + return nil, fmt.Errorf("pull request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("pull failed: %s: %s", resp.Status, string(body)) + } + + var entries []*tailsync.FileEntry + if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil { + return nil, fmt.Errorf("decode pull response: %w", err) + } + return entries, nil +} + +// pullFile downloads a single file from the remote peer. +func (c *syncClient) pullFile(relPath string) (io.ReadCloser, int64, error) { + url := fmt.Sprintf("%s/file?root=%s&path=%s", c.baseURL(), c.rootName, relPath) + resp, err := c.httpClient().Get(url) + if err != nil { + return nil, 0, fmt.Errorf("pull file request: %w", err) + } + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, 0, fmt.Errorf("pull file %s: %s", relPath, resp.Status) + } + return resp.Body, resp.ContentLength, nil +} + +// getRemoteIndex fetches the full index from the remote peer. +func (c *syncClient) getRemoteIndex() (map[string]*tailsync.FileEntry, uint64, error) { + url := c.baseURL() + "/index?root=" + c.rootName + resp, err := c.httpClient().Get(url) + if err != nil { + return nil, 0, fmt.Errorf("index request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, 0, fmt.Errorf("index failed: %s: %s", resp.Status, string(body)) + } + + var snap struct { + Entries map[string]*tailsync.FileEntry `json:"entries"` + LocalSeq uint64 `json:"localSeq"` + } + if err := json.NewDecoder(resp.Body).Decode(&snap); err != nil { + return nil, 0, fmt.Errorf("decode index: %w", err) + } + return snap.Entries, snap.LocalSeq, nil +}