| | 1 | | /* |
| | 2 | | * Some of the below is modified code from |
| | 3 | | * https://github.com/dotnet/runtime/blob/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Sin |
| | 4 | | */ |
| | 5 | |
|
| | 6 | | using System; |
| | 7 | | using System.Collections; |
| | 8 | | using System.Collections.Concurrent; |
| | 9 | | using System.Collections.Generic; |
| | 10 | | using System.Diagnostics.CodeAnalysis; |
| | 11 | | using System.Threading; |
| | 12 | | using System.Threading.Channels; |
| | 13 | | using System.Threading.Tasks; |
| | 14 | |
|
| | 15 | | // ReSharper disable MemberCanBePrivate.Global |
| | 16 | |
|
| | 17 | | namespace mkmrk.Channels; |
| | 18 | |
|
| | 19 | | /// <summary> |
| | 20 | | /// <see cref="ChannelMux"/> is meant to aggregate multiple <see cref="BroadcastChannel{T}"/> into a single, awaitable o |
| | 21 | | /// It is a generic type and each type parameter has a dedicated <c>TryRead(out T data)</c> method. |
| | 22 | | /// <see cref="ChannelMuxInput{T}"/> acts presents as a writer to <see cref="BroadcastChannelWriter{T}"/> and each has a |
| | 23 | | /// <see cref="SingleProducerSingleConsumerQueue{T}"/>. |
| | 24 | | /// </summary> |
| | 25 | | /// <remarks> |
| | 26 | | /// Note that each <see cref="ChannelMuxInput{T}"/> is a single input, single output where <i>single</i> means both a si |
| | 27 | | /// and a single instance reading, and thus can be optimized using <see cref="SingleProducerSingleConsumerQueue{T}"/>. |
| | 28 | | /// </remarks> |
| | 29 | | public abstract class ChannelMux { |
| | 30 | | /// <summary>A waiting reader (e.g. WaitForReadAsync) if there is one.</summary> |
| | 31 | | private AsyncOperation<bool>? _waitingReader; |
| 2 | 32 | | private volatile bool _isReaderWaiting = false; |
| | 33 | | private readonly AsyncOperation<bool> _waiterSingleton; |
| | 34 | | private readonly bool _runContinuationsAsynchronously; |
| 2 | 35 | | private readonly object _waiterLockObj = new (); |
| 2 | 36 | | private readonly object _closedChannelLockObj = new (); |
| 2 | 37 | | private Exception? _completeException = null; |
| 2 | 38 | | private Type? _completeExceptionChannelDataType = null; |
| 2 | 39 | | private volatile bool _hasException = false; |
| 2 | 40 | | private volatile int _readableItems = 0; |
| 2 | 41 | | private volatile int _closedChannels = 0; |
| | 42 | | private readonly int _totalChannels; |
| 2 | 43 | | private bool _areAllChannelsComplete => _closedChannels >= _totalChannels; |
| | 44 | |
|
| | 45 | | /// <summary>Task that indicates the channel has completed.</summary> |
| | 46 | | private TaskCompletionSource _completion; |
| | 47 | |
|
| | 48 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.Completion"/> |
| 2 | 49 | | public Task Completion => _completion.Task; |
| | 50 | |
|
| 2 | 51 | | private TaskCompletionSource createCompletionTask( ) => new TaskCompletionSource( _runContinuationsAsynchronously ? |
| | 52 | |
|
| | 53 | | /// <summary> |
| | 54 | | /// Common functionality for <see cref="ChannelMux{T1,T2}.ReplaceChannel(mkmrk.Channels.IBroadcastChannelAddReaderPr |
| | 55 | | /// </summary> |
| | 56 | | /// <param name="muxInput"></param> |
| | 57 | | /// <typeparam name="TData"></typeparam> |
| 2 | 58 | | protected void resetOneChannel<TData>( ChannelMuxInput<TData> muxInput ) { |
| 2 | 59 | | ArgumentNullException.ThrowIfNull( muxInput ); |
| 2 | 60 | | if ( muxInput.IsClosed ) { |
| 2 | 61 | | Interlocked.Decrement( ref _closedChannels ); |
| 2 | 62 | | } |
| 2 | 63 | | if ( _completion.Task.IsCompleted ) { |
| 2 | 64 | | _completion = createCompletionTask(); |
| 2 | 65 | | } |
| 2 | 66 | | if ( _completeExceptionChannelDataType == typeof(TData) ) { |
| 0 | 67 | | _completeException = null; |
| 0 | 68 | | _hasException = false; |
| 0 | 69 | | } |
| 2 | 70 | | } |
| | 71 | |
|
| | 72 | | /// <summary> |
| | 73 | | /// Return <c>Exception</c> if the entire <see cref="ChannelMux"/> and all associated ChannelReaders (<see cref="Cha |
| | 74 | | /// <list type="bullet"> |
| | 75 | | /// <item> |
| | 76 | | /// <description>Exits <see cref="WaitToReadAsync"/> and then returns <see cref="ChannelMux._completeExcepti |
| | 77 | | /// </item> |
| | 78 | | /// <item> |
| | 79 | | /// <description>Ends <see cref="Completion"/> Task (once all items have been read).</description> |
| | 80 | | /// </item> |
| | 81 | | /// <item> |
| | 82 | | /// <description><see cref="ChannelMuxInput{T}.TryWrite"/> for any other channels is closed (immediately).</ |
| | 83 | | /// </item> |
| | 84 | | /// </list> |
| | 85 | | /// </summary> |
| | 86 | | /// <remarks> |
| | 87 | | /// Note that <see cref="ChannelMuxInput{TData}.TryRead"/> will still be allowed until the queue is empty, but beca |
| | 88 | | /// </remarks> |
| | 89 | | public delegate Exception? ChannelCompleteHandler( Type reportingChannelType, Exception? exception ); |
| | 90 | |
|
| | 91 | | /// <inheritdoc cref="ChannelCompleteHandler" /> |
| 2 | 92 | | public ChannelCompleteHandler? OnChannelComplete { get; init; } |
| | 93 | |
|
| | 94 | | /// <inheritdoc cref="ChannelMux" /> |
| 2 | 95 | | protected ChannelMux( int totalChannels, bool runContinuationsAsynchronously = true ) { |
| 2 | 96 | | _runContinuationsAsynchronously = runContinuationsAsynchronously; |
| 2 | 97 | | _completion = createCompletionTask(); |
| 2 | 98 | | _waiterSingleton = new AsyncOperation<bool>( runContinuationsAsynchronously, pooled: true ); |
| 2 | 99 | | _totalChannels = totalChannels; |
| 2 | 100 | | } |
| | 101 | |
|
| | 102 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.WaitToReadAsync"/> |
| 2 | 103 | | public ValueTask<bool> WaitToReadAsync( CancellationToken cancellationToken ) { |
| 2 | 104 | | _isReaderWaiting = false; |
| 2 | 105 | | if ( cancellationToken.IsCancellationRequested ) { |
| 2 | 106 | | return new ValueTask<bool>( Task.FromCanceled<bool>( cancellationToken ) ); |
| | 107 | | } |
| | 108 | |
|
| 2 | 109 | | if ( _hasException && _completeException is { } ) { |
| | 110 | | // if an exception is present, return a cancelled ValueTask with the exception. |
| 2 | 111 | | return new ValueTask<bool>( Task.FromException<bool>( _completeException ) ); |
| | 112 | | } |
| | 113 | |
|
| | 114 | | // Outside of the lock, check if there are any items waiting to be read. If there are, we're done. |
| 2 | 115 | | if ( _readableItems > 0 ) { |
| 2 | 116 | | return new ValueTask<bool>( true ); |
| | 117 | | } |
| | 118 | | AsyncOperation<bool>? oldWaitingReader, newWaitingReader; |
| 2 | 119 | | lock ( _waiterLockObj ) { |
| | 120 | | // Again while holding the lock, check to see if there are any items available. |
| 2 | 121 | | if ( _readableItems > 0 ) { |
| 2 | 122 | | return new ValueTask<bool>( true ); |
| | 123 | | } |
| | 124 | | // There aren't any items; if we're done writing, there never will be more items. |
| 2 | 125 | | if ( _areAllChannelsComplete ) { |
| | 126 | | // if an exception is present, return a cancelled ValueTask with the exception. |
| 2 | 127 | | return _completeException is { } exception ? new ValueTask<bool>( Task.FromException<bool>( exception ) |
| | 128 | | } |
| | 129 | | // Try to use the singleton waiter. If it's currently being used, then the channel |
| | 130 | | // is being used erroneously, and we cancel the outstanding operation. |
| 2 | 131 | | oldWaitingReader = _waitingReader; |
| 2 | 132 | | if ( !cancellationToken.CanBeCanceled && _waiterSingleton.TryOwnAndReset() ) { |
| 2 | 133 | | newWaitingReader = _waiterSingleton; |
| 2 | 134 | | if ( newWaitingReader == oldWaitingReader ) { |
| | 135 | | // The previous operation completed, so null out the "old" waiter |
| | 136 | | // so we don't end up canceling the new operation. |
| 0 | 137 | | oldWaitingReader = null; |
| 0 | 138 | | } |
| 2 | 139 | | } else { |
| 2 | 140 | | newWaitingReader = new AsyncOperation<bool>( _runContinuationsAsynchronously, cancellationToken ); // TO |
| 2 | 141 | | } |
| 2 | 142 | | _isReaderWaiting = true; |
| 2 | 143 | | _waitingReader = newWaitingReader; |
| 2 | 144 | | } |
| | 145 | |
|
| 2 | 146 | | if ( _readableItems > 0 ) { |
| 2 | 147 | | return new ValueTask<bool>( true ); |
| | 148 | | } |
| | 149 | |
|
| 2 | 150 | | oldWaitingReader?.TrySetCanceled( default ); |
| 2 | 151 | | return newWaitingReader.ValueTaskOfT; |
| 2 | 152 | | } |
| | 153 | |
|
| | 154 | | /* |
| | 155 | | * ChannelMuxInput |
| | 156 | | */ |
| | 157 | |
|
| | 158 | | /// <summary> |
| | 159 | | /// <see cref="ChannelMuxInput{T}"/> acts presents as a writer to <see cref="BroadcastChannelWriter{T}"/> and each h |
| | 160 | | /// <see cref="SingleProducerSingleConsumerQueue{T}"/>. |
| | 161 | | /// </summary> |
| | 162 | | protected sealed class ChannelMuxInput<TData> : ChannelWriter<TData>, IDisposable, IEnumerable<TData> { |
| | 163 | | private readonly ChannelMux _parent; |
| | 164 | | private readonly RemoveWriterByHashCode _removeWriterCallback; |
| 2 | 165 | | private readonly SingleProducerSingleConsumerQueue<TData> _queue = new SingleProducerSingleConsumerQu |
| 2 | 166 | | private volatile bool _isComplete = false; |
| 2 | 167 | | private volatile bool _emptyAndComplete = false; |
| 2 | 168 | | private volatile bool _isClosed = false; // set once the parent's _c |
| | 169 | |
|
| 2 | 170 | | internal ChannelMuxInput( IBroadcastChannelAddReaderProvider<TData> channel, ChannelMux parent ) { |
| 2 | 171 | | _removeWriterCallback = channel.AddReader( this ); |
| 2 | 172 | | _parent = parent; |
| 2 | 173 | | } |
| | 174 | |
|
| | 175 | | /// <inheritdoc /> |
| 2 | 176 | | public override bool TryWrite( TData item ) { |
| 2 | 177 | | if ( _isComplete || _parent._hasException ) { |
| 2 | 178 | | return false; |
| | 179 | | } |
| | 180 | |
|
| 2 | 181 | | _queue.Enqueue( item ); |
| 2 | 182 | | Interlocked.Increment( ref _parent._readableItems ); |
| 2 | 183 | | if ( !_parent._isReaderWaiting ) { |
| 2 | 184 | | return true; |
| | 185 | | } |
| 2 | 186 | | AsyncOperation<bool>? waitingReader = null; |
| 2 | 187 | | if ( Monitor.TryEnter( _parent._waiterLockObj ) ) { |
| 2 | 188 | | try { |
| 2 | 189 | | waitingReader = _parent._waitingReader; |
| 2 | 190 | | if ( waitingReader == null ) { |
| 2 | 191 | | return true; |
| | 192 | | } |
| 2 | 193 | | _parent._isReaderWaiting = false; |
| 2 | 194 | | _parent._waitingReader = null; |
| 2 | 195 | | } finally { |
| | 196 | | // Ensure that the lock is released. |
| 2 | 197 | | Monitor.Exit( _parent._waiterLockObj ); |
| 2 | 198 | | } |
| 2 | 199 | | } |
| 2 | 200 | | if ( waitingReader != null ) { |
| | 201 | | // Waiting reader is present, set its result so that it ends and the waiting reader continues. |
| 2 | 202 | | waitingReader.TrySetResult( item: true ); |
| 2 | 203 | | } |
| 2 | 204 | | return true; |
| 2 | 205 | | } |
| | 206 | |
|
| | 207 | | /// <inheritdoc /> |
| | 208 | | /// <remarks> |
| | 209 | | /// This will always return immediately. |
| | 210 | | /// </remarks> |
| 0 | 211 | | public override ValueTask<bool> WaitToWriteAsync( CancellationToken cancellationToken = new CancellationToken() |
| 0 | 212 | | Exception? completeException = _parent._completeException; // URGENT: maybe setting to a local is something |
| 0 | 213 | | return cancellationToken.IsCancellationRequested ? new ValueTask<bool>( Task.FromCanceled<bool>( cancellatio |
| 0 | 214 | | !_isComplete ? new ValueTask<bool>( true ) : |
| 0 | 215 | | completeException is { } ? new ValueTask<bool>( Task.FromException<bool>( completeEx |
| 0 | 216 | | default; |
| 0 | 217 | | } |
| | 218 | |
|
| | 219 | |
|
| | 220 | | /// <inheritdoc /> |
| | 221 | | /// <remarks> |
| | 222 | | /// Any waiting readers will only be exited if the queue is empty. |
| | 223 | | /// </remarks> |
| 2 | 224 | | public override bool TryComplete( Exception? error = null ) { |
| 2 | 225 | | AsyncOperation<bool>? waitingReader = null; |
| | 226 | |
|
| | 227 | | // If we're already marked as complete, there's nothing more to do. |
| 2 | 228 | | if ( _isComplete ) { |
| 2 | 229 | | return false; |
| | 230 | | } |
| | 231 | |
|
| | 232 | | // allow the user to ignore or modify the Exception |
| 2 | 233 | | error = _parent.OnChannelComplete?.Invoke( typeof(TData), error ); |
| 2 | 234 | | error?.Data.Add( nameof(ChannelMux) + " Type", typeof(TData) ); |
| 2 | 235 | | if ( error is { } ) { |
| 2 | 236 | | _parent._hasException = true; |
| 2 | 237 | | Interlocked.Exchange( ref this._parent._completeException, error ); |
| 2 | 238 | | Interlocked.Exchange( ref _parent._completeExceptionChannelDataType, typeof(TData) ); |
| 2 | 239 | | } |
| | 240 | |
|
| 2 | 241 | | lock ( _parent._closedChannelLockObj ) { |
| | 242 | | // Mark as complete for writing. |
| 2 | 243 | | _isComplete = true; |
| 2 | 244 | | if ( !_queue.IsEmpty ) { |
| 2 | 245 | | return true; |
| | 246 | | } |
| 2 | 247 | | _emptyAndComplete = true; |
| 2 | 248 | | Interlocked.Increment( ref _parent._closedChannels ); |
| 2 | 249 | | _isClosed = true; |
| 2 | 250 | | } |
| | 251 | | // if all channels are closed, or if this complete was reported with an exception, close everything so long |
| 2 | 252 | | if ( ( _parent._closedChannels >= _parent._totalChannels || error is { } ) ) { |
| | 253 | | // If we have no more items remaining, then the channel needs to be marked as completed |
| | 254 | | // and readers need to be informed they'll never get another item. All of that needs |
| | 255 | | // to happen outside of the lock to avoid invoking continuations under the lock. |
| 2 | 256 | | lock ( _parent._waiterLockObj ) { |
| 2 | 257 | | if ( _parent._waitingReader != null ) { |
| 2 | 258 | | waitingReader = _parent._waitingReader; |
| 2 | 259 | | _parent._waitingReader = null; |
| 2 | 260 | | _parent._isReaderWaiting = false; |
| 2 | 261 | | } |
| 2 | 262 | | } |
| 2 | 263 | | ChannelUtilities.Complete( _parent._completion, error ); |
| | 264 | | // Complete a waiting reader if there is one (this is only encountered when _queue.IsEmpty is true |
| 2 | 265 | | if ( waitingReader != null ) { |
| 2 | 266 | | if ( error != null ) { |
| 0 | 267 | | waitingReader.TrySetException( error ); |
| 2 | 268 | | } else { |
| 2 | 269 | | waitingReader.TrySetResult( item: false ); |
| 2 | 270 | | } |
| 2 | 271 | | } |
| 2 | 272 | | } |
| | 273 | |
|
| | 274 | | // Successfully completed the channel |
| 2 | 275 | | return true; |
| 2 | 276 | | } |
| | 277 | |
|
| | 278 | | /// <inheritdoc cref="ChannelReader{T}.TryRead" /> |
| | 279 | | [ SuppressMessage( "ReSharper", "RedundantNullableFlowAttribute" ) ] |
| 2 | 280 | | public bool TryRead( [ MaybeNullWhen( false ) ] out TData? item ) { |
| 2 | 281 | | if ( _queue.TryDequeue( out item ) ) { |
| 2 | 282 | | Interlocked.Decrement( ref _parent._readableItems ); |
| 2 | 283 | | if ( _isComplete ) { |
| 2 | 284 | | lock ( _parent._closedChannelLockObj ) { |
| 2 | 285 | | if ( !_queue.IsEmpty || _emptyAndComplete ) { |
| 2 | 286 | | return true; |
| | 287 | | } |
| 2 | 288 | | _emptyAndComplete = true; |
| 2 | 289 | | Interlocked.Increment( ref _parent._closedChannels ); |
| 2 | 290 | | _isClosed = true; |
| 2 | 291 | | } |
| 2 | 292 | | if ( _parent._areAllChannelsComplete || _parent._hasException ) { |
| 2 | 293 | | ChannelUtilities.Complete( _parent._completion, _parent._completeException ); |
| 2 | 294 | | } |
| 2 | 295 | | } |
| 2 | 296 | | return true; |
| | 297 | | } |
| 2 | 298 | | return false; |
| 2 | 299 | | } |
| | 300 | |
|
| | 301 | | /// <inheritdoc /> |
| | 302 | | public override ValueTask WriteAsync( TData item, CancellationToken cancellationToken = default ) => |
| | 303 | | // Writing always succeeds (unless we've already completed writing or cancellation has been requested), |
| | 304 | | // so just TryWrite and return a completed task. |
| 2 | 305 | | cancellationToken.IsCancellationRequested ? new ValueTask( Task.FromCanceled( cancellationToken ) ) : |
| 2 | 306 | | TryWrite( item ) ? default : |
| 2 | 307 | | new ValueTask( Task.FromException( ChannelUtilities.CreateInvali |
| | 308 | |
|
| | 309 | |
|
| | 310 | | /* |
| | 311 | | * IEnumerable implementation |
| | 312 | | */ |
| | 313 | |
|
| | 314 | | /// <inheritdoc cref="P:BroadcastChannelMux.SingleProducerSingleConsumerQueue`1.IsEmpty" /> |
| 0 | 315 | | public bool IsEmpty => this._queue.IsEmpty; |
| | 316 | |
|
| | 317 | | /// <summary> |
| | 318 | | /// Whether the Channel is has had <see cref="ChannelMuxInput{TData}.TryComplete"/> called. |
| | 319 | | /// </summary> |
| 2 | 320 | | public bool IsComplete => this._isComplete; |
| | 321 | |
|
| | 322 | | /// <summary> |
| | 323 | | /// Whether when the input has incremented its parent's <see cref="ChannelMux._closedChannels"/>. |
| | 324 | | /// </summary> |
| | 325 | | public bool IsClosed { |
| 2 | 326 | | get { |
| 2 | 327 | | lock ( _parent._closedChannelLockObj ) { |
| 2 | 328 | | return this._isClosed; |
| | 329 | | } |
| 2 | 330 | | } |
| | 331 | | } |
| | 332 | |
|
| | 333 | | /* |
| | 334 | | * IEnumerable implementation |
| | 335 | | */ |
| | 336 | |
|
| | 337 | | /// <inheritdoc /> |
| 2 | 338 | | public IEnumerator<TData> GetEnumerator( ) => this._queue.GetEnumerator(); |
| | 339 | |
|
| | 340 | | /// <inheritdoc /> |
| 0 | 341 | | IEnumerator IEnumerable.GetEnumerator( ) { |
| 0 | 342 | | return GetEnumerator(); |
| 0 | 343 | | } |
| | 344 | |
|
| | 345 | | /* |
| | 346 | | * IDisposable implementation |
| | 347 | | */ |
| | 348 | |
|
| 2 | 349 | | private bool _isDisposed = false; |
| | 350 | |
|
| | 351 | | /// <inheritdoc /> |
| 2 | 352 | | public void Dispose( ) { |
| 2 | 353 | | if ( !_isDisposed ) { |
| 2 | 354 | | _removeWriterCallback.Invoke( this.GetHashCode() ); |
| 2 | 355 | | _isDisposed = true; |
| 2 | 356 | | } |
| 2 | 357 | | } |
| | 358 | | } |
| | 359 | | } |