| | 1 | | using System; |
| | 2 | | using System.Collections.Generic; |
| | 3 | | using System.Diagnostics.CodeAnalysis; |
| | 4 | | using System.Threading; |
| | 5 | | using System.Threading.Channels; |
| | 6 | | using System.Threading.Tasks; |
| | 7 | |
|
| | 8 | | namespace mkmrk.Channels; |
| | 9 | |
|
| | 10 | | /// <inheritdoc cref="ChannelMux"/> |
| | 11 | | /// <remarks> |
| | 12 | | /// Note that more generic parameters can easily be added by inheriting from this class and adding further type parameters. |
| | 13 | | /// </remarks> |
| | 14 | | public class ChannelMux<T1, T2> : ChannelMux, IDisposable { |
| | 15 | | private ChannelMuxInput<T1> _input1; |
| | 16 | | private ChannelMuxInput<T2> _input2; |
| | 17 | |
|
| | 18 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 19 | | public ChannelMux( IBroadcastChannelAddReaderProvider<T1> channel1, IBroadcastChannelAddReaderProvider<T2> channel2 |
| | 20 | |
|
| | 21 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 22 | | /// <remarks> |
| | 23 | | /// For construction by subclasses |
| | 24 | | /// </remarks> |
| | 25 | | protected ChannelMux( IBroadcastChannelAddReaderProvider<T1> channel1, IBroadcastChannelAddReaderProvider<T2> channe |
| | 26 | | ArgumentNullException.ThrowIfNull( channel1 ); |
| | 27 | | ArgumentNullException.ThrowIfNull( channel2 ); |
| | 28 | | _input1 = new ChannelMuxInput<T1>( channel1, this ); |
| | 29 | | _input2 = new ChannelMuxInput<T2>( channel2, this ); |
| | 30 | | } |
| | 31 | |
|
| | 32 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| | 33 | | [ SuppressMessage( "ReSharper", "RedundantNullableFlowAttribute" ) ] |
| | 34 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T1 item ) => _input1.TryRead( out item ); |
| | 35 | |
|
| | 36 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| | 37 | | [ SuppressMessage( "ReSharper", "RedundantNullableFlowAttribute" ) ] |
| | 38 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T2 item ) => _input2.TryRead( out item ); |
| | 39 | |
|
| | 40 | | /// <summary> |
| | 41 | | /// Replace the <see cref="Channel"/> of the same data type with <paramref name="newChannel"/>. |
| | 42 | | /// |
| | 43 | | /// <list type="bullet"> |
| | 44 | | /// <item> |
| | 45 | | /// <description><see cref="ChannelMux._completeException"/> will be set to <c>null</c> if set by the channe |
| | 46 | | /// </item> |
| | 47 | | /// <item> |
| | 48 | | /// <description><see cref="ChannelMux._hasException"/> will be set to <c>false</c> if set by the channel be |
| | 49 | | /// </item> |
| | 50 | | /// <item> |
| | 51 | | /// <description><see cref="ChannelMux._closedChannels"/> will be decremented by 1 if the channel was closed |
| | 52 | | /// </item> |
| | 53 | | /// <item> |
| | 54 | | /// <description>The <see cref="ChannelMux.Completion"/> task will be created new if <see cref="Task.IsCompl |
| | 55 | | /// </item> |
| | 56 | | /// <item> |
| | 57 | | /// <description>The prior <see cref="Channel"/> will have the reader associated with this <see cref="Channe |
| | 58 | | /// </item> |
| | 59 | | /// </list> |
| | 60 | | /// </summary> |
| | 61 | | /// <param name="newChannel"> |
| | 62 | | /// Channel that will replace the channel of the matching type. |
| | 63 | | /// </param> |
| | 64 | | /// <param name="force"> |
| | 65 | | /// If set to <c>true</c>, the channel will be replaced regardless of whether <see cref="ChannelWriter{T}.Comple |
| | 66 | | /// </param> |
| | 67 | | /// <returns> |
| | 68 | | /// Any data remaining in the channel being replaced. |
| | 69 | | /// </returns> |
| | 70 | | /// <exception cref="ChannelNotClosedException"> |
| | 71 | | /// If the <see cref="Channel"/> being replaced is not complete. |
| | 72 | | /// This can be overridden by setting <paramref name="force"/> to <c>true</c>. |
| | 73 | | /// </exception> |
| | 74 | | public IEnumerable<T1> ReplaceChannel( IBroadcastChannelAddReaderProvider<T1> newChannel, bool force = false ) { |
| | 75 | | if ( force || this._input1.IsComplete ) { |
| | 76 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| | 77 | | this.resetOneChannel( this._input1 ); |
| | 78 | | var oldMuxInput = Interlocked.Exchange( ref _input1, new ChannelMuxInput<T1>( newChannel, this ) ); |
| | 79 | | oldMuxInput.Dispose(); |
| | 80 | | return oldMuxInput; |
| | 81 | | } |
| | 82 | | return ChannelNotClosedException.Throw<IEnumerable<T1>>(); |
| | 83 | | } |
| | 84 | |
|
| | 85 | | /// <inheritdoc cref="M:BroadcastChannelMux.ChannelMux`2.ReplaceChannel(BroadcastChannel.BroadcastChannelWriter{`0,B |
| | 86 | | public IEnumerable<T2> ReplaceChannel( IBroadcastChannelAddReaderProvider<T2> newChannel, bool force = false ) { |
| | 87 | | if ( force || this._input2.IsComplete ) { |
| | 88 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| | 89 | | this.resetOneChannel( this._input2 ); |
| | 90 | | var oldMuxInput = Interlocked.Exchange( ref _input2, new ChannelMuxInput<T2>( newChannel, this ) ); |
| | 91 | | oldMuxInput.Dispose(); |
| | 92 | | return oldMuxInput; |
| | 93 | | } |
| | 94 | | return ChannelNotClosedException.Throw<IEnumerable<T2>>(); |
| | 95 | | } |
| | 96 | |
|
| | 97 | | /* |
| | 98 | | * IDisposable implementation |
| | 99 | | */ |
| | 100 | |
|
| | 101 | | private bool _isDisposed = false; |
| | 102 | |
|
| | 103 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| | 104 | | public void Dispose( ) { |
| | 105 | | Dispose( true ); |
| | 106 | | GC.SuppressFinalize( this ); |
| | 107 | | } |
| | 108 | |
|
| | 109 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| | 110 | | [ SuppressMessage( "ReSharper", "InconsistentNaming" ) ] |
| | 111 | | protected virtual void Dispose( bool disposing ) { |
| | 112 | | if ( !_isDisposed ) { |
| | 113 | | if ( disposing ) { |
| | 114 | | _input1.Dispose(); |
| | 115 | | _input2.Dispose(); |
| | 116 | | } |
| | 117 | | // TODO: do I need to null out _inputX here? |
| | 118 | | // _input1 = null |
| | 119 | | _isDisposed = true; |
| | 120 | | } |
| | 121 | | } |
| | 122 | | } |
| | 123 | |
|
| | 124 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 125 | | public class ChannelMux<T1, T2, T3> : ChannelMux<T1, T2> { |
| | 126 | | private ChannelMuxInput<T3> _input; |
| | 127 | |
|
| | 128 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 129 | | public ChannelMux( |
| | 130 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 131 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 132 | | IBroadcastChannelAddReaderProvider<T3> channel3 |
| 2 | 133 | | ) : this( channel1, channel2, channel3, totalChannels: 3 ) { } |
| | 134 | |
|
| | 135 | | /// <inheritdoc cref="M:mkmrk.Channels.ChannelMux`2.#ctor(mkmrk.Channels.BroadcastChannelWriter{`0,mkmrk.Channels.IB |
| | 136 | | protected ChannelMux( |
| | 137 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 138 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 139 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 140 | | int totalChannels |
| 2 | 141 | | ) : base( channel1, channel2, totalChannels: totalChannels ) { |
| 2 | 142 | | ArgumentNullException.ThrowIfNull( channel3 ); |
| 2 | 143 | | _input = new ChannelMuxInput<T3>( channel3, this ); |
| 2 | 144 | | } |
| | 145 | |
|
| | 146 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| 2 | 147 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T3 item ) => _input.TryRead( out item ); |
| | 148 | |
|
| | 149 | | /// <inheritdoc cref="M:BroadcastChannelMux.ChannelMux`3.ReplaceChannel(BroadcastChannel.BroadcastChannelWriter{`0,B |
| 0 | 150 | | public IEnumerable<T3> ReplaceChannel( IBroadcastChannelAddReaderProvider<T3> newChannel, bool force = false ) { |
| 0 | 151 | | if ( force || this._input.IsComplete ) { |
| 0 | 152 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| 0 | 153 | | this.resetOneChannel( this._input ); |
| 0 | 154 | | var oldMuxInput = Interlocked.Exchange( ref _input, new ChannelMuxInput<T3>( newChannel, this ) ); |
| 0 | 155 | | oldMuxInput.Dispose(); |
| 0 | 156 | | return oldMuxInput; |
| | 157 | | } |
| 0 | 158 | | return ChannelNotClosedException.Throw<IEnumerable<T3>>(); |
| 0 | 159 | | } |
| | 160 | |
|
| | 161 | | /* |
| | 162 | | * Disposal |
| | 163 | | */ |
| | 164 | |
|
| 2 | 165 | | private bool _isDisposed = false; |
| | 166 | |
|
| | 167 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| 2 | 168 | | protected override void Dispose( bool disposing ) { |
| 2 | 169 | | if ( !_isDisposed ) { |
| 2 | 170 | | if ( disposing ) { |
| 2 | 171 | | _input.Dispose(); |
| 2 | 172 | | } |
| 2 | 173 | | _isDisposed = true; |
| 2 | 174 | | } |
| 2 | 175 | | base.Dispose( disposing ); |
| 2 | 176 | | } |
| | 177 | | } |
| | 178 | |
|
| | 179 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 180 | | public class ChannelMux<T1, T2, T3, T4> : ChannelMux<T1, T2, T3> { |
| | 181 | | private ChannelMuxInput<T4> _input; |
| | 182 | |
|
| | 183 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 184 | | public ChannelMux( |
| | 185 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 186 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 187 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 188 | | IBroadcastChannelAddReaderProvider<T4> channel4 |
| | 189 | | ) : this( channel1, channel2, channel3, channel4, totalChannels: 4 ) { } |
| | 190 | |
|
| | 191 | | /// <inheritdoc cref="M:mkmrk.Channels.ChannelMux`2.#ctor(mkmrk.Channels.BroadcastChannelWriter{`0,mkmrk.Channels.IB |
| | 192 | | protected ChannelMux( |
| | 193 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 194 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 195 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 196 | | IBroadcastChannelAddReaderProvider<T4> channel4, |
| | 197 | | int totalChannels |
| | 198 | | ) : base( channel1, channel2, channel3, totalChannels: totalChannels ) { |
| | 199 | | ArgumentNullException.ThrowIfNull( channel4 ); |
| | 200 | | _input = new ChannelMuxInput<T4>( channel4, this ); |
| | 201 | | } |
| | 202 | |
|
| | 203 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| | 204 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T4 item ) => _input.TryRead( out item ); |
| | 205 | |
|
| | 206 | | /// <inheritdoc cref="M:BroadcastChannelMux.ChannelMux`4.ReplaceChannel(BroadcastChannel.BroadcastChannelWriter{`0,B |
| | 207 | | public IEnumerable<T4> ReplaceChannel( IBroadcastChannelAddReaderProvider<T4> newChannel, bool force = false ) { |
| | 208 | | if ( force || this._input.IsComplete ) { |
| | 209 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| | 210 | | this.resetOneChannel( this._input ); |
| | 211 | | var oldMuxInput = Interlocked.Exchange( ref _input, new ChannelMuxInput<T4>( newChannel, this ) ); |
| | 212 | | oldMuxInput.Dispose(); |
| | 213 | | return oldMuxInput; |
| | 214 | | } |
| | 215 | | return ChannelNotClosedException.Throw<IEnumerable<T4>>(); |
| | 216 | | } |
| | 217 | |
|
| | 218 | | /* |
| | 219 | | * Disposal |
| | 220 | | */ |
| | 221 | |
|
| | 222 | | private bool _isDisposed = false; |
| | 223 | |
|
| | 224 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| | 225 | | protected override void Dispose( bool disposing ) { |
| | 226 | | if ( !_isDisposed ) { |
| | 227 | | if ( disposing ) { |
| | 228 | | _input.Dispose(); |
| | 229 | | } |
| | 230 | | _isDisposed = true; |
| | 231 | | } |
| | 232 | | base.Dispose( disposing ); |
| | 233 | | } |
| | 234 | | } |
| | 235 | |
|
| | 236 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 237 | | public class ChannelMux<T1, T2, T3, T4, T5> : ChannelMux<T1, T2, T3, T4> { |
| | 238 | | private ChannelMuxInput<T5> _input; |
| | 239 | |
|
| | 240 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 241 | | public ChannelMux( |
| | 242 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 243 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 244 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 245 | | IBroadcastChannelAddReaderProvider<T4> channel4, |
| | 246 | | IBroadcastChannelAddReaderProvider<T5> channel5 |
| | 247 | | ) : this( channel1, channel2, channel3, channel4, channel5, totalChannels: 5 ) { } |
| | 248 | |
|
| | 249 | | /// <inheritdoc cref="M:mkmrk.Channels.ChannelMux`2.#ctor(mkmrk.Channels.BroadcastChannelWriter{`0,mkmrk.Channels.IB |
| | 250 | | protected ChannelMux( |
| | 251 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 252 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 253 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 254 | | IBroadcastChannelAddReaderProvider<T4> channel4, |
| | 255 | | IBroadcastChannelAddReaderProvider<T5> channel5, |
| | 256 | | int totalChannels |
| | 257 | | ) : base( channel1, channel2, channel3, channel4, totalChannels: totalChannels ) { |
| | 258 | | ArgumentNullException.ThrowIfNull( channel5 ); |
| | 259 | | _input = new ChannelMuxInput<T5>( channel5, this ); |
| | 260 | | } |
| | 261 | |
|
| | 262 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| | 263 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T5 item ) => _input.TryRead( out item ); |
| | 264 | |
|
| | 265 | | /// <inheritdoc cref="M:BroadcastChannelMux.ChannelMux`5.ReplaceChannel(BroadcastChannel.BroadcastChannelWriter{`0,B |
| | 266 | | public IEnumerable<T5> ReplaceChannel( IBroadcastChannelAddReaderProvider<T5> newChannel, bool force = false ) { |
| | 267 | | if ( force || this._input.IsComplete ) { |
| | 268 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| | 269 | | this.resetOneChannel( this._input ); |
| | 270 | | var oldMuxInput = Interlocked.Exchange( ref _input, new ChannelMuxInput<T5>( newChannel, this ) ); |
| | 271 | | oldMuxInput.Dispose(); |
| | 272 | | return oldMuxInput; |
| | 273 | | } |
| | 274 | | return ChannelNotClosedException.Throw<IEnumerable<T5>>(); |
| | 275 | | } |
| | 276 | |
|
| | 277 | | /* |
| | 278 | | * Disposal |
| | 279 | | */ |
| | 280 | |
|
| | 281 | | private bool _isDisposed = false; |
| | 282 | |
|
| | 283 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| | 284 | | protected override void Dispose( bool disposing ) { |
| | 285 | | if ( !_isDisposed ) { |
| | 286 | | if ( disposing ) { |
| | 287 | | _input.Dispose(); |
| | 288 | | } |
| | 289 | | _isDisposed = true; |
| | 290 | | } |
| | 291 | | base.Dispose( disposing ); |
| | 292 | | } |
| | 293 | | } |
| | 294 | |
|
| | 295 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 296 | | public class ChannelMux<T1, T2, T3, T4, T5, T6> : ChannelMux<T1, T2, T3, T4, T5> { |
| | 297 | | private ChannelMuxInput<T6> _input; |
| | 298 | |
|
| | 299 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 300 | | public ChannelMux( |
| | 301 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 302 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 303 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 304 | | IBroadcastChannelAddReaderProvider<T4> channel4, |
| | 305 | | IBroadcastChannelAddReaderProvider<T5> channel5, |
| | 306 | | IBroadcastChannelAddReaderProvider<T6> channel6 |
| | 307 | | ) : this( channel1, channel2, channel3, channel4, channel5, channel6, totalChannels: 6 ) { } |
| | 308 | |
|
| | 309 | | /// <inheritdoc cref="M:mkmrk.Channels.ChannelMux`2.#ctor(mkmrk.Channels.BroadcastChannelWriter{`0,mkmrk.Channels.IB |
| | 310 | | protected ChannelMux( |
| | 311 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 312 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 313 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 314 | | IBroadcastChannelAddReaderProvider<T4> channel4, |
| | 315 | | IBroadcastChannelAddReaderProvider<T5> channel5, |
| | 316 | | IBroadcastChannelAddReaderProvider<T6> channel6, |
| | 317 | | int totalChannels |
| | 318 | | ) : base( channel1, channel2, channel3, channel4, channel5, totalChannels: totalChannels ) { |
| | 319 | | ArgumentNullException.ThrowIfNull( channel6 ); |
| | 320 | | _input = new ChannelMuxInput<T6>( channel6, this ); |
| | 321 | | } |
| | 322 | |
|
| | 323 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| | 324 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T6 item ) => _input.TryRead( out item ); |
| | 325 | |
|
| | 326 | | /// <inheritdoc cref="M:BroadcastChannelMux.ChannelMux`6.ReplaceChannel(BroadcastChannel.BroadcastChannelWriter{`0,B |
| | 327 | | public IEnumerable<T6> ReplaceChannel( IBroadcastChannelAddReaderProvider<T6> newChannel, bool force = false ) { |
| | 328 | | if ( force || this._input.IsComplete ) { |
| | 329 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| | 330 | | this.resetOneChannel( this._input ); |
| | 331 | | var oldMuxInput = Interlocked.Exchange( ref _input, new ChannelMuxInput<T6>( newChannel, this ) ); |
| | 332 | | oldMuxInput.Dispose(); |
| | 333 | | return oldMuxInput; |
| | 334 | | } |
| | 335 | | return ChannelNotClosedException.Throw<IEnumerable<T6>>(); |
| | 336 | | } |
| | 337 | |
|
| | 338 | | /* |
| | 339 | | * Disposal |
| | 340 | | */ |
| | 341 | |
|
| | 342 | | private bool _isDisposed = false; |
| | 343 | |
|
| | 344 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| | 345 | | protected override void Dispose( bool disposing ) { |
| | 346 | | if ( !_isDisposed ) { |
| | 347 | | if ( disposing ) { |
| | 348 | | _input.Dispose(); |
| | 349 | | } |
| | 350 | | _isDisposed = true; |
| | 351 | | } |
| | 352 | | base.Dispose( disposing ); |
| | 353 | | } |
| | 354 | | } |
| | 355 | |
|
| | 356 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 357 | | public class ChannelMux<T1, T2, T3, T4, T5, T6, T7> : ChannelMux<T1, T2, T3, T4, T5, T6> { |
| | 358 | | private ChannelMuxInput<T7> _input; |
| | 359 | |
|
| | 360 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 361 | | public ChannelMux( |
| | 362 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 363 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 364 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 365 | | IBroadcastChannelAddReaderProvider<T4> channel4, |
| | 366 | | IBroadcastChannelAddReaderProvider<T5> channel5, |
| | 367 | | IBroadcastChannelAddReaderProvider<T6> channel6, |
| | 368 | | IBroadcastChannelAddReaderProvider<T7> channel7 |
| | 369 | | ) : this( channel1, channel2, channel3, channel4, channel5, channel6, channel7, totalChannels: 7 ) { } |
| | 370 | |
|
| | 371 | | /// <inheritdoc cref="M:mkmrk.Channels.ChannelMux`2.#ctor(mkmrk.Channels.BroadcastChannelWriter{`0,mkmrk.Channels.IB |
| | 372 | | protected ChannelMux( |
| | 373 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 374 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 375 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 376 | | IBroadcastChannelAddReaderProvider<T4> channel4, |
| | 377 | | IBroadcastChannelAddReaderProvider<T5> channel5, |
| | 378 | | IBroadcastChannelAddReaderProvider<T6> channel6, |
| | 379 | | IBroadcastChannelAddReaderProvider<T7> channel7, |
| | 380 | | int totalChannels |
| | 381 | | ) : base( channel1, channel2, channel3, channel4, channel5, channel6, totalChannels: totalChannels ) { |
| | 382 | | ArgumentNullException.ThrowIfNull( channel7 ); |
| | 383 | | _input = new ChannelMuxInput<T7>( channel7, this ); |
| | 384 | | } |
| | 385 | |
|
| | 386 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| | 387 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T7 item ) => _input.TryRead( out item ); |
| | 388 | |
|
| | 389 | | /// <inheritdoc cref="M:BroadcastChannelMux.ChannelMux`7.ReplaceChannel(BroadcastChannel.BroadcastChannelWriter{`0,B |
| | 390 | | public IEnumerable<T7> ReplaceChannel( IBroadcastChannelAddReaderProvider<T7> newChannel, bool force = false ) { |
| | 391 | | if ( force || this._input.IsComplete ) { |
| | 392 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| | 393 | | this.resetOneChannel( this._input ); |
| | 394 | | var oldMuxInput = Interlocked.Exchange( ref _input, new ChannelMuxInput<T7>( newChannel, this ) ); |
| | 395 | | oldMuxInput.Dispose(); |
| | 396 | | return oldMuxInput; |
| | 397 | | } |
| | 398 | | return ChannelNotClosedException.Throw<IEnumerable<T7>>(); |
| | 399 | | } |
| | 400 | |
|
| | 401 | | /* |
| | 402 | | * Disposal |
| | 403 | | */ |
| | 404 | |
|
| | 405 | | private bool _isDisposed = false; |
| | 406 | |
|
| | 407 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| | 408 | | protected override void Dispose( bool disposing ) { |
| | 409 | | if ( !_isDisposed ) { |
| | 410 | | if ( disposing ) { |
| | 411 | | _input.Dispose(); |
| | 412 | | } |
| | 413 | | _isDisposed = true; |
| | 414 | | } |
| | 415 | | base.Dispose( disposing ); |
| | 416 | | } |
| | 417 | | } |
| | 418 | |
|
| | 419 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 420 | | public class ChannelMux<T1, T2, T3, T4, T5, T6, T7, T8> : ChannelMux<T1, T2, T3, T4, T5, T6, T7> { |
| | 421 | | private ChannelMuxInput<T8> _input; |
| | 422 | |
|
| | 423 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | 424 | | public ChannelMux( |
| | 425 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 426 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 427 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 428 | | IBroadcastChannelAddReaderProvider<T4> channel4, |
| | 429 | | IBroadcastChannelAddReaderProvider<T5> channel5, |
| | 430 | | IBroadcastChannelAddReaderProvider<T6> channel6, |
| | 431 | | IBroadcastChannelAddReaderProvider<T7> channel7, |
| | 432 | | IBroadcastChannelAddReaderProvider<T8> channel8 |
| | 433 | | ) : this( channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8, totalChannels: 8 ) { } |
| | 434 | |
|
| | 435 | | /// <inheritdoc cref="M:mkmrk.Channels.ChannelMux`2.#ctor(mkmrk.Channels.BroadcastChannelWriter{`0,mkmrk.Channels.IB |
| | 436 | | // ReSharper disable once MemberCanBePrivate.Global |
| | 437 | | protected ChannelMux( |
| | 438 | | IBroadcastChannelAddReaderProvider<T1> channel1, |
| | 439 | | IBroadcastChannelAddReaderProvider<T2> channel2, |
| | 440 | | IBroadcastChannelAddReaderProvider<T3> channel3, |
| | 441 | | IBroadcastChannelAddReaderProvider<T4> channel4, |
| | 442 | | IBroadcastChannelAddReaderProvider<T5> channel5, |
| | 443 | | IBroadcastChannelAddReaderProvider<T6> channel6, |
| | 444 | | IBroadcastChannelAddReaderProvider<T7> channel7, |
| | 445 | | IBroadcastChannelAddReaderProvider<T8> channel8, |
| | 446 | | int totalChannels |
| | 447 | | ) : base( channel1, channel2, channel3, channel4, channel5, channel6, channel7, totalChannels: totalChannels ) { |
| | 448 | | ArgumentNullException.ThrowIfNull( channel8 ); |
| | 449 | | _input = new ChannelMuxInput<T8>( channel8, this ); |
| | 450 | | } |
| | 451 | |
|
| | 452 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| | 453 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T8 item ) => _input.TryRead( out item ); |
| | 454 | |
|
| | 455 | | /// <inheritdoc cref="M:BroadcastChannelMux.ChannelMux`8.ReplaceChannel(BroadcastChannel.BroadcastChannelWriter{`0,B |
| | 456 | | public IEnumerable<T8> ReplaceChannel( IBroadcastChannelAddReaderProvider<T8> newChannel, bool force = false ) { |
| | 457 | | if ( force || this._input.IsComplete ) { |
| | 458 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| | 459 | | this.resetOneChannel( this._input ); |
| | 460 | | var oldMuxInput = Interlocked.Exchange( ref _input, new ChannelMuxInput<T8>( newChannel, this ) ); |
| | 461 | | oldMuxInput.Dispose(); |
| | 462 | | return oldMuxInput; |
| | 463 | | } |
| | 464 | | return ChannelNotClosedException.Throw<IEnumerable<T8>>(); |
| | 465 | | } |
| | 466 | |
|
| | 467 | | /* |
| | 468 | | * Disposal |
| | 469 | | */ |
| | 470 | |
|
| | 471 | | private bool _isDisposed = false; |
| | 472 | |
|
| | 473 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| | 474 | | protected override void Dispose( bool disposing ) { |
| | 475 | | if ( !_isDisposed ) { |
| | 476 | | if ( disposing ) { |
| | 477 | | _input.Dispose(); |
| | 478 | | } |
| | 479 | | _isDisposed = true; |
| | 480 | | } |
| | 481 | | base.Dispose( disposing ); |
| | 482 | | } |
| | 483 | | } |