mirror of
https://github.com/tailscale/tailscale.git
synced 2026-05-07 21:26:41 +02:00
ipn, ipn/ipnlocal: add Foreground field for ServeConfig
This PR adds a new field to the serve config that can be used to identify which serves are in "foreground mode" and then can also be used to ensure they do not get persisted to disk so that if Tailscaled gets ungracefully shutdown, the reloaded ServeConfig will not have those ports opened. Updates #8489 Signed-off-by: Marwan Sulaiman <marwan@tailscale.com>
This commit is contained in:
parent
45eeef244e
commit
d77d3e5a45
@ -1094,29 +1094,6 @@ func (lc *LocalClient) NetworkLockDisable(ctx context.Context, secret []byte) er
|
||||
return nil
|
||||
}
|
||||
|
||||
// StreamServe returns an io.ReadCloser that streams serve/Funnel
|
||||
// connections made to the provided HostPort.
|
||||
//
|
||||
// If Serve and Funnel were not already enabled for the HostPort in the ServeConfig,
|
||||
// the backend enables it for the duration of the context's lifespan and
|
||||
// then turns it back off once the context is closed. If either are already enabled,
|
||||
// then they remain that way but logs are still streamed
|
||||
func (lc *LocalClient) StreamServe(ctx context.Context, hp ipn.ServeStreamRequest) (io.ReadCloser, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", "http://"+apitype.LocalAPIHost+"/localapi/v0/stream-serve", jsonBody(hp))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := lc.doLocalRequestNiceError(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res.StatusCode != 200 {
|
||||
res.Body.Close()
|
||||
return nil, errors.New(res.Status)
|
||||
}
|
||||
return res.Body, nil
|
||||
}
|
||||
|
||||
// GetServeConfig return the current serve config.
|
||||
//
|
||||
// If the serve config is empty, it returns (nil, nil).
|
||||
|
||||
@ -149,7 +149,6 @@ type localServeClient interface {
|
||||
QueryFeature(ctx context.Context, feature string) (*tailcfg.QueryFeatureResponse, error)
|
||||
WatchIPNBus(ctx context.Context, mask ipn.NotifyWatchOpt) (*tailscale.IPNBusWatcher, error)
|
||||
IncrementCounter(ctx context.Context, name string, delta int) error
|
||||
StreamServe(ctx context.Context, req ipn.ServeStreamRequest) (io.ReadCloser, error) // TODO: testing :)
|
||||
}
|
||||
|
||||
// serveEnv is the environment the serve command runs within. All I/O should be
|
||||
|
||||
@ -5,9 +5,10 @@ package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
@ -30,14 +31,14 @@ var infoMap = map[string]commandInfo{
|
||||
ShortHelp: "Serve content and local servers on your tailnet",
|
||||
LongHelp: strings.Join([]string{
|
||||
"Serve lets you share a local server securely within your tailnet.",
|
||||
"To share a local server on the internet, use \"tailscale funnel\"",
|
||||
`To share a local server on the internet, use "tailscale funnel"`,
|
||||
}, "\n"),
|
||||
},
|
||||
"funnel": {
|
||||
ShortHelp: "Serve content and local servers on the internet",
|
||||
LongHelp: strings.Join([]string{
|
||||
"Funnel lets you share a local server on the internet using Tailscale.",
|
||||
"To share only within your tailnet, use \"tailscale serve\"",
|
||||
`To share only within your tailnet, use "tailscale serve"`,
|
||||
}, "\n"),
|
||||
},
|
||||
}
|
||||
@ -134,14 +135,77 @@ func (e *serveEnv) runServeDev(funnel bool) execFunc {
|
||||
}
|
||||
|
||||
func (e *serveEnv) streamServe(ctx context.Context, req ipn.ServeStreamRequest) error {
|
||||
stream, err := e.lc.StreamServe(ctx, req)
|
||||
watcher, err := e.lc.WatchIPNBus(ctx, ipn.NotifyInitialState|ipn.NotifyServeRequest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer stream.Close()
|
||||
defer watcher.Close()
|
||||
n, err := watcher.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sessionID := n.SessionID
|
||||
if sessionID == "" {
|
||||
return errors.New("missing SessionID")
|
||||
}
|
||||
sc, err := e.lc.GetServeConfig(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting serve config: %w", err)
|
||||
}
|
||||
if sc == nil {
|
||||
sc = &ipn.ServeConfig{}
|
||||
}
|
||||
setHandler(sc, req, sessionID)
|
||||
err = e.lc.SetServeConfig(ctx, sc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error setting serve config: %w", err)
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "Serve started on \"https://%s\".\n", strings.TrimSuffix(string(req.HostPort), ":443"))
|
||||
fmt.Fprintf(os.Stderr, "Press Ctrl-C to stop.\n\n")
|
||||
_, err = io.Copy(os.Stdout, stream)
|
||||
return err
|
||||
fmt.Fprintf(os.Stderr, "Funnel started on \"https://%s\".\n", strings.TrimSuffix(string(req.HostPort), ":443"))
|
||||
fmt.Fprintf(os.Stderr, "Press Ctrl-C to stop Funnel.\n\n")
|
||||
|
||||
for {
|
||||
n, err := watcher.Next()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error calling next: %w", err)
|
||||
}
|
||||
if n.RequestAccessLog == nil || n.RequestAccessLog.SessionID != sessionID {
|
||||
continue
|
||||
}
|
||||
bts, _ := json.Marshal(n.RequestAccessLog)
|
||||
fmt.Printf("%s\n", bts)
|
||||
}
|
||||
}
|
||||
|
||||
func setHandler(sc *ipn.ServeConfig, req ipn.ServeStreamRequest, sessionID string) {
|
||||
if sc.Foreground == nil {
|
||||
sc.Foreground = make(map[string]*ipn.ServeConfig)
|
||||
}
|
||||
if sc.Foreground[sessionID] == nil {
|
||||
sc.Foreground[sessionID] = &ipn.ServeConfig{}
|
||||
}
|
||||
if sc.Foreground[sessionID].TCP == nil {
|
||||
sc.Foreground[sessionID].TCP = make(map[uint16]*ipn.TCPPortHandler)
|
||||
}
|
||||
if _, ok := sc.Foreground[sessionID].TCP[443]; !ok {
|
||||
sc.Foreground[sessionID].TCP[443] = &ipn.TCPPortHandler{HTTPS: true}
|
||||
}
|
||||
if sc.Foreground[sessionID].Web == nil {
|
||||
sc.Foreground[sessionID].Web = make(map[ipn.HostPort]*ipn.WebServerConfig)
|
||||
}
|
||||
wsc, ok := sc.Foreground[sessionID].Web[req.HostPort]
|
||||
if !ok {
|
||||
wsc = &ipn.WebServerConfig{}
|
||||
sc.Foreground[sessionID].Web[req.HostPort] = wsc
|
||||
}
|
||||
if wsc.Handlers == nil {
|
||||
wsc.Handlers = make(map[string]*ipn.HTTPHandler)
|
||||
}
|
||||
wsc.Handlers[req.MountPoint] = &ipn.HTTPHandler{
|
||||
Proxy: req.Source,
|
||||
}
|
||||
if sc.AllowFunnel == nil {
|
||||
sc.AllowFunnel = make(map[ipn.HostPort]bool)
|
||||
}
|
||||
sc.AllowFunnel[req.HostPort] = true
|
||||
}
|
||||
|
||||
@ -66,6 +66,8 @@ const (
|
||||
NotifyInitialNetMap // if set, the first Notify message (sent immediately) will contain the current NetMap
|
||||
|
||||
NotifyNoPrivateKeys // if set, private keys that would normally be sent in updates are zeroed out
|
||||
|
||||
NotifyServeRequest // if set, RequestAccessLog messages will be sent to the watcher
|
||||
)
|
||||
|
||||
// Notify is a communication from a backend (e.g. tailscaled) to a frontend
|
||||
@ -122,6 +124,10 @@ type Notify struct {
|
||||
ClientVersion *tailcfg.ClientVersion `json:",omitempty"`
|
||||
|
||||
// type is mirrored in xcode/Shared/IPN.swift
|
||||
|
||||
// RequestAccessLog is a notification that a request
|
||||
// has been sent via the serve config.
|
||||
RequestAccessLog *RequestAccessLog `json:",omitempty"`
|
||||
}
|
||||
|
||||
func (n Notify) String() string {
|
||||
|
||||
@ -246,9 +246,6 @@ type LocalBackend struct {
|
||||
|
||||
serveListeners map[netip.AddrPort]*serveListener // addrPort => serveListener
|
||||
serveProxyHandlers sync.Map // string (HTTPHandler.Proxy) => *httputil.ReverseProxy
|
||||
// serveStreamers is a map for those running Funnel in the foreground
|
||||
// and streaming incoming requests.
|
||||
serveStreamers map[uint16]map[uint32]func(ipn.FunnelRequestLog) // serve port => map of stream loggers (key is UUID)
|
||||
|
||||
// statusLock must be held before calling statusChanged.Wait() or
|
||||
// statusChanged.Broadcast().
|
||||
@ -2014,7 +2011,18 @@ func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWa
|
||||
go b.pollRequestEngineStatus(ctx)
|
||||
}
|
||||
|
||||
defer b.DeleteForegroundSession(sessionID) // TODO(marwan-at-work): check err
|
||||
// TODO(marwan-at-work): check err
|
||||
// TODO(marwan-at-work): streaming background logs?
|
||||
defer b.DeleteForegroundSession(sessionID)
|
||||
|
||||
if mask&ipn.NotifyServeRequest == 0 {
|
||||
fn = func(roNotify *ipn.Notify) (keepGoing bool) {
|
||||
if roNotify.RequestAccessLog != nil {
|
||||
return true
|
||||
}
|
||||
return origFn(roNotify)
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -2346,7 +2354,7 @@ func (b *LocalBackend) setAtomicValuesFromPrefsLocked(p ipn.PrefsView) {
|
||||
} else {
|
||||
filtered := tsaddr.FilterPrefixesCopy(p.AdvertiseRoutes(), tsaddr.IsViaPrefix)
|
||||
b.containsViaIPFuncAtomic.Store(tsaddr.NewContainsIPFunc(filtered))
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(p)
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(p, true)
|
||||
}
|
||||
}
|
||||
|
||||
@ -4048,7 +4056,7 @@ func (b *LocalBackend) setNetMapLocked(nm *netmap.NetworkMap) {
|
||||
netns.SetBindToInterfaceByRoute(hasCapability(nm, tailcfg.CapabilityBindToInterfaceByRoute))
|
||||
netns.SetDisableBindConnToInterface(hasCapability(nm, tailcfg.CapabilityDebugDisableBindConnToInterface))
|
||||
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs())
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs(), true)
|
||||
if nm == nil {
|
||||
b.nodeByAddr = nil
|
||||
return
|
||||
@ -4095,7 +4103,11 @@ func (b *LocalBackend) setDebugLogsByCapabilityLocked(nm *netmap.NetworkMap) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LocalBackend) reloadServeConfigLocked(prefs ipn.PrefsView) {
|
||||
// reloadServeConfigLocked reloads the serve config from the store or resets the
|
||||
// serve config to nil if not logged in. The "changed" parameter, when false, instructs
|
||||
// the method to only run the reset-logic and not reload the store from memory to ensure
|
||||
// foreground sessions are not removed if they are not saved on disk.
|
||||
func (b *LocalBackend) reloadServeConfigLocked(prefs ipn.PrefsView, changed bool) {
|
||||
if b.netMap == nil || !b.netMap.SelfNode.Valid() || !prefs.Valid() || b.pm.CurrentProfile().ID == "" {
|
||||
// We're not logged in, so we don't have a profile.
|
||||
// Don't try to load the serve config.
|
||||
@ -4103,6 +4115,9 @@ func (b *LocalBackend) reloadServeConfigLocked(prefs ipn.PrefsView) {
|
||||
b.serveConfig = ipn.ServeConfigView{}
|
||||
return
|
||||
}
|
||||
if !changed {
|
||||
return
|
||||
}
|
||||
confKey := ipn.ServeConfigKey(b.pm.CurrentProfile().ID)
|
||||
// TODO(maisem,bradfitz): prevent reading the config from disk
|
||||
// if the profile has not changed.
|
||||
@ -4129,20 +4144,27 @@ func (b *LocalBackend) reloadServeConfigLocked(prefs ipn.PrefsView) {
|
||||
// the ports that tailscaled should handle as a function of b.netMap and b.prefs.
|
||||
//
|
||||
// b.mu must be held.
|
||||
func (b *LocalBackend) setTCPPortsInterceptedFromNetmapAndPrefsLocked(prefs ipn.PrefsView) {
|
||||
func (b *LocalBackend) setTCPPortsInterceptedFromNetmapAndPrefsLocked(prefs ipn.PrefsView, changed bool) {
|
||||
handlePorts := make([]uint16, 0, 4)
|
||||
|
||||
if prefs.Valid() && prefs.RunSSH() && envknob.CanSSHD() {
|
||||
handlePorts = append(handlePorts, 22)
|
||||
}
|
||||
|
||||
b.reloadServeConfigLocked(prefs)
|
||||
b.reloadServeConfigLocked(prefs, changed)
|
||||
if b.serveConfig.Valid() {
|
||||
servePorts := make([]uint16, 0, 3)
|
||||
b.serveConfig.TCP().Range(func(port uint16, _ ipn.TCPPortHandlerView) bool {
|
||||
if port > 0 {
|
||||
servePorts = append(servePorts, uint16(port))
|
||||
}
|
||||
addServePorts := func(tcp views.MapFn[uint16, *ipn.TCPPortHandler, ipn.TCPPortHandlerView]) {
|
||||
tcp.Range(func(port uint16, _ ipn.TCPPortHandlerView) bool {
|
||||
if port > 0 {
|
||||
servePorts = append(servePorts, uint16(port))
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
addServePorts(b.serveConfig.TCP())
|
||||
b.serveConfig.Foreground().Range(func(_ string, v ipn.ServeConfigView) (cont bool) {
|
||||
addServePorts(v.TCP())
|
||||
return true
|
||||
})
|
||||
handlePorts = append(handlePorts, servePorts...)
|
||||
@ -4172,29 +4194,36 @@ func (b *LocalBackend) setServeProxyHandlersLocked() {
|
||||
return
|
||||
}
|
||||
var backends map[string]bool
|
||||
b.serveConfig.Web().Range(func(_ ipn.HostPort, conf ipn.WebServerConfigView) (cont bool) {
|
||||
conf.Handlers().Range(func(_ string, h ipn.HTTPHandlerView) (cont bool) {
|
||||
backend := h.Proxy()
|
||||
if backend == "" {
|
||||
// Only create proxy handlers for servers with a proxy backend.
|
||||
return true
|
||||
}
|
||||
mak.Set(&backends, backend, true)
|
||||
if _, ok := b.serveProxyHandlers.Load(backend); ok {
|
||||
return true
|
||||
}
|
||||
setBackends := func(webCfg views.MapFn[ipn.HostPort, *ipn.WebServerConfig, ipn.WebServerConfigView]) {
|
||||
webCfg.Range(func(_ ipn.HostPort, conf ipn.WebServerConfigView) (cont bool) {
|
||||
conf.Handlers().Range(func(_ string, h ipn.HTTPHandlerView) (cont bool) {
|
||||
backend := h.Proxy()
|
||||
if backend == "" {
|
||||
// Only create proxy handlers for servers with a proxy backend.
|
||||
return true
|
||||
}
|
||||
mak.Set(&backends, backend, true)
|
||||
if _, ok := b.serveProxyHandlers.Load(backend); ok {
|
||||
return true
|
||||
}
|
||||
|
||||
b.logf("serve: creating a new proxy handler for %s", backend)
|
||||
p, err := b.proxyHandlerForBackend(backend)
|
||||
if err != nil {
|
||||
// The backend endpoint (h.Proxy) should have been validated by expandProxyTarget
|
||||
// in the CLI, so just log the error here.
|
||||
b.logf("[unexpected] could not create proxy for %v: %s", backend, err)
|
||||
b.logf("serve: creating a new proxy handler for %s", backend)
|
||||
p, err := b.proxyHandlerForBackend(backend)
|
||||
if err != nil {
|
||||
// The backend endpoint (h.Proxy) should have been validated by expandProxyTarget
|
||||
// in the CLI, so just log the error here.
|
||||
b.logf("[unexpected] could not create proxy for %v: %s", backend, err)
|
||||
return true
|
||||
}
|
||||
b.serveProxyHandlers.Store(backend, p)
|
||||
return true
|
||||
}
|
||||
b.serveProxyHandlers.Store(backend, p)
|
||||
})
|
||||
return true
|
||||
})
|
||||
}
|
||||
setBackends(b.serveConfig.Web())
|
||||
b.serveConfig.Foreground().Range(func(_ string, v ipn.ServeConfigView) (cont bool) {
|
||||
setBackends(v.Web())
|
||||
return true
|
||||
})
|
||||
|
||||
@ -4881,7 +4910,7 @@ func (b *LocalBackend) SetDevStateStore(key, value string) error {
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs())
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs(), true)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -23,13 +23,14 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go4.org/mem"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/logtail/backoff"
|
||||
"tailscale.com/net/netutil"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/types/views"
|
||||
"tailscale.com/util/mak"
|
||||
"tailscale.com/version"
|
||||
)
|
||||
@ -239,17 +240,21 @@ func (b *LocalBackend) setServeConfigLocked(config *ipn.ServeConfig) error {
|
||||
|
||||
var bs []byte
|
||||
if config != nil {
|
||||
j, err := json.Marshal(config)
|
||||
// TODO(marwan): either Clone+StripForeground here (which means we need to double check lastServeConfJSON is unaffected)
|
||||
// OR: strip foreground on backend start ups.
|
||||
var err error
|
||||
bs, err = json.Marshal(config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("encoding serve config: %w", err)
|
||||
}
|
||||
bs = j
|
||||
b.serveConfig = config.View()
|
||||
b.lastServeConfJSON = mem.B(bs)
|
||||
}
|
||||
if err := b.store.WriteState(confKey, bs); err != nil {
|
||||
return fmt.Errorf("writing ServeConfig to StateStore: %w", err)
|
||||
}
|
||||
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs())
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs(), false)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -271,154 +276,67 @@ func (b *LocalBackend) DeleteForegroundSession(sessionID string) error {
|
||||
return nil
|
||||
}
|
||||
sc := b.serveConfig.AsStruct()
|
||||
if hp, ok := shouldDeleteFunnel(sc, sessionID); ok {
|
||||
delete(sc.AllowFunnel, hp)
|
||||
if len(sc.AllowFunnel) == 0 {
|
||||
sc.AllowFunnel = nil
|
||||
}
|
||||
}
|
||||
delete(sc.Foreground, sessionID)
|
||||
return b.setServeConfigLocked(sc)
|
||||
}
|
||||
|
||||
// StreamServe opens a stream to write any incoming connections made
|
||||
// to the given HostPort out to the listening io.Writer.
|
||||
//
|
||||
// If Serve and Funnel were not already enabled for the HostPort in the ServeConfig,
|
||||
// the backend enables it for the duration of the context's lifespan and
|
||||
// then turns it back off once the context is closed. If either are already enabled,
|
||||
// then they remain that way but logs are still streamed
|
||||
func (b *LocalBackend) StreamServe(ctx context.Context, w io.Writer, req ipn.ServeStreamRequest) (err error) {
|
||||
f, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
return errors.New("writer not a flusher")
|
||||
// shouldDeleteFunnel returns the port and true if:
|
||||
// 1. This foreground has a TCP port, that
|
||||
// 2. Funnel has it referenced and allowed, and
|
||||
// 3. No other foreground or background session has the port
|
||||
// Ambiguity: what if another background had funnel on?
|
||||
func shouldDeleteFunnel(sc *ipn.ServeConfig, sessionID string) (hp ipn.HostPort, ok bool) {
|
||||
fg := sc.Foreground[sessionID]
|
||||
if len(fg.TCP) == 0 {
|
||||
return "", false
|
||||
}
|
||||
f.Flush()
|
||||
|
||||
port, err := req.HostPort.Port()
|
||||
if err != nil {
|
||||
return err
|
||||
// we can't have multiple TCPs under a single foreground session
|
||||
var port uint16
|
||||
for key := range fg.TCP {
|
||||
port = key
|
||||
}
|
||||
|
||||
// Turn on Funnel for the given HostPort.
|
||||
sc := b.ServeConfig().AsStruct()
|
||||
if sc == nil {
|
||||
sc = &ipn.ServeConfig{}
|
||||
}
|
||||
setHandler(sc, req)
|
||||
if err := b.SetServeConfig(sc); err != nil {
|
||||
return fmt.Errorf("errro setting serve config: %w", err)
|
||||
}
|
||||
// Defer turning off Funnel once stream ends.
|
||||
defer func() {
|
||||
sc := b.ServeConfig().AsStruct()
|
||||
deleteHandler(sc, req, port)
|
||||
err = errors.Join(err, b.SetServeConfig(sc))
|
||||
}()
|
||||
|
||||
var writeErrs []error
|
||||
writeToStream := func(log ipn.FunnelRequestLog) {
|
||||
jsonLog, err := json.Marshal(log)
|
||||
if err != nil {
|
||||
writeErrs = append(writeErrs, err)
|
||||
return
|
||||
var hasFunnel bool
|
||||
for key, b := range sc.AllowFunnel {
|
||||
givenPort, _ := key.Port()
|
||||
if givenPort == port {
|
||||
hasFunnel = b
|
||||
hp = key
|
||||
break
|
||||
}
|
||||
if _, err := fmt.Fprintf(w, "%s\n", jsonLog); err != nil {
|
||||
writeErrs = append(writeErrs, err)
|
||||
return
|
||||
}
|
||||
if !hasFunnel {
|
||||
return "", false
|
||||
}
|
||||
if _, ok := sc.TCP[port]; ok {
|
||||
return "", false
|
||||
}
|
||||
for key, givenFg := range sc.Foreground {
|
||||
if key == sessionID {
|
||||
continue
|
||||
}
|
||||
if _, ok := givenFg.TCP[port]; ok {
|
||||
return "", false
|
||||
}
|
||||
f.Flush()
|
||||
}
|
||||
|
||||
// Hook up connections stream.
|
||||
b.mu.Lock()
|
||||
mak.NonNilMapForJSON(&b.serveStreamers)
|
||||
if b.serveStreamers[port] == nil {
|
||||
b.serveStreamers[port] = make(map[uint32]func(ipn.FunnelRequestLog))
|
||||
}
|
||||
id := uuid.New().ID()
|
||||
b.serveStreamers[port][id] = writeToStream
|
||||
b.mu.Unlock()
|
||||
|
||||
// Clean up streamer when done.
|
||||
defer func() {
|
||||
b.mu.Lock()
|
||||
delete(b.serveStreamers[port], id)
|
||||
b.mu.Unlock()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Triggered by foreground `tailscale funnel` process
|
||||
// (the streamer) getting closed, or by turning off Tailscale.
|
||||
}
|
||||
|
||||
return errors.Join(writeErrs...)
|
||||
return hp, true
|
||||
}
|
||||
|
||||
func setHandler(sc *ipn.ServeConfig, req ipn.ServeStreamRequest) {
|
||||
if sc.TCP == nil {
|
||||
sc.TCP = make(map[uint16]*ipn.TCPPortHandler)
|
||||
// maybeLogServeConnection creates a RequestAccessLog and sends it to any watchers
|
||||
// who are subscribed to ipn.NotifyServeRequest. SessionID, when not empty, indicates
|
||||
// that this log is intended for a specific foreground serve session, otherwise it may be
|
||||
// a background serve whose logs are being followed.
|
||||
func (b *LocalBackend) maybeLogServeConnection(destPort uint16, srcAddr netip.AddrPort, sessionID string) {
|
||||
log := &ipn.RequestAccessLog{
|
||||
SrcAddr: srcAddr,
|
||||
Time: b.clock.Now(),
|
||||
SessionID: sessionID,
|
||||
}
|
||||
if _, ok := sc.TCP[443]; !ok {
|
||||
sc.TCP[443] = &ipn.TCPPortHandler{
|
||||
HTTPS: true,
|
||||
}
|
||||
}
|
||||
if sc.Web == nil {
|
||||
sc.Web = make(map[ipn.HostPort]*ipn.WebServerConfig)
|
||||
}
|
||||
wsc, ok := sc.Web[req.HostPort]
|
||||
if !ok {
|
||||
wsc = &ipn.WebServerConfig{}
|
||||
sc.Web[req.HostPort] = wsc
|
||||
}
|
||||
if wsc.Handlers == nil {
|
||||
wsc.Handlers = make(map[string]*ipn.HTTPHandler)
|
||||
}
|
||||
wsc.Handlers[req.MountPoint] = &ipn.HTTPHandler{
|
||||
Proxy: req.Source,
|
||||
}
|
||||
if req.Funnel {
|
||||
if sc.AllowFunnel == nil {
|
||||
sc.AllowFunnel = make(map[ipn.HostPort]bool)
|
||||
}
|
||||
sc.AllowFunnel[req.HostPort] = true
|
||||
}
|
||||
}
|
||||
|
||||
func deleteHandler(sc *ipn.ServeConfig, req ipn.ServeStreamRequest, port uint16) {
|
||||
delete(sc.AllowFunnel, req.HostPort)
|
||||
if sc.TCP != nil {
|
||||
delete(sc.TCP, port)
|
||||
}
|
||||
if sc.Web == nil {
|
||||
return
|
||||
}
|
||||
if sc.Web[req.HostPort] == nil {
|
||||
return
|
||||
}
|
||||
wsc, ok := sc.Web[req.HostPort]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if wsc.Handlers == nil {
|
||||
return
|
||||
}
|
||||
if _, ok := wsc.Handlers[req.MountPoint]; !ok {
|
||||
return
|
||||
}
|
||||
delete(wsc.Handlers, req.MountPoint)
|
||||
if len(wsc.Handlers) == 0 {
|
||||
delete(sc.Web, req.HostPort)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LocalBackend) maybeLogServeConnection(destPort uint16, srcAddr netip.AddrPort) {
|
||||
b.mu.Lock()
|
||||
streamers := b.serveStreamers[destPort]
|
||||
b.mu.Unlock()
|
||||
if len(streamers) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var log ipn.FunnelRequestLog
|
||||
log.SrcAddr = srcAddr
|
||||
log.Time = b.clock.Now()
|
||||
|
||||
if node, user, ok := b.WhoIs(srcAddr); ok {
|
||||
log.NodeName = node.ComputedName()
|
||||
@ -430,9 +348,7 @@ func (b *LocalBackend) maybeLogServeConnection(destPort uint16, srcAddr netip.Ad
|
||||
}
|
||||
}
|
||||
|
||||
for _, stream := range streamers {
|
||||
stream(log)
|
||||
}
|
||||
b.send(ipn.Notify{RequestAccessLog: log})
|
||||
}
|
||||
|
||||
func (b *LocalBackend) HandleIngressTCPConn(ingressPeer tailcfg.NodeView, target ipn.HostPort, srcAddr netip.AddrPort, getConnOrReset func() (net.Conn, bool), sendRST func()) {
|
||||
@ -504,83 +420,93 @@ func (b *LocalBackend) tcpHandlerForServe(dport uint16, srcAddr netip.AddrPort)
|
||||
return nil
|
||||
}
|
||||
|
||||
tcph, ok := sc.TCP().GetOk(dport)
|
||||
if !ok {
|
||||
b.logf("[unexpected] localbackend: got TCP conn without TCP config for port %v; from %v", dport, srcAddr)
|
||||
findHandler := func(tcpCfg views.MapFn[uint16, *ipn.TCPPortHandler, ipn.TCPPortHandlerView], sessionID string) (handler func(net.Conn) error) {
|
||||
tcph, ok := tcpCfg.GetOk(dport)
|
||||
if !ok {
|
||||
b.logf("[unexpected] localbackend: got TCP conn without TCP config for port %v; from %v", dport, srcAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
if tcph.HTTPS() || tcph.HTTP() {
|
||||
hs := &http.Server{
|
||||
Handler: b.newServeWebHandlerForSession(sessionID),
|
||||
BaseContext: func(_ net.Listener) context.Context {
|
||||
return context.WithValue(context.Background(), serveHTTPContextKey{}, &serveHTTPContext{
|
||||
SrcAddr: srcAddr,
|
||||
DestPort: dport,
|
||||
})
|
||||
},
|
||||
}
|
||||
if tcph.HTTPS() {
|
||||
hs.TLSConfig = &tls.Config{
|
||||
GetCertificate: b.getTLSServeCertForPort(dport),
|
||||
}
|
||||
return func(c net.Conn) error {
|
||||
return hs.ServeTLS(netutil.NewOneConnListener(c, nil), "", "")
|
||||
}
|
||||
}
|
||||
|
||||
return func(c net.Conn) error {
|
||||
return hs.Serve(netutil.NewOneConnListener(c, nil))
|
||||
}
|
||||
}
|
||||
|
||||
if backDst := tcph.TCPForward(); backDst != "" {
|
||||
return func(conn net.Conn) error {
|
||||
defer conn.Close()
|
||||
b.maybeLogServeConnection(dport, srcAddr, sessionID)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
backConn, err := b.dialer.SystemDial(ctx, "tcp", backDst)
|
||||
cancel()
|
||||
if err != nil {
|
||||
b.logf("localbackend: failed to TCP proxy port %v (from %v) to %s: %v", dport, srcAddr, backDst, err)
|
||||
return nil
|
||||
}
|
||||
defer backConn.Close()
|
||||
if sni := tcph.TerminateTLS(); sni != "" {
|
||||
conn = tls.Server(conn, &tls.Config{
|
||||
GetCertificate: func(hi *tls.ClientHelloInfo) (*tls.Certificate, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
pair, err := b.GetCertPEM(ctx, sni, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cert, err := tls.X509KeyPair(pair.CertPEM, pair.KeyPEM)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &cert, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(bradfitz): do the RegisterIPPortIdentity and
|
||||
// UnregisterIPPortIdentity stuff that netstack does
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := io.Copy(backConn, conn)
|
||||
errc <- err
|
||||
}()
|
||||
go func() {
|
||||
_, err := io.Copy(conn, backConn)
|
||||
errc <- err
|
||||
}()
|
||||
return <-errc
|
||||
}
|
||||
}
|
||||
|
||||
b.logf("closing TCP conn to port %v (from %v) with actionless TCPPortHandler", dport, srcAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
if tcph.HTTPS() || tcph.HTTP() {
|
||||
hs := &http.Server{
|
||||
Handler: http.HandlerFunc(b.serveWebHandler),
|
||||
BaseContext: func(_ net.Listener) context.Context {
|
||||
return context.WithValue(context.Background(), serveHTTPContextKey{}, &serveHTTPContext{
|
||||
SrcAddr: srcAddr,
|
||||
DestPort: dport,
|
||||
})
|
||||
},
|
||||
}
|
||||
if tcph.HTTPS() {
|
||||
hs.TLSConfig = &tls.Config{
|
||||
GetCertificate: b.getTLSServeCertForPort(dport),
|
||||
}
|
||||
return func(c net.Conn) error {
|
||||
return hs.ServeTLS(netutil.NewOneConnListener(c, nil), "", "")
|
||||
}
|
||||
}
|
||||
|
||||
return func(c net.Conn) error {
|
||||
return hs.Serve(netutil.NewOneConnListener(c, nil))
|
||||
}
|
||||
sc.Foreground().Range(func(k string, v ipn.ServeConfigView) (cont bool) {
|
||||
handler = findHandler(v.TCP(), k)
|
||||
return handler == nil
|
||||
})
|
||||
if handler != nil {
|
||||
return handler
|
||||
}
|
||||
|
||||
if backDst := tcph.TCPForward(); backDst != "" {
|
||||
return func(conn net.Conn) error {
|
||||
defer conn.Close()
|
||||
b.maybeLogServeConnection(dport, srcAddr)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
backConn, err := b.dialer.SystemDial(ctx, "tcp", backDst)
|
||||
cancel()
|
||||
if err != nil {
|
||||
b.logf("localbackend: failed to TCP proxy port %v (from %v) to %s: %v", dport, srcAddr, backDst, err)
|
||||
return nil
|
||||
}
|
||||
defer backConn.Close()
|
||||
if sni := tcph.TerminateTLS(); sni != "" {
|
||||
conn = tls.Server(conn, &tls.Config{
|
||||
GetCertificate: func(hi *tls.ClientHelloInfo) (*tls.Certificate, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
pair, err := b.GetCertPEM(ctx, sni, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cert, err := tls.X509KeyPair(pair.CertPEM, pair.KeyPEM)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &cert, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(bradfitz): do the RegisterIPPortIdentity and
|
||||
// UnregisterIPPortIdentity stuff that netstack does
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := io.Copy(backConn, conn)
|
||||
errc <- err
|
||||
}()
|
||||
go func() {
|
||||
_, err := io.Copy(conn, backConn)
|
||||
errc <- err
|
||||
}()
|
||||
return <-errc
|
||||
}
|
||||
}
|
||||
|
||||
b.logf("closing TCP conn to port %v (from %v) with actionless TCPPortHandler", dport, srcAddr)
|
||||
return nil
|
||||
return findHandler(sc.TCP(), "")
|
||||
}
|
||||
|
||||
func getServeHTTPContext(r *http.Request) (c *serveHTTPContext, ok bool) {
|
||||
@ -700,40 +626,45 @@ func (b *LocalBackend) addTailscaleIdentityHeaders(r *httputil.ProxyRequest) {
|
||||
r.Out.Header.Set("Tailscale-Headers-Info", "https://tailscale.com/s/serve-headers")
|
||||
}
|
||||
|
||||
func (b *LocalBackend) serveWebHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h, mountPoint, ok := b.getServeHandler(r)
|
||||
if !ok {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
if c, ok := getServeHTTPContext(r); ok {
|
||||
b.maybeLogServeConnection(c.DestPort, c.SrcAddr)
|
||||
}
|
||||
if s := h.Text(); s != "" {
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
io.WriteString(w, s)
|
||||
return
|
||||
}
|
||||
if v := h.Path(); v != "" {
|
||||
b.serveFileOrDirectory(w, r, v, mountPoint)
|
||||
return
|
||||
}
|
||||
if v := h.Proxy(); v != "" {
|
||||
p, ok := b.serveProxyHandlers.Load(v)
|
||||
// newServeWebHandlerForSession returns an http.HandlerFunc that matches incoming
|
||||
// HTTP requests to its corresponding Web Handler in the ServeConfig. The handler
|
||||
// also logs requests to the IPN Bus which will include the given sessionID if not empty.
|
||||
func (b *LocalBackend) newServeWebHandlerForSession(sessionID string) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
h, mountPoint, ok := b.getServeHandler(r)
|
||||
if !ok {
|
||||
http.Error(w, "unknown proxy destination", http.StatusInternalServerError)
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
h := p.(http.Handler)
|
||||
// Trim the mount point from the URL path before proxying. (#6571)
|
||||
if r.URL.Path != "/" {
|
||||
h = http.StripPrefix(strings.TrimSuffix(mountPoint, "/"), h)
|
||||
if c, ok := getServeHTTPContext(r); ok {
|
||||
b.maybeLogServeConnection(c.DestPort, c.SrcAddr, sessionID)
|
||||
}
|
||||
if s := h.Text(); s != "" {
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
io.WriteString(w, s)
|
||||
return
|
||||
}
|
||||
if v := h.Path(); v != "" {
|
||||
b.serveFileOrDirectory(w, r, v, mountPoint)
|
||||
return
|
||||
}
|
||||
if v := h.Proxy(); v != "" {
|
||||
p, ok := b.serveProxyHandlers.Load(v)
|
||||
if !ok {
|
||||
http.Error(w, "unknown proxy destination", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
h := p.(http.Handler)
|
||||
// Trim the mount point from the URL path before proxying. (#6571)
|
||||
if r.URL.Path != "/" {
|
||||
h = http.StripPrefix(strings.TrimSuffix(mountPoint, "/"), h)
|
||||
}
|
||||
h.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
h.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
http.Error(w, "empty handler", 500)
|
||||
http.Error(w, "empty handler", 500)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LocalBackend) serveFileOrDirectory(w http.ResponseWriter, r *http.Request, fileOrDir, mountPoint string) {
|
||||
@ -844,6 +775,13 @@ func (b *LocalBackend) webServerConfig(hostname string, port uint16) (c ipn.WebS
|
||||
if !b.serveConfig.Valid() {
|
||||
return c, false
|
||||
}
|
||||
b.serveConfig.Foreground().Range(func(k string, v ipn.ServeConfigView) (cont bool) {
|
||||
c, ok = v.Web().GetOk(key)
|
||||
return !ok
|
||||
})
|
||||
if ok {
|
||||
return c, ok
|
||||
}
|
||||
return b.serveConfig.Web().GetOk(key)
|
||||
}
|
||||
|
||||
|
||||
@ -296,7 +296,7 @@ func TestServeHTTPProxy(t *testing.T) {
|
||||
}))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
b.serveWebHandler(w, req)
|
||||
b.newServeWebHandlerForSession("")(w, req)
|
||||
|
||||
// Verify the headers.
|
||||
h := w.Result().Header
|
||||
|
||||
@ -97,7 +97,6 @@ var handler = map[string]localAPIHandler{
|
||||
"set-expiry-sooner": (*Handler).serveSetExpirySooner,
|
||||
"start": (*Handler).serveStart,
|
||||
"status": (*Handler).serveStatus,
|
||||
"stream-serve": (*Handler).serveStreamServe,
|
||||
"tka/init": (*Handler).serveTKAInit,
|
||||
"tka/log": (*Handler).serveTKALog,
|
||||
"tka/modify": (*Handler).serveTKAModify,
|
||||
@ -854,35 +853,6 @@ func (h *Handler) serveServeConfig(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// serveStreamServe handles foreground serve and funnel streams. This is
|
||||
// currently in development per https://github.com/tailscale/tailscale/issues/8489
|
||||
func (h *Handler) serveStreamServe(w http.ResponseWriter, r *http.Request) {
|
||||
if !envknob.UseWIPCode() {
|
||||
http.Error(w, "stream serve not yet available", http.StatusNotImplemented)
|
||||
return
|
||||
}
|
||||
if !h.PermitWrite {
|
||||
// Write permission required because we modify the ServeConfig.
|
||||
http.Error(w, "serve stream denied", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if r.Method != "POST" {
|
||||
http.Error(w, "POST required", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
var req ipn.ServeStreamRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeErrorJSON(w, fmt.Errorf("decoding HostPort: %w", err))
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if err := h.b.StreamServe(r.Context(), w, req); err != nil {
|
||||
writeErrorJSON(w, fmt.Errorf("streaming serve: %w", err))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *Handler) serveCheckIPForwarding(w http.ResponseWriter, r *http.Request) {
|
||||
if !h.PermitRead {
|
||||
http.Error(w, "IP forwarding check access denied", http.StatusForbidden)
|
||||
|
||||
14
ipn/serve.go
14
ipn/serve.go
@ -38,7 +38,7 @@ type ServeConfig struct {
|
||||
// traffic is allowed, from trusted ingress peers.
|
||||
AllowFunnel map[HostPort]bool `json:",omitempty"`
|
||||
|
||||
// Foreground is a map of an IPN Bus session id to a
|
||||
// Foreground is a map of an IPN Bus session ID to a
|
||||
// foreground serve config. Note that only TCP and Web
|
||||
// are used inside the Foreground map.
|
||||
//
|
||||
@ -107,16 +107,20 @@ type ServeStreamRequest struct {
|
||||
Funnel bool `json:",omitempty"`
|
||||
}
|
||||
|
||||
// FunnelRequestLog is the JSON type written out to io.Writers
|
||||
// watching funnel connections via ipnlocal.StreamServe.
|
||||
// RequestAccessLog is the JSON type written out to io.Writers
|
||||
// watching serve connections via ipnlocal.StreamServe.
|
||||
//
|
||||
// This structure is in development and subject to change.
|
||||
type FunnelRequestLog struct {
|
||||
type RequestAccessLog struct {
|
||||
Time time.Time `json:",omitempty"` // time of request forwarding
|
||||
|
||||
// SrcAddr is the address that initiated the Funnel request.
|
||||
// SrcAddr is the address that initiated the serve or funnel request.
|
||||
SrcAddr netip.AddrPort `json:",omitempty"`
|
||||
|
||||
// SessionID, if not empty, means this request was
|
||||
// meant for a specific WatchIPNBus session.
|
||||
SessionID string `json:",omitempty"`
|
||||
|
||||
// The following fields are only populated if the connection
|
||||
// initiated from another node on the client's tailnet.
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user