cmd/k8s-operator: record stdin

Updates tailscale/corp#36126

Signed-off-by: Raj Singh <raj@tailscale.com>
This commit is contained in:
Raj Singh 2026-01-20 20:45:59 -05:00
parent 0a5639dcc0
commit 9f95400bd4
4 changed files with 55 additions and 6 deletions

View File

@ -72,6 +72,7 @@ type conn struct {
// rec knows how to send data written to it to a tsrecorder instance.
rec *tsrecorder.Client
stdinStreamID atomic.Uint32
stdoutStreamID atomic.Uint32
stderrStreamID atomic.Uint32
resizeStreamID atomic.Uint32
@ -141,10 +142,21 @@ func (c *conn) Read(b []byte) (int, error) {
}
c.readBuf.Next(len(sf.Raw)) // advance buffer past the parsed frame
if !sf.Ctrl && c.hasTerm { // data frame
if !sf.Ctrl { // data frame
switch sf.StreamID {
case c.stdinStreamID.Load():
select {
case <-c.ctx.Done():
return 0, c.ctx.Err()
case <-c.initialCastHeaderSent:
if err := c.rec.WriteInput(sf.Payload); err != nil {
return 0, fmt.Errorf("error sending stdin to session recorder: %w", err)
}
}
case c.resizeStreamID.Load():
if !c.hasTerm {
return n, nil
}
var msg spdyResizeMsg
if err = json.Unmarshal(sf.Payload, &msg); err != nil {
return 0, fmt.Errorf("error umarshalling resize msg: %w", err)
@ -257,6 +269,8 @@ func (c *conn) storeStreamID(sf spdyFrame, header http.Header) {
)
id := binary.BigEndian.Uint32(sf.Payload[0:4])
switch header.Get(streamTypeHeaderKey) {
case corev1.StreamTypeStdin:
c.stdinStreamID.Store(id)
case corev1.StreamTypeStdout:
c.stdoutStreamID.Store(id)
case corev1.StreamTypeStderr:

View File

@ -61,6 +61,19 @@ func (rec *Client) WriteOutput(p []byte) (err error) {
string(p)})
}
// WriteInput sends terminal stdin to the tsrecorder.
// https://docs.asciinema.org/manual/asciicast/v2/#i-input-data-read-from-a-terminal
func (rec *Client) WriteInput(p []byte) (err error) {
const inputEventCode = "i"
if len(p) == 0 {
return nil
}
return rec.write([]any{
rec.clock.Now().Sub(rec.start).Seconds(),
inputEventCode,
string(p)})
}
// WriteResize writes an asciinema resize message. This can be called if
// terminal size has changed.
// https://docs.asciinema.org/manual/asciicast/v2/#r-resize

View File

@ -193,10 +193,22 @@ func (c *conn) Read(b []byte) (int, error) {
c.readBuf.Next(len(readMsg.raw))
if readMsg.isFinalized && !c.readMsgIsIncomplete() {
// we want to send stream resize messages for terminal sessions
// Stream IDs for websocket streams are static.
// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm {
switch readMsg.streamID.Load() {
case remotecommand.StreamStdIn:
select {
case <-c.ctx.Done():
return 0, c.ctx.Err()
case <-c.initialCastHeaderSent:
if err := c.rec.WriteInput(readMsg.payload); err != nil {
return 0, fmt.Errorf("error sending stdin to session recorder: %w", err)
}
}
case remotecommand.StreamResize:
if !c.hasTerm {
break
}
var msg tsrecorder.ResizeMsg
if err = json.Unmarshal(readMsg.payload, &msg); err != nil {
return 0, fmt.Errorf("error umarshalling resize message: %w", err)

View File

@ -254,13 +254,23 @@ func Test_conn_ReadRand(t *testing.T) {
if err != nil {
t.Fatalf("error creating a test logger: %v", err)
}
cl := tstest.NewClock(tstest.ClockOpts{})
sr := &fakes.TestSessionRecorder{}
rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar())
for i := range 100 {
tc := &fakes.TestConn{}
tc.ResetReadBuf()
c := &conn{
Conn: tc,
log: zl.Sugar(),
Conn: tc,
log: zl.Sugar(),
rec: rec,
ctx: context.Background(),
initialCastHeaderSent: make(chan struct{}),
}
// Never block for random data.
c.writeCastHeaderOnce.Do(func() {
close(c.initialCastHeaderSent)
})
bb := fakes.RandomBytes(t)
for j, input := range bb {
if err := tc.WriteReadBufBytes(input); err != nil {