This commit is contained in:
Jason O'Donnell 2026-03-07 09:32:06 -05:00
parent 6e71a15e59
commit 9cce0baa43
6 changed files with 523 additions and 25 deletions

View File

@ -10,6 +10,28 @@ import (
"tailscale.com/tailsync"
)
// SyncSetTransport configures the sync service with the PeerAPI transport.
func (b *LocalBackend) SyncSetTransport() {
svc, ok := b.sys.FileSync.GetOK()
if !ok {
return
}
transport := b.Dialer().PeerAPITransport()
peerURL := func(peerID string) string {
cn := b.currentNode()
for _, p := range cn.Peers() {
if string(p.StableID()) == peerID || p.DisplayName(false) == peerID || p.Name() == peerID {
base := cn.PeerAPIBase(p)
if base != "" {
return base
}
}
}
return ""
}
svc.SetTransport(transport, peerURL)
}
// SyncSharingEnabled reports whether sharing sync roots via Tailsync is
// enabled. This is currently based on checking for the sync:share node
// attribute.
@ -50,6 +72,8 @@ func (b *LocalBackend) SyncSetSession(session *tailsync.Session) error {
if !ok {
return tailsync.ErrSyncNotEnabled
}
// Ensure transport is configured before starting session.
b.SyncSetTransport()
return svc.SetSession(session)
}

View File

@ -1555,10 +1555,10 @@ const (
// PeerCapabilityTailsync grants the ability for a peer to sync files
// with this node's exported sync roots.
PeerCapabilityTailsync PeerCapability = "tailscale.com/cap/sync"
PeerCapabilityTailsync PeerCapability = "jasonodonnell.com/cap/tailsync"
// PeerCapabilityTailsyncSharer indicates that a peer has the ability to
// export sync roots to us.
PeerCapabilityTailsyncSharer PeerCapability = "tailscale.com/cap/sync-sharer"
PeerCapabilityTailsyncSharer PeerCapability = "jasonodonnell.com/cap/tailsync-sharer"
// PeerCapabilityKubernetes grants a peer Kubernetes-specific
// capabilities, such as the ability to impersonate specific Tailscale

View File

@ -9,6 +9,9 @@ import (
"strings"
)
// PeerURLFunc returns the PeerAPI base URL for a given peer ID (stable node ID or hostname).
type PeerURLFunc func(peerID string) string
var (
ErrSyncNotEnabled = errors.New("tailsync not enabled")
ErrInvalidRootName = errors.New("root names may only contain the letters a-z, digits 0-9, underscore _, or hyphens -")
@ -41,6 +44,10 @@ type Service interface {
// GetSessionStatus returns the status of a named session.
GetSessionStatus(name string) (*SessionStatus, error)
// SetTransport configures the HTTP transport and peer URL resolver
// used for reaching remote peers via PeerAPI.
SetTransport(transport http.RoundTripper, peerURL PeerURLFunc)
// ServeHTTPWithPerms handles incoming PeerAPI sync requests.
ServeHTTPWithPerms(permissions Permissions, w http.ResponseWriter, r *http.Request)

View File

@ -5,9 +5,12 @@
package tailsyncimpl
import (
"bytes"
"encoding/json"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"os"
"path/filepath"
@ -25,10 +28,12 @@ const ConflictDir = ".tailsync-conflicts"
type Service struct {
logf logger.Logf
mu sync.RWMutex
roots map[string]*tailsync.Root
sessions map[string]*sessionRunner
closed bool
mu sync.RWMutex
roots map[string]*tailsync.Root
sessions map[string]*sessionRunner
closed bool
transport http.RoundTripper
peerURL tailsync.PeerURLFunc
}
// NewService creates a new tailsync Service.
@ -43,6 +48,13 @@ func NewService(logf logger.Logf) *Service {
}
}
func (s *Service) SetTransport(transport http.RoundTripper, peerURL tailsync.PeerURLFunc) {
s.mu.Lock()
defer s.mu.Unlock()
s.transport = transport
s.peerURL = peerURL
}
func (s *Service) SetRoot(root *tailsync.Root) error {
name, err := tailsync.NormalizeRootName(root.Name)
if err != nil {
@ -131,7 +143,7 @@ func (s *Service) SetSession(session *tailsync.Session) error {
existing.stop()
}
sr := newSessionRunner(s.logf, session, root)
sr := newSessionRunner(s.logf, session, root, s.transport, s.peerURL)
s.sessions[session.Name] = sr
go sr.run()
@ -196,6 +208,8 @@ func (s *Service) ServeHTTPWithPerms(permissions tailsync.Permissions, w http.Re
s.handleRemotePush(permissions, w, r)
case "pull":
s.handleRemotePull(permissions, w, r)
case "file":
s.handleRemoteFile(permissions, w, r)
default:
http.Error(w, "unknown action", http.StatusNotFound)
}
@ -247,12 +261,6 @@ func (s *Service) handleRemoteIndex(permissions tailsync.Permissions, w http.Res
w.Write(data)
}
// pushRequest is the JSON payload for a push.
type pushRequest struct {
RootName string `json:"root"`
Entries []*tailsync.FileEntry `json:"entries"`
}
func (s *Service) handleRemotePush(permissions tailsync.Permissions, w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
@ -273,14 +281,63 @@ func (s *Service) handleRemotePush(permissions tailsync.Permissions, w http.Resp
return
}
var req pushRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
contentType := r.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil || !strings.HasPrefix(mediaType, "multipart/") {
http.Error(w, "expected multipart content", http.StatusBadRequest)
return
}
mr := multipart.NewReader(r.Body, params["boundary"])
// First part: JSON metadata.
metaPart, err := mr.NextPart()
if err != nil {
http.Error(w, "read metadata part: "+err.Error(), http.StatusBadRequest)
return
}
var entries []*tailsync.FileEntry
if err := json.NewDecoder(metaPart).Decode(&entries); err != nil {
http.Error(w, "decode metadata: "+err.Error(), http.StatusBadRequest)
return
}
metaPart.Close()
// Build a map of paths that need file data.
needsData := make(map[string]bool)
for _, entry := range entries {
if !entry.Deleted && !entry.IsSymlink {
needsData[entry.Path] = true
}
}
// Read file data parts.
fileData := make(map[string][]byte)
for len(needsData) > 0 {
part, err := mr.NextPart()
if err == io.EOF {
break
}
if err != nil {
s.logf("tailsync: push: read file part: %v", err)
break
}
path := part.Header.Get("X-Tailsync-Path")
if path == "" {
path = part.FileName()
}
data, err := io.ReadAll(part)
part.Close()
if err != nil {
s.logf("tailsync: push: read file data for %s: %v", path, err)
continue
}
fileData[path] = data
delete(needsData, path)
}
applied := 0
for _, entry := range req.Entries {
for _, entry := range entries {
if entry.Deleted {
absPath := filepath.Join(root.Path, entry.Path)
if err := os.Remove(absPath); err != nil && !os.IsNotExist(err) {
@ -289,6 +346,24 @@ func (s *Service) handleRemotePush(permissions tailsync.Permissions, w http.Resp
applied++
continue
}
if entry.IsSymlink {
applied++
continue
}
data, ok := fileData[entry.Path]
if !ok {
s.logf("tailsync: push: missing file data for %s", entry.Path)
continue
}
absPath := filepath.Join(root.Path, entry.Path)
mode := entry.Mode
if mode == 0 {
mode = 0o644
}
if err := fileWriter(absPath, bytes.NewReader(data), mode); err != nil {
s.logf("tailsync: push: write %s: %v", entry.Path, err)
continue
}
applied++
}
@ -327,6 +402,36 @@ func (s *Service) handleRemotePull(permissions tailsync.Permissions, w http.Resp
json.NewEncoder(w).Encode(entries)
}
func (s *Service) handleRemoteFile(permissions tailsync.Permissions, w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
rootName := r.URL.Query().Get("root")
if permissions.For(rootName) == tailsync.PermissionNone {
http.Error(w, "not found", http.StatusNotFound)
return
}
relPath := r.URL.Query().Get("path")
if relPath == "" {
http.Error(w, "missing path", http.StatusBadRequest)
return
}
s.mu.RLock()
root, ok := s.roots[rootName]
s.mu.RUnlock()
if !ok {
http.Error(w, "root not found", http.StatusNotFound)
return
}
absPath := filepath.Join(root.Path, filepath.FromSlash(relPath))
if !strings.HasPrefix(absPath, root.Path) {
http.Error(w, "invalid path", http.StatusBadRequest)
return
}
http.ServeFile(w, r, absPath)
}
func (s *Service) findSessionForRoot(rootName string) *sessionRunner {
for _, sr := range s.sessions {
if sr.session.LocalRoot == rootName {

View File

@ -6,6 +6,7 @@ package tailsyncimpl
import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"sync"
@ -25,6 +26,10 @@ type sessionRunner struct {
root *tailsync.Root
idx *index.Index
transport http.RoundTripper
peerURL tailsync.PeerURLFunc
client *syncClient
mu sync.RWMutex
state tailsync.SessionState
conflicts []tailsync.ConflictInfo
@ -37,16 +42,18 @@ type sessionRunner struct {
done chan struct{}
}
func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root) *sessionRunner {
func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root, transport http.RoundTripper, peerURL tailsync.PeerURLFunc) *sessionRunner {
_, cancel := context.WithCancel(context.Background())
return &sessionRunner{
logf: logf,
session: session,
root: root,
idx: index.New(logf),
state: tailsync.SessionStateIdle,
cancel: cancel,
done: make(chan struct{}),
logf: logf,
session: session,
root: root,
idx: index.New(logf),
transport: transport,
peerURL: peerURL,
state: tailsync.SessionStateIdle,
cancel: cancel,
done: make(chan struct{}),
}
}
@ -88,6 +95,14 @@ func (sr *sessionRunner) run() {
sr.setState(tailsync.SessionStateIdle)
sr.logf("tailsync: session %s: initial index built with %d files", sr.session.Name, sr.idx.Len())
// Set up sync client if transport is available.
var lastPushedSeq uint64
if sr.transport != nil && sr.peerURL != nil {
sr.client = newSyncClient(sr.logf, sr.transport, sr.peerURL, sr.session.PeerID, sr.session.RemoteRoot)
sr.initialReconcile()
lastPushedSeq = sr.idx.LocalSeq()
}
// Process events from watcher.
ctx, cancel := context.WithCancel(context.Background())
sr.mu.Lock()
@ -97,6 +112,9 @@ func (sr *sessionRunner) run() {
tombstoneTicker := time.NewTicker(1 * time.Hour)
defer tombstoneTicker.Stop()
pullTicker := time.NewTicker(5 * time.Second)
defer pullTicker.Stop()
for {
select {
case <-ctx.Done():
@ -106,6 +124,13 @@ func (sr *sessionRunner) run() {
return
}
sr.handleEvents(events)
if sr.client != nil {
lastPushedSeq = sr.pushChanges(lastPushedSeq)
}
case <-pullTicker.C:
if sr.client != nil {
sr.pullRemoteChanges()
}
case <-tombstoneTicker.C:
if n := sr.idx.PurgeTombstones(); n > 0 {
sr.logf("tailsync: session %s: purged %d tombstones", sr.session.Name, n)
@ -243,6 +268,157 @@ func (sr *sessionRunner) fullRescan() {
}
}
func (sr *sessionRunner) pushChanges(lastPushedSeq uint64) uint64 {
entries := sr.idx.ChangedSince(lastPushedSeq)
if len(entries) == 0 {
return lastPushedSeq
}
if sr.session.Mode == tailsync.ModePull {
return sr.idx.LocalSeq()
}
applied, err := sr.client.pushFiles(entries, sr.root.Path)
if err != nil {
sr.logf("tailsync: session %s: push error: %v", sr.session.Name, err)
return lastPushedSeq
}
sr.mu.Lock()
sr.bytesSent += countBytes(entries)
sr.mu.Unlock()
sr.logf("tailsync: session %s: pushed %d files", sr.session.Name, applied)
return sr.idx.LocalSeq()
}
func (sr *sessionRunner) pullRemoteChanges() {
if sr.session.Mode == tailsync.ModePush {
return
}
remoteSeq := sr.idx.RemoteSeq()
entries, err := sr.client.pullChanges(remoteSeq)
if err != nil {
sr.logf("tailsync: session %s: pull error: %v", sr.session.Name, err)
return
}
for _, entry := range entries {
if entry.Deleted {
absPath := filepath.Join(sr.root.Path, entry.Path)
os.Remove(absPath)
sr.idx.ApplyRemote(entry)
continue
}
body, _, err := sr.client.pullFile(entry.Path)
if err != nil {
sr.logf("tailsync: session %s: pull file %s error: %v", sr.session.Name, entry.Path, err)
continue
}
absPath := filepath.Join(sr.root.Path, entry.Path)
mode := entry.Mode
if mode == 0 {
mode = 0o644
}
err = fileWriter(absPath, body, mode)
body.Close()
if err != nil {
sr.logf("tailsync: session %s: write %s error: %v", sr.session.Name, entry.Path, err)
continue
}
sr.idx.ApplyRemote(entry)
sr.mu.Lock()
sr.bytesRecv += entry.Size
sr.mu.Unlock()
if entry.Sequence > remoteSeq {
remoteSeq = entry.Sequence
}
}
sr.idx.SetRemoteSeq(remoteSeq)
if len(entries) > 0 {
sr.logf("tailsync: session %s: pulled %d files", sr.session.Name, len(entries))
}
}
func (sr *sessionRunner) initialReconcile() {
remoteEntries, remoteSeq, err := sr.client.getRemoteIndex()
if err != nil {
sr.logf("tailsync: session %s: initial reconcile: remote index error: %v (will use local-only)", sr.session.Name, err)
return
}
sr.idx.SetRemoteSeq(remoteSeq)
// Pull files we're missing from remote.
if sr.session.Mode != tailsync.ModePush {
for path, remoteEntry := range remoteEntries {
if remoteEntry.Deleted {
continue
}
localEntry := sr.idx.Get(path)
if localEntry == nil || localEntry.Hash != remoteEntry.Hash {
body, _, err := sr.client.pullFile(path)
if err != nil {
sr.logf("tailsync: session %s: initial pull %s: %v", sr.session.Name, path, err)
continue
}
absPath := filepath.Join(sr.root.Path, path)
mode := remoteEntry.Mode
if mode == 0 {
mode = 0o644
}
err = fileWriter(absPath, body, mode)
body.Close()
if err != nil {
sr.logf("tailsync: session %s: initial write %s: %v", sr.session.Name, path, err)
continue
}
sr.idx.ApplyRemote(remoteEntry)
}
}
}
// Push files remote is missing.
if sr.session.Mode != tailsync.ModePull {
var toPush []*tailsync.FileEntry
for path, localEntry := range sr.idx.Entries() {
if localEntry.Deleted {
continue
}
remoteEntry, exists := remoteEntries[path]
if !exists || remoteEntry.Hash != localEntry.Hash {
toPush = append(toPush, localEntry)
}
}
if len(toPush) > 0 {
applied, err := sr.client.pushFiles(toPush, sr.root.Path)
if err != nil {
sr.logf("tailsync: session %s: initial push error: %v", sr.session.Name, err)
} else {
sr.logf("tailsync: session %s: initial push: %d files", sr.session.Name, applied)
}
}
}
}
func countBytes(entries []*tailsync.FileEntry) int64 {
var total int64
for _, e := range entries {
if !e.Deleted {
total += e.Size
}
}
return total
}
func (sr *sessionRunner) status() *tailsync.SessionStatus {
sr.mu.RLock()
defer sr.mu.RUnlock()

View File

@ -0,0 +1,186 @@
// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
package tailsyncimpl
import (
"bytes"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/textproto"
"os"
"path/filepath"
"strconv"
"tailscale.com/tailsync"
"tailscale.com/types/logger"
)
// syncClient handles network communication with a remote peer's tailsync PeerAPI.
type syncClient struct {
logf logger.Logf
transport http.RoundTripper
peerURL tailsync.PeerURLFunc
peerID string
rootName string
}
func newSyncClient(logf logger.Logf, transport http.RoundTripper, peerURL tailsync.PeerURLFunc, peerID, rootName string) *syncClient {
return &syncClient{
logf: logf,
transport: transport,
peerURL: peerURL,
peerID: peerID,
rootName: rootName,
}
}
func (c *syncClient) baseURL() string {
return c.peerURL(c.peerID) + "/v0/sync"
}
func (c *syncClient) httpClient() *http.Client {
return &http.Client{Transport: c.transport}
}
// pushFiles sends changed files to the remote peer.
// entries are the metadata, rootPath is the local root to read files from.
func (c *syncClient) pushFiles(entries []*tailsync.FileEntry, rootPath string) (int, error) {
if len(entries) == 0 {
return 0, nil
}
// Build multipart body: first part is JSON metadata, subsequent parts are file data.
var buf bytes.Buffer
mw := multipart.NewWriter(&buf)
// Write metadata part.
metaHeader := make(textproto.MIMEHeader)
metaHeader.Set("Content-Type", "application/json")
metaHeader.Set("Content-Disposition", `form-data; name="metadata"`)
metaPart, err := mw.CreatePart(metaHeader)
if err != nil {
return 0, fmt.Errorf("create metadata part: %w", err)
}
if err := json.NewEncoder(metaPart).Encode(entries); err != nil {
return 0, fmt.Errorf("encode metadata: %w", err)
}
// Write file data parts for non-deleted entries.
for _, entry := range entries {
if entry.Deleted || entry.IsSymlink {
continue
}
fileHeader := make(textproto.MIMEHeader)
fileHeader.Set("Content-Type", "application/octet-stream")
fileHeader.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename=%q`, entry.Path))
fileHeader.Set("X-Tailsync-Path", entry.Path)
fileHeader.Set("X-Tailsync-Size", strconv.FormatInt(entry.Size, 10))
fileHeader.Set("X-Tailsync-Mode", strconv.FormatUint(uint64(entry.Mode), 8))
filePart, err := mw.CreatePart(fileHeader)
if err != nil {
return 0, fmt.Errorf("create file part for %s: %w", entry.Path, err)
}
absPath := filepath.Join(rootPath, entry.Path)
f, err := os.Open(absPath)
if err != nil {
c.logf("tailsync: push: skip %s: %v", entry.Path, err)
continue
}
_, err = io.Copy(filePart, f)
f.Close()
if err != nil {
return 0, fmt.Errorf("copy file %s: %w", entry.Path, err)
}
}
mw.Close()
url := c.baseURL() + "/push?root=" + c.rootName
req, err := http.NewRequest("POST", url, &buf)
if err != nil {
return 0, fmt.Errorf("create request: %w", err)
}
req.Header.Set("Content-Type", mw.FormDataContentType())
resp, err := c.httpClient().Do(req)
if err != nil {
return 0, fmt.Errorf("push request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return 0, fmt.Errorf("push failed: %s: %s", resp.Status, string(body))
}
var result struct {
Applied int `json:"applied"`
}
json.NewDecoder(resp.Body).Decode(&result)
return result.Applied, nil
}
// pullChanges fetches changed entries from the remote since the given sequence.
func (c *syncClient) pullChanges(sinceSeq uint64) ([]*tailsync.FileEntry, error) {
url := fmt.Sprintf("%s/pull?root=%s&since=%d", c.baseURL(), c.rootName, sinceSeq)
resp, err := c.httpClient().Get(url)
if err != nil {
return nil, fmt.Errorf("pull request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("pull failed: %s: %s", resp.Status, string(body))
}
var entries []*tailsync.FileEntry
if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil {
return nil, fmt.Errorf("decode pull response: %w", err)
}
return entries, nil
}
// pullFile downloads a single file from the remote peer.
func (c *syncClient) pullFile(relPath string) (io.ReadCloser, int64, error) {
url := fmt.Sprintf("%s/file?root=%s&path=%s", c.baseURL(), c.rootName, relPath)
resp, err := c.httpClient().Get(url)
if err != nil {
return nil, 0, fmt.Errorf("pull file request: %w", err)
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return nil, 0, fmt.Errorf("pull file %s: %s", relPath, resp.Status)
}
return resp.Body, resp.ContentLength, nil
}
// getRemoteIndex fetches the full index from the remote peer.
func (c *syncClient) getRemoteIndex() (map[string]*tailsync.FileEntry, uint64, error) {
url := c.baseURL() + "/index?root=" + c.rootName
resp, err := c.httpClient().Get(url)
if err != nil {
return nil, 0, fmt.Errorf("index request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, 0, fmt.Errorf("index failed: %s: %s", resp.Status, string(body))
}
var snap struct {
Entries map[string]*tailsync.FileEntry `json:"entries"`
LocalSeq uint64 `json:"localSeq"`
}
if err := json.NewDecoder(resp.Body).Decode(&snap); err != nil {
return nil, 0, fmt.Errorf("decode index: %w", err)
}
return snap.Entries, snap.LocalSeq, nil
}