< Summary - erichiller/mkmrk.Channels coverage

Information
Class: mkmrk.Channels.ChannelMux.ChannelMuxInput<TData>
Assembly: mkmrk.Channels
File(s): /home/runner/work/mkmrk.Channels/mkmrk.Channels/src/mkmrk.Channels/ChannelMux/ChannelMux.cs
Tag: 161_8859726157
Line coverage
89%
Covered lines: 103
Uncovered lines: 12
Coverable lines: 115
Total lines: 359
Line coverage: 89.5%
Branch coverage
85%
Covered branches: 48
Total branches: 56
Branch coverage: 85.7%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Metrics

MethodBranch coverage Cyclomatic complexity Line coverage
.ctor(...)100%1100%
TryWrite(...)100%12100%
WaitToWriteAsync(...)0%60%
TryComplete(...)95%2097.37%
TryRead(...)100%12100%
WriteAsync(...)75%4100%
get_IsEmpty()100%10%
get_IsComplete()100%1100%
get_IsClosed()100%1100%
GetEnumerator()100%1100%
System.Collections.IEnumerable.GetEnumerator()100%10%
Dispose()100%2100%

File(s)

/home/runner/work/mkmrk.Channels/mkmrk.Channels/src/mkmrk.Channels/ChannelMux/ChannelMux.cs

#LineLine coverage
 1/*
 2 * Some of the below is modified code from
 3 * https://github.com/dotnet/runtime/blob/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Sin
 4 */
 5
 6using System;
 7using System.Collections;
 8using System.Collections.Concurrent;
 9using System.Collections.Generic;
 10using System.Diagnostics.CodeAnalysis;
 11using System.Threading;
 12using System.Threading.Channels;
 13using System.Threading.Tasks;
 14
 15// ReSharper disable MemberCanBePrivate.Global
 16
 17namespace mkmrk.Channels;
 18
 19/// <summary>
 20/// <see cref="ChannelMux"/> is meant to aggregate multiple <see cref="BroadcastChannel{T}"/> into a single, awaitable o
 21/// It is a generic type and each type parameter has a dedicated <c>TryRead(out T data)</c> method.
 22/// <see cref="ChannelMuxInput{T}"/> acts presents as a writer to <see cref="BroadcastChannelWriter{T}"/> and each has a
 23/// <see cref="SingleProducerSingleConsumerQueue{T}"/>.
 24/// </summary>
 25/// <remarks>
 26/// Note that each <see cref="ChannelMuxInput{T}"/> is a single input, single output where <i>single</i> means both a si
 27/// and a single instance reading, and thus can be optimized using <see cref="SingleProducerSingleConsumerQueue{T}"/>.
 28/// </remarks>
 29public abstract class ChannelMux {
 30    /// <summary>A waiting reader (e.g. WaitForReadAsync) if there is one.</summary>
 31    private AsyncOperation<bool>? _waitingReader;
 32    private volatile bool                 _isReaderWaiting = false;
 33    private readonly AsyncOperation<bool> _waiterSingleton;
 34    private readonly bool                 _runContinuationsAsynchronously;
 35    private readonly object               _waiterLockObj                    = new ();
 36    private readonly object               _closedChannelLockObj             = new ();
 37    private          Exception?           _completeException                = null;
 38    private          Type?                _completeExceptionChannelDataType = null;
 39    private volatile bool                 _hasException                     = false;
 40    private volatile int                  _readableItems                    = 0;
 41    private volatile int                  _closedChannels                   = 0;
 42    private readonly int                  _totalChannels;
 43    private          bool                 _areAllChannelsComplete => _closedChannels >= _totalChannels;
 44
 45    /// <summary>Task that indicates the channel has completed.</summary>
 46    private TaskCompletionSource _completion;
 47
 48    /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.Completion"/>
 49    public Task Completion => _completion.Task;
 50
 51    private TaskCompletionSource createCompletionTask( ) => new TaskCompletionSource( _runContinuationsAsynchronously ? 
 52
 53    /// <summary>
 54    /// Common functionality for <see cref="ChannelMux{T1,T2}.ReplaceChannel(mkmrk.Channels.IBroadcastChannelAddReaderPr
 55    /// </summary>
 56    /// <param name="muxInput"></param>
 57    /// <typeparam name="TData"></typeparam>
 58    protected void resetOneChannel<TData>( ChannelMuxInput<TData> muxInput ) {
 59        ArgumentNullException.ThrowIfNull( muxInput );
 60        if ( muxInput.IsClosed ) {
 61            Interlocked.Decrement( ref _closedChannels );
 62        }
 63        if ( _completion.Task.IsCompleted ) {
 64            _completion = createCompletionTask();
 65        }
 66        if ( _completeExceptionChannelDataType == typeof(TData) ) {
 67            _completeException = null;
 68            _hasException      = false;
 69        }
 70    }
 71
 72    /// <summary>
 73    /// Return <c>Exception</c> if the entire <see cref="ChannelMux"/> and all associated ChannelReaders (<see cref="Cha
 74    /// <list type="bullet">
 75    ///     <item>
 76    ///         <description>Exits <see cref="WaitToReadAsync"/> and then returns <see cref="ChannelMux._completeExcepti
 77    ///     </item>
 78    ///     <item>
 79    ///         <description>Ends <see cref="Completion"/> Task (once all items have been read).</description>
 80    ///     </item>
 81    ///     <item>
 82    ///         <description><see cref="ChannelMuxInput{T}.TryWrite"/> for any other channels is closed (immediately).</
 83    ///     </item>
 84    /// </list>
 85    /// </summary>
 86    /// <remarks>
 87    /// Note that <see cref="ChannelMuxInput{TData}.TryRead"/>  will still be allowed until the queue is empty, but beca
 88    /// </remarks>
 89    public delegate Exception? ChannelCompleteHandler( Type reportingChannelType, Exception? exception );
 90
 91    /// <inheritdoc cref="ChannelCompleteHandler" />
 92    public ChannelCompleteHandler? OnChannelComplete { get; init; }
 93
 94    /// <inheritdoc cref="ChannelMux" />
 95    protected ChannelMux( int totalChannels, bool runContinuationsAsynchronously = true ) {
 96        _runContinuationsAsynchronously = runContinuationsAsynchronously;
 97        _completion                     = createCompletionTask();
 98        _waiterSingleton                = new AsyncOperation<bool>( runContinuationsAsynchronously, pooled: true );
 99        _totalChannels                  = totalChannels;
 100    }
 101
 102    /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.WaitToReadAsync"/>
 103    public ValueTask<bool> WaitToReadAsync( CancellationToken cancellationToken ) {
 104        _isReaderWaiting = false;
 105        if ( cancellationToken.IsCancellationRequested ) {
 106            return new ValueTask<bool>( Task.FromCanceled<bool>( cancellationToken ) );
 107        }
 108
 109        if ( _hasException && _completeException is { } ) {
 110            // if an exception is present, return a cancelled ValueTask with the exception.
 111            return new ValueTask<bool>( Task.FromException<bool>( _completeException ) );
 112        }
 113
 114        // Outside of the lock, check if there are any items waiting to be read.  If there are, we're done.
 115        if ( _readableItems > 0 ) {
 116            return new ValueTask<bool>( true );
 117        }
 118        AsyncOperation<bool>? oldWaitingReader, newWaitingReader;
 119        lock ( _waiterLockObj ) {
 120            // Again while holding the lock, check to see if there are any items available.
 121            if ( _readableItems > 0 ) {
 122                return new ValueTask<bool>( true );
 123            }
 124            // There aren't any items; if we're done writing, there never will be more items.
 125            if ( _areAllChannelsComplete ) {
 126                // if an exception is present, return a cancelled ValueTask with the exception.
 127                return _completeException is { } exception ? new ValueTask<bool>( Task.FromException<bool>( exception ) 
 128            }
 129            // Try to use the singleton waiter.  If it's currently being used, then the channel
 130            // is being used erroneously, and we cancel the outstanding operation.
 131            oldWaitingReader = _waitingReader;
 132            if ( !cancellationToken.CanBeCanceled && _waiterSingleton.TryOwnAndReset() ) {
 133                newWaitingReader = _waiterSingleton;
 134                if ( newWaitingReader == oldWaitingReader ) {
 135                    // The previous operation completed, so null out the "old" waiter
 136                    // so we don't end up canceling the new operation.
 137                    oldWaitingReader = null;
 138                }
 139            } else {
 140                newWaitingReader = new AsyncOperation<bool>( _runContinuationsAsynchronously, cancellationToken ); // TO
 141            }
 142            _isReaderWaiting = true;
 143            _waitingReader   = newWaitingReader;
 144        }
 145
 146        if ( _readableItems > 0 ) {
 147            return new ValueTask<bool>( true );
 148        }
 149
 150        oldWaitingReader?.TrySetCanceled( default );
 151        return newWaitingReader.ValueTaskOfT;
 152    }
 153
 154    /*
 155     * ChannelMuxInput
 156     */
 157
 158    /// <summary>
 159    /// <see cref="ChannelMuxInput{T}"/> acts presents as a writer to <see cref="BroadcastChannelWriter{T}"/> and each h
 160    /// <see cref="SingleProducerSingleConsumerQueue{T}"/>.
 161    /// </summary>
 162    protected sealed class ChannelMuxInput<TData> : ChannelWriter<TData>, IDisposable, IEnumerable<TData> {
 163        private readonly ChannelMux                               _parent;
 164        private readonly RemoveWriterByHashCode                   _removeWriterCallback;
 2165        private readonly SingleProducerSingleConsumerQueue<TData> _queue            = new SingleProducerSingleConsumerQu
 2166        private volatile bool                                     _isComplete       = false;
 2167        private volatile bool                                     _emptyAndComplete = false;
 2168        private volatile bool                                     _isClosed         = false; // set once the parent's _c
 169
 2170        internal ChannelMuxInput( IBroadcastChannelAddReaderProvider<TData> channel, ChannelMux parent ) {
 2171            _removeWriterCallback = channel.AddReader( this );
 2172            _parent               = parent;
 2173        }
 174
 175        /// <inheritdoc />
 2176        public override bool TryWrite( TData item ) {
 2177            if ( _isComplete || _parent._hasException ) {
 2178                return false;
 179            }
 180
 2181            _queue.Enqueue( item );
 2182            Interlocked.Increment( ref _parent._readableItems );
 2183            if ( !_parent._isReaderWaiting ) {
 2184                return true;
 185            }
 2186            AsyncOperation<bool>? waitingReader = null;
 2187            if ( Monitor.TryEnter( _parent._waiterLockObj ) ) {
 2188                try {
 2189                    waitingReader = _parent._waitingReader;
 2190                    if ( waitingReader == null ) {
 2191                        return true;
 192                    }
 2193                    _parent._isReaderWaiting = false;
 2194                    _parent._waitingReader   = null;
 2195                } finally {
 196                    // Ensure that the lock is released.
 2197                    Monitor.Exit( _parent._waiterLockObj );
 2198                }
 2199            }
 2200            if ( waitingReader != null ) {
 201                // Waiting reader is present, set its result so that it ends and the waiting reader continues.
 2202                waitingReader.TrySetResult( item: true );
 2203            }
 2204            return true;
 2205        }
 206
 207        /// <inheritdoc />
 208        /// <remarks>
 209        /// This will always return immediately.
 210        /// </remarks>
 0211        public override ValueTask<bool> WaitToWriteAsync( CancellationToken cancellationToken = new CancellationToken() 
 0212            Exception? completeException = _parent._completeException; // URGENT: maybe setting to a local is something 
 0213            return cancellationToken.IsCancellationRequested ? new ValueTask<bool>( Task.FromCanceled<bool>( cancellatio
 0214                !_isComplete                                 ? new ValueTask<bool>( true ) :
 0215                completeException is { }                     ? new ValueTask<bool>( Task.FromException<bool>( completeEx
 0216                                                               default;
 0217        }
 218
 219
 220        /// <inheritdoc />
 221        /// <remarks>
 222        /// Any waiting readers will only be exited if the queue is empty.
 223        /// </remarks>
 2224        public override bool TryComplete( Exception? error = null ) {
 2225            AsyncOperation<bool>? waitingReader = null;
 226
 227            // If we're already marked as complete, there's nothing more to do.
 2228            if ( _isComplete ) {
 2229                return false;
 230            }
 231
 232            // allow the user to ignore or modify the Exception
 2233            error = _parent.OnChannelComplete?.Invoke( typeof(TData), error );
 2234            error?.Data.Add( nameof(ChannelMux) + " Type", typeof(TData) );
 2235            if ( error is { } ) {
 2236                _parent._hasException = true;
 2237                Interlocked.Exchange( ref this._parent._completeException, error );
 2238                Interlocked.Exchange( ref _parent._completeExceptionChannelDataType, typeof(TData) );
 2239            }
 240
 2241            lock ( _parent._closedChannelLockObj ) {
 242                // Mark as complete for writing.
 2243                _isComplete = true;
 2244                if ( !_queue.IsEmpty ) {
 2245                    return true;
 246                }
 2247                _emptyAndComplete = true;
 2248                Interlocked.Increment( ref _parent._closedChannels );
 2249                _isClosed = true;
 2250            }
 251            // if all channels are closed, or if this complete was reported with an exception, close everything so long 
 2252            if ( ( _parent._closedChannels >= _parent._totalChannels || error is { } ) ) {
 253                // If we have no more items remaining, then the channel needs to be marked as completed
 254                // and readers need to be informed they'll never get another item.  All of that needs
 255                // to happen outside of the lock to avoid invoking continuations under the lock.
 2256                lock ( _parent._waiterLockObj ) {
 2257                    if ( _parent._waitingReader != null ) {
 2258                        waitingReader            = _parent._waitingReader;
 2259                        _parent._waitingReader   = null;
 2260                        _parent._isReaderWaiting = false;
 2261                    }
 2262                }
 2263                ChannelUtilities.Complete( _parent._completion, error );
 264                // Complete a waiting reader if there is one (this is only encountered when _queue.IsEmpty is true
 2265                if ( waitingReader != null ) {
 2266                    if ( error != null ) {
 0267                        waitingReader.TrySetException( error );
 2268                    } else {
 2269                        waitingReader.TrySetResult( item: false );
 2270                    }
 2271                }
 2272            }
 273
 274            // Successfully completed the channel
 2275            return true;
 2276        }
 277
 278        /// <inheritdoc cref="ChannelReader{T}.TryRead" />
 279        [ SuppressMessage( "ReSharper", "RedundantNullableFlowAttribute" ) ]
 2280        public bool TryRead( [ MaybeNullWhen( false ) ] out TData? item ) {
 2281            if ( _queue.TryDequeue( out item ) ) {
 2282                Interlocked.Decrement( ref _parent._readableItems );
 2283                if ( _isComplete ) {
 2284                    lock ( _parent._closedChannelLockObj ) {
 2285                        if ( !_queue.IsEmpty || _emptyAndComplete ) {
 2286                            return true;
 287                        }
 2288                        _emptyAndComplete = true;
 2289                        Interlocked.Increment( ref _parent._closedChannels );
 2290                        _isClosed = true;
 2291                    }
 2292                    if ( _parent._areAllChannelsComplete || _parent._hasException ) {
 2293                        ChannelUtilities.Complete( _parent._completion, _parent._completeException );
 2294                    }
 2295                }
 2296                return true;
 297            }
 2298            return false;
 2299        }
 300
 301        /// <inheritdoc />
 302        public override ValueTask WriteAsync( TData item, CancellationToken cancellationToken = default ) =>
 303            // Writing always succeeds (unless we've already completed writing or cancellation has been requested),
 304            // so just TryWrite and return a completed task.
 2305            cancellationToken.IsCancellationRequested ? new ValueTask( Task.FromCanceled( cancellationToken ) ) :
 2306            TryWrite( item )                          ? default :
 2307                                                        new ValueTask( Task.FromException( ChannelUtilities.CreateInvali
 308
 309
 310        /*
 311         * IEnumerable implementation
 312         */
 313
 314        /// <inheritdoc cref="P:BroadcastChannelMux.SingleProducerSingleConsumerQueue`1.IsEmpty" />
 0315        public bool IsEmpty => this._queue.IsEmpty;
 316
 317        /// <summary>
 318        /// Whether the Channel is has had <see cref="ChannelMuxInput{TData}.TryComplete"/> called.
 319        /// </summary>
 2320        public bool IsComplete => this._isComplete;
 321
 322        /// <summary>
 323        /// Whether when the input has incremented its parent's <see cref="ChannelMux._closedChannels"/>.
 324        /// </summary>
 325        public bool IsClosed {
 2326            get {
 2327                lock ( _parent._closedChannelLockObj ) {
 2328                    return this._isClosed;
 329                }
 2330            }
 331        }
 332
 333        /*
 334         * IEnumerable implementation
 335         */
 336
 337        /// <inheritdoc />
 2338        public IEnumerator<TData> GetEnumerator( ) => this._queue.GetEnumerator();
 339
 340        /// <inheritdoc />
 0341        IEnumerator IEnumerable.GetEnumerator( ) {
 0342            return GetEnumerator();
 0343        }
 344
 345        /*
 346         * IDisposable implementation
 347         */
 348
 2349        private bool _isDisposed = false;
 350
 351        /// <inheritdoc />
 2352        public void Dispose( ) {
 2353            if ( !_isDisposed ) {
 2354                _removeWriterCallback.Invoke( this.GetHashCode() );
 2355                _isDisposed = true;
 2356            }
 2357        }
 358    }
 359}