diff --git a/k8s-operator/sessionrecording/spdy/conn.go b/k8s-operator/sessionrecording/spdy/conn.go index 9fefca11f..243ea36bc 100644 --- a/k8s-operator/sessionrecording/spdy/conn.go +++ b/k8s-operator/sessionrecording/spdy/conn.go @@ -72,6 +72,7 @@ type conn struct { // rec knows how to send data written to it to a tsrecorder instance. rec *tsrecorder.Client + stdinStreamID atomic.Uint32 stdoutStreamID atomic.Uint32 stderrStreamID atomic.Uint32 resizeStreamID atomic.Uint32 @@ -141,10 +142,21 @@ func (c *conn) Read(b []byte) (int, error) { } c.readBuf.Next(len(sf.Raw)) // advance buffer past the parsed frame - if !sf.Ctrl && c.hasTerm { // data frame + if !sf.Ctrl { // data frame switch sf.StreamID { + case c.stdinStreamID.Load(): + select { + case <-c.ctx.Done(): + return 0, c.ctx.Err() + case <-c.initialCastHeaderSent: + if err := c.rec.WriteInput(sf.Payload); err != nil { + return 0, fmt.Errorf("error sending stdin to session recorder: %w", err) + } + } case c.resizeStreamID.Load(): - + if !c.hasTerm { + return n, nil + } var msg spdyResizeMsg if err = json.Unmarshal(sf.Payload, &msg); err != nil { return 0, fmt.Errorf("error umarshalling resize msg: %w", err) @@ -257,6 +269,8 @@ func (c *conn) storeStreamID(sf spdyFrame, header http.Header) { ) id := binary.BigEndian.Uint32(sf.Payload[0:4]) switch header.Get(streamTypeHeaderKey) { + case corev1.StreamTypeStdin: + c.stdinStreamID.Store(id) case corev1.StreamTypeStdout: c.stdoutStreamID.Store(id) case corev1.StreamTypeStderr: diff --git a/k8s-operator/sessionrecording/tsrecorder/tsrecorder.go b/k8s-operator/sessionrecording/tsrecorder/tsrecorder.go index a5bdf7ddd..bbdc1310c 100644 --- a/k8s-operator/sessionrecording/tsrecorder/tsrecorder.go +++ b/k8s-operator/sessionrecording/tsrecorder/tsrecorder.go @@ -61,6 +61,19 @@ func (rec *Client) WriteOutput(p []byte) (err error) { string(p)}) } +// WriteInput sends terminal stdin to the tsrecorder. +// https://docs.asciinema.org/manual/asciicast/v2/#i-input-data-read-from-a-terminal +func (rec *Client) WriteInput(p []byte) (err error) { + const inputEventCode = "i" + if len(p) == 0 { + return nil + } + return rec.write([]any{ + rec.clock.Now().Sub(rec.start).Seconds(), + inputEventCode, + string(p)}) +} + // WriteResize writes an asciinema resize message. This can be called if // terminal size has changed. // https://docs.asciinema.org/manual/asciicast/v2/#r-resize diff --git a/k8s-operator/sessionrecording/ws/conn.go b/k8s-operator/sessionrecording/ws/conn.go index a618f85fb..f3a39ea2d 100644 --- a/k8s-operator/sessionrecording/ws/conn.go +++ b/k8s-operator/sessionrecording/ws/conn.go @@ -193,10 +193,22 @@ func (c *conn) Read(b []byte) (int, error) { c.readBuf.Next(len(readMsg.raw)) if readMsg.isFinalized && !c.readMsgIsIncomplete() { - // we want to send stream resize messages for terminal sessions // Stream IDs for websocket streams are static. // https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218 - if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm { + switch readMsg.streamID.Load() { + case remotecommand.StreamStdIn: + select { + case <-c.ctx.Done(): + return 0, c.ctx.Err() + case <-c.initialCastHeaderSent: + if err := c.rec.WriteInput(readMsg.payload); err != nil { + return 0, fmt.Errorf("error sending stdin to session recorder: %w", err) + } + } + case remotecommand.StreamResize: + if !c.hasTerm { + break + } var msg tsrecorder.ResizeMsg if err = json.Unmarshal(readMsg.payload, &msg); err != nil { return 0, fmt.Errorf("error umarshalling resize message: %w", err) diff --git a/k8s-operator/sessionrecording/ws/conn_test.go b/k8s-operator/sessionrecording/ws/conn_test.go index 87205c4e6..dbd1542ed 100644 --- a/k8s-operator/sessionrecording/ws/conn_test.go +++ b/k8s-operator/sessionrecording/ws/conn_test.go @@ -254,13 +254,23 @@ func Test_conn_ReadRand(t *testing.T) { if err != nil { t.Fatalf("error creating a test logger: %v", err) } + cl := tstest.NewClock(tstest.ClockOpts{}) + sr := &fakes.TestSessionRecorder{} + rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar()) for i := range 100 { tc := &fakes.TestConn{} tc.ResetReadBuf() c := &conn{ - Conn: tc, - log: zl.Sugar(), + Conn: tc, + log: zl.Sugar(), + rec: rec, + ctx: context.Background(), + initialCastHeaderSent: make(chan struct{}), } + // Never block for random data. + c.writeCastHeaderOnce.Do(func() { + close(c.initialCastHeaderSent) + }) bb := fakes.RandomBytes(t) for j, input := range bb { if err := tc.WriteReadBufBytes(input); err != nil {