mirror of
https://github.com/siderolabs/talos.git
synced 2026-05-05 12:26:21 +02:00
297 lines
6.9 KiB
Go
297 lines
6.9 KiB
Go
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
// Package progress provides functionality to track and report image pull progress.
|
|
package progress
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd/v2/core/content"
|
|
"github.com/containerd/containerd/v2/core/images"
|
|
"github.com/containerd/containerd/v2/core/remotes"
|
|
"github.com/containerd/containerd/v2/core/snapshots"
|
|
"github.com/containerd/containerd/v2/pkg/snapshotters"
|
|
"github.com/containerd/errdefs"
|
|
"github.com/moby/moby/client/pkg/stringid"
|
|
"github.com/opencontainers/go-digest"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
)
|
|
|
|
// LayerPullStatus represents the status of a single image layer during pull.
|
|
type LayerPullStatus int
|
|
|
|
// Possible values for LayerPullStatus.
|
|
const (
|
|
LayerPullStatusDownloading LayerPullStatus = iota
|
|
LayerPullStatusDownloadComplete
|
|
LayerPullStatusExtracting
|
|
LayerPullStatusExtractComplete
|
|
LayerPullStatusAlreadyExists
|
|
)
|
|
|
|
// LayerPullProgress represents the progress of an image pull operation.
|
|
//
|
|
// Note: keep this in sync machine/images.proto.
|
|
type LayerPullProgress struct {
|
|
LayerID string
|
|
Status LayerPullStatus
|
|
// If Status is Extracting, this shows the elapsed time since extraction started.
|
|
Elapsed time.Duration
|
|
// If Status is Downloading, these show the current offset and total size of the layer.
|
|
Offset int64
|
|
Total int64
|
|
}
|
|
|
|
// UpdateFn is used by PullProgress to report progress updates.
|
|
type UpdateFn func(LayerPullProgress)
|
|
|
|
// PullProgress tracks and reports the progress of image pulls.
|
|
type PullProgress struct {
|
|
store content.Store
|
|
snapshotter snapshots.Snapshotter
|
|
updateFn UpdateFn
|
|
|
|
mu sync.Mutex
|
|
descs map[digest.Digest]ocispec.Descriptor // guarded by mu
|
|
|
|
layers []ocispec.Descriptor
|
|
unpackStart map[digest.Digest]time.Time
|
|
}
|
|
|
|
// NewPullProgress creates a new PullProgress instance.
|
|
func NewPullProgress(store content.Store, snapshotter snapshots.Snapshotter, fn UpdateFn) *PullProgress {
|
|
return &PullProgress{
|
|
updateFn: fn,
|
|
store: store,
|
|
snapshotter: snapshotter,
|
|
descs: make(map[digest.Digest]ocispec.Descriptor),
|
|
mu: sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
// ShowProgress starts tracking and reporting pull progress.
|
|
func (p *PullProgress) ShowProgress(ctx context.Context) func() {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
start := time.Now()
|
|
lastUpdate := make(chan struct{})
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(100 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Millisecond*500)
|
|
defer cancel()
|
|
|
|
if err := p.trackProgress(ctx, start); err != nil {
|
|
log.Printf("failed to write pull progress: %s", err.Error())
|
|
}
|
|
|
|
close(lastUpdate)
|
|
|
|
return
|
|
|
|
case <-ticker.C:
|
|
if err := p.trackProgress(ctx, start); err != nil {
|
|
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
|
log.Printf("updating progress failed %s", err.Error())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// call this when done pulling to stop progress updates
|
|
return func() {
|
|
cancel()
|
|
<-lastUpdate
|
|
}
|
|
}
|
|
|
|
func (p *PullProgress) trackProgress(ctx context.Context, start time.Time) error {
|
|
err := p.trackOngoingPulls(ctx, start)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return p.trackPulledLayers(ctx)
|
|
}
|
|
|
|
func (p *PullProgress) trackOngoingPulls(ctx context.Context, start time.Time) error { //nolint:gocyclo
|
|
actives, err := p.store.ListStatuses(ctx, "")
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
pulling := make(map[string]content.Status, len(actives))
|
|
|
|
for _, status := range actives {
|
|
pulling[status.Ref] = status
|
|
}
|
|
|
|
for _, j := range p.jobs() {
|
|
key := remotes.MakeRefKey(ctx, j)
|
|
if info, ok := pulling[key]; ok {
|
|
if info.Offset == 0 {
|
|
continue
|
|
}
|
|
|
|
p.updateFn(LayerPullProgress{
|
|
LayerID: stringid.TruncateID(j.Digest.Encoded()),
|
|
Status: LayerPullStatusDownloading,
|
|
Offset: info.Offset,
|
|
Total: info.Total,
|
|
})
|
|
|
|
continue
|
|
}
|
|
|
|
info, err := p.store.Info(ctx, j.Digest)
|
|
switch {
|
|
case err != nil:
|
|
if !errdefs.IsNotFound(err) {
|
|
return err
|
|
}
|
|
|
|
case info.CreatedAt.After(start):
|
|
p.updateFn(LayerPullProgress{
|
|
LayerID: stringid.TruncateID(j.Digest.Encoded()),
|
|
Status: LayerPullStatusDownloadComplete,
|
|
})
|
|
|
|
if images.IsLayerType(j.MediaType) {
|
|
p.layers = append(p.layers, j)
|
|
}
|
|
|
|
p.remove(j)
|
|
|
|
default:
|
|
p.updateFn(LayerPullProgress{
|
|
LayerID: stringid.TruncateID(j.Digest.Encoded()),
|
|
Status: LayerPullStatusAlreadyExists,
|
|
})
|
|
|
|
if images.IsLayerType(j.MediaType) {
|
|
p.layers = append(p.layers, j)
|
|
}
|
|
|
|
p.remove(j)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
//nolint:gocyclo
|
|
func (p *PullProgress) trackPulledLayers(ctx context.Context) error {
|
|
var committedIdx []int
|
|
|
|
for idx, desc := range p.layers {
|
|
walkFilter := "labels.\"" + snapshotters.TargetLayerDigestLabel + "\"==" + p.layers[idx].Digest.String()
|
|
|
|
err := p.snapshotter.Walk(ctx, func(ctx context.Context, sn snapshots.Info) error {
|
|
if sn.Kind == snapshots.KindActive {
|
|
if p.unpackStart == nil {
|
|
p.unpackStart = make(map[digest.Digest]time.Time)
|
|
}
|
|
|
|
var elapsed time.Duration
|
|
|
|
if began, ok := p.unpackStart[desc.Digest]; !ok {
|
|
p.unpackStart[desc.Digest] = time.Now()
|
|
} else {
|
|
elapsed = time.Since(began)
|
|
}
|
|
|
|
p.updateFn(LayerPullProgress{
|
|
LayerID: stringid.TruncateID(desc.Digest.Encoded()),
|
|
Status: LayerPullStatusExtracting,
|
|
Elapsed: elapsed,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
if sn.Kind == snapshots.KindCommitted {
|
|
p.updateFn(
|
|
LayerPullProgress{
|
|
LayerID: stringid.TruncateID(desc.Digest.Encoded()),
|
|
Status: LayerPullStatusExtractComplete,
|
|
},
|
|
)
|
|
|
|
committedIdx = append(committedIdx, idx)
|
|
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}, walkFilter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// remove finished/committed layers from p.layers
|
|
if len(committedIdx) > 0 {
|
|
slices.Sort(committedIdx)
|
|
|
|
for i := range slices.Backward(committedIdx) {
|
|
if i < len(committedIdx)-1 && committedIdx[i] == committedIdx[i+1] {
|
|
continue
|
|
}
|
|
|
|
p.layers = slices.Delete(p.layers, committedIdx[i], committedIdx[i]+1)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Add adds new descriptors to be tracked.
|
|
func (p *PullProgress) Add(desc ...ocispec.Descriptor) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
for _, d := range desc {
|
|
if _, ok := p.descs[d.Digest]; ok {
|
|
continue
|
|
}
|
|
|
|
p.descs[d.Digest] = d
|
|
}
|
|
}
|
|
|
|
func (p *PullProgress) remove(desc ocispec.Descriptor) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
delete(p.descs, desc.Digest)
|
|
}
|
|
|
|
func (p *PullProgress) jobs() []ocispec.Descriptor {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
descs := make([]ocispec.Descriptor, 0, len(p.descs))
|
|
for _, d := range p.descs {
|
|
descs = append(descs, d)
|
|
}
|
|
|
|
return descs
|
|
}
|