using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

namespace Jellyfin.Database.Implementations;

/// <summary>
/// Contains helpers to partition EFCore queries.
/// </summary>
public static class QueryPartitionHelpers
{
    // NOTE(review): the callback delegate shapes used below (Action<int>, Action<int, TimeSpan>,
    // Action<TEntity, int, int>, Action<TEntity, int, int, TimeSpan>) must match the
    // OnBegin*/OnEnd* properties declared on ProgressablePartitionReporting<TEntity>, which is
    // defined outside this file - confirm against that type.

    /// <summary>
    /// Adds a callback to any directly following calls of Partition for every partition that has been invoked.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <param name="beginPartition">The callback invoked for a partition before enumerating its items.</param>
    /// <param name="endPartition">The callback invoked for a partition after enumerating its items.</param>
    /// <returns>A queryable that can be used to partition.</returns>
    public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this IOrderedQueryable<TEntity> query, Action<int>? beginPartition = null, Action<int, TimeSpan>? endPartition = null)
    {
        var progressable = new ProgressablePartitionReporting<TEntity>(query);
        progressable.OnBeginPartition = beginPartition;
        progressable.OnEndPartition = endPartition;
        return progressable;
    }

    /// <summary>
    /// Adds a callback to any directly following calls of Partition for every item that has been invoked.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <param name="beginItem">The callback invoked for each item before processing.</param>
    /// <param name="endItem">The callback invoked for each item after processing.</param>
    /// <returns>A queryable that can be used to partition.</returns>
    public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this IOrderedQueryable<TEntity> query, Action<TEntity, int, int>? beginItem = null, Action<TEntity, int, int, TimeSpan>? endItem = null)
    {
        var progressable = new ProgressablePartitionReporting<TEntity>(query);
        progressable.OnBeginItem = beginItem;
        progressable.OnEndItem = endItem;
        return progressable;
    }

    /// <summary>
    /// Adds a callback to any directly following calls of Partition for every partition that has been invoked.
    /// </summary>
    /// <remarks>
    /// Both partition callbacks are assigned exactly as given, so an omitted argument resets the
    /// corresponding callback on <paramref name="progressable"/> to <c>null</c>.
    /// </remarks>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="progressable">The existing reporting wrapper to configure.</param>
    /// <param name="beginPartition">The callback invoked for a partition before enumerating its items.</param>
    /// <param name="endPartition">The callback invoked for a partition after enumerating its items.</param>
    /// <returns>The same <paramref name="progressable"/> instance, to allow chaining.</returns>
    public static ProgressablePartitionReporting<TEntity> WithPartitionProgress<TEntity>(this ProgressablePartitionReporting<TEntity> progressable, Action<int>? beginPartition = null, Action<int, TimeSpan>? endPartition = null)
    {
        progressable.OnBeginPartition = beginPartition;
        progressable.OnEndPartition = endPartition;
        return progressable;
    }

    /// <summary>
    /// Adds a callback to any directly following calls of Partition for every item that has been invoked.
    /// </summary>
    /// <remarks>
    /// Both item callbacks are assigned exactly as given, so an omitted argument resets the
    /// corresponding callback on <paramref name="progressable"/> to <c>null</c>.
    /// </remarks>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="progressable">The existing reporting wrapper to configure.</param>
    /// <param name="beginItem">The callback invoked for each item before processing.</param>
    /// <param name="endItem">The callback invoked for each item after processing.</param>
    /// <returns>The same <paramref name="progressable"/> instance, to allow chaining.</returns>
    public static ProgressablePartitionReporting<TEntity> WithItemProgress<TEntity>(this ProgressablePartitionReporting<TEntity> progressable, Action<TEntity, int, int>? beginItem = null, Action<TEntity, int, int, TimeSpan>? endItem = null)
    {
        progressable.OnBeginItem = beginItem;
        progressable.OnEndItem = endItem;
        return progressable;
    }

    /// <summary>
    /// Enumerates the source query by loading the entities in partitions in a lazy manner, reading each item from the database as it is requested.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="partitionInfo">The reporting wrapper whose source query is enumerated and whose callbacks are invoked.</param>
    /// <param name="partitionSize">The number of elements to load per partition.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>An enumerable representing the whole of the query.</returns>
    public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>(this ProgressablePartitionReporting<TEntity> partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in partitionInfo.Source.PartitionAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false))
        {
            yield return item;
        }
    }

    /// <summary>
    /// Enumerates the source query by loading the entities in partitions directly into memory.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="partitionInfo">The reporting wrapper whose source query is enumerated and whose callbacks are invoked.</param>
    /// <param name="partitionSize">The number of elements to load per partition.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>An enumerable representing the whole of the query.</returns>
    public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>(this ProgressablePartitionReporting<TEntity> partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in partitionInfo.Source.PartitionEagerAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false))
        {
            yield return item;
        }
    }

    /// <summary>
    /// Enumerates the source query by loading the entities in partitions in a lazy manner, reading each item from the database as it is requested.
    /// </summary>
    /// <remarks>
    /// Each partition is fetched with <c>Skip(partitionSize * n).Take(partitionSize)</c>. Enumeration stops after the
    /// first partition that returns fewer than <paramref name="partitionSize"/> items, or between partitions once
    /// cancellation has been requested. The item callbacks receive the zero-based partition number and the
    /// zero-based position of the item inside that partition; the end-item callback runs only once the consumer
    /// asks for the next element.
    /// </remarks>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <param name="partitionSize">The number of elements to load per partition. Must be greater than zero.</param>
    /// <param name="progressablePartition">Reporting helper.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>An enumerable representing the whole of the query.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="partitionSize"/> is zero or negative. Raised when enumeration starts.
    /// </exception>
    public static async IAsyncEnumerable<TEntity> PartitionAsync<TEntity>(
        this IOrderedQueryable<TEntity> query,
        int partitionSize,
        ProgressablePartitionReporting<TEntity>? progressablePartition = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // A size of zero would make every (empty) partition look "full" and loop forever.
        ThrowIfInvalidPartitionSize(partitionSize);

        var iterator = 0;
        int itemCounter;
        do
        {
            progressablePartition?.BeginPartition(iterator);
            itemCounter = 0;
            await foreach (var item in query
                .Skip(partitionSize * iterator)
                .Take(partitionSize)
                .AsAsyncEnumerable()
                .WithCancellation(cancellationToken)
                .ConfigureAwait(false))
            {
                progressablePartition?.BeginItem(item, iterator, itemCounter);
                yield return item;
                progressablePartition?.EndItem(item, iterator, itemCounter);
                itemCounter++;
            }

            progressablePartition?.EndPartition(iterator);
            iterator++;
        }
        while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested);
    }

    /// <summary>
    /// Enumerates the source query by loading the entities in partitions directly into memory.
    /// </summary>
    /// <remarks>
    /// Each partition is fully read into a pooled buffer before any of its items is yielded. Enumeration stops
    /// after the first partition that returns fewer than <paramref name="partitionSize"/> items, or between
    /// partitions once cancellation has been requested. The item callbacks receive the zero-based partition
    /// number and the zero-based position of the item inside that partition.
    /// </remarks>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <param name="partitionSize">The number of elements to load per partition. Must be greater than zero.</param>
    /// <param name="progressablePartition">Reporting helper.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>An enumerable representing the whole of the query.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="partitionSize"/> is zero or negative. Raised when enumeration starts.
    /// </exception>
    public static async IAsyncEnumerable<TEntity> PartitionEagerAsync<TEntity>(
        this IOrderedQueryable<TEntity> query,
        int partitionSize,
        ProgressablePartitionReporting<TEntity>? progressablePartition = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // A size of zero would make every (empty) partition look "full" and loop forever.
        ThrowIfInvalidPartitionSize(partitionSize);

        var iterator = 0;
        int itemCounter;
        var items = ArrayPool<TEntity>.Shared.Rent(partitionSize);
        try
        {
            do
            {
                progressablePartition?.BeginPartition(iterator);
                itemCounter = 0;
                await foreach (var item in query
                    .Skip(partitionSize * iterator)
                    .Take(partitionSize)
                    .AsAsyncEnumerable()
                    .WithCancellation(cancellationToken)
                    .ConfigureAwait(false))
                {
                    items[itemCounter++] = item;
                }

                for (var i = 0; i < itemCounter; i++)
                {
                    // Report the position of this item, not the number of items in the partition,
                    // so the callbacks see the same index sequence as the lazy PartitionAsync.
                    progressablePartition?.BeginItem(items[i], iterator, i);
                    yield return items[i];
                    progressablePartition?.EndItem(items[i], iterator, i);
                }

                progressablePartition?.EndPartition(iterator);
                iterator++;
            }
            while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested);
        }
        finally
        {
            // Clear the buffer when it can hold references so the shared pool
            // does not keep the loaded entities alive after enumeration ends.
            ArrayPool<TEntity>.Shared.Return(items, RuntimeHelpers.IsReferenceOrContainsReferences<TEntity>());
        }
    }

    /// <summary>
    /// Adds an index to the enumeration of the async enumerable.
    /// </summary>
    /// <typeparam name="TEntity">The entity to load.</typeparam>
    /// <param name="query">The source query.</param>
    /// <returns>The source list with a zero-based index added to each item.</returns>
    public static async IAsyncEnumerable<(TEntity Item, int Index)> WithIndex<TEntity>(this IAsyncEnumerable<TEntity> query)
    {
        var index = 0;
        await foreach (var item in query.ConfigureAwait(false))
        {
            yield return (item, index++);
        }
    }

    /// <summary>
    /// Rejects partition sizes that cannot make progress.
    /// </summary>
    /// <param name="partitionSize">The requested number of elements per partition.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="partitionSize"/> is zero or negative.</exception>
    private static void ThrowIfInvalidPartitionSize(int partitionSize)
    {
        if (partitionSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(partitionSize), partitionSize, "The partition size must be greater than zero.");
        }
    }
}