| | | 1 | | using System; |
| | | 2 | | using System.Collections.Generic; |
| | | 3 | | using System.Diagnostics.CodeAnalysis; |
| | | 4 | | using System.Threading; |
| | | 5 | | using System.Threading.Channels; |
| | | 6 | | using System.Threading.Tasks; |
| | | 7 | | |
| | | 8 | | namespace mkmrk.Channels; |
| | | 9 | | |
| | | 10 | | /// <inheritdoc cref="ChannelMux"/> |
| | | 11 | | /// <remarks> |
/// Note that more generic parameters can easily be added by inheriting from this class and adding further type parameters.
| | | 13 | | /// </remarks> |
| | | 14 | | public class ChannelMux<T1, T2> : ChannelMux, IDisposable { |
| | | 15 | | private ChannelMuxInput<T1> _input1; |
| | | 16 | | private ChannelMuxInput<T2> _input2; |
| | | 17 | | |
| | | 18 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | | 19 | | public ChannelMux( IBroadcastChannelAddReaderProvider<T1> channel1, IBroadcastChannelAddReaderProvider<T2> channel2 |
| | | 20 | | |
| | | 21 | | /// <inheritdoc cref="ChannelMux{T1,T2}"/> |
| | | 22 | | /// <remarks> |
| | | 23 | | /// For construction by subclasses |
| | | 24 | | /// </remarks> |
| | | 25 | | protected ChannelMux( IBroadcastChannelAddReaderProvider<T1> channel1, IBroadcastChannelAddReaderProvider<T2> channe |
| | | 26 | | ArgumentNullException.ThrowIfNull( channel1 ); |
| | | 27 | | ArgumentNullException.ThrowIfNull( channel2 ); |
| | | 28 | | _input1 = new ChannelMuxInput<T1>( channel1, this ); |
| | | 29 | | _input2 = new ChannelMuxInput<T2>( channel2, this ); |
| | | 30 | | } |
| | | 31 | | |
| | | 32 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| | | 33 | | [ SuppressMessage( "ReSharper", "RedundantNullableFlowAttribute" ) ] |
| | | 34 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T1 item ) => _input1.TryRead( out item ); |
| | | 35 | | |
| | | 36 | | /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/> |
| | | 37 | | [ SuppressMessage( "ReSharper", "RedundantNullableFlowAttribute" ) ] |
| | | 38 | | public bool TryRead( [ MaybeNullWhen( false ) ] out T2 item ) => _input2.TryRead( out item ); |
| | | 39 | | |
| | | 40 | | /// <summary> |
| | | 41 | | /// Replace the <see cref="Channel"/> of the same data type with <paramref name="newChannel"/>. |
| | | 42 | | /// |
| | | 43 | | /// <list type="bullet"> |
| | | 44 | | /// <item> |
| | | 45 | | /// <description><see cref="ChannelMux._completeException"/> will be set to <c>null</c> if set by the channe |
| | | 46 | | /// </item> |
| | | 47 | | /// <item> |
| | | 48 | | /// <description><see cref="ChannelMux._hasException"/> will be set to <c>false</c> if set by the channel be |
| | | 49 | | /// </item> |
| | | 50 | | /// <item> |
| | | 51 | | /// <description><see cref="ChannelMux._closedChannels"/> will be decremented by 1 if the channel was closed |
| | | 52 | | /// </item> |
| | | 53 | | /// <item> |
| | | 54 | | /// <description>The <see cref="ChannelMux.Completion"/> task will be created new if <see cref="Task.IsCompl |
| | | 55 | | /// </item> |
| | | 56 | | /// <item> |
| | | 57 | | /// <description>The prior <see cref="Channel"/> will have the reader associated with this <see cref="Channe |
| | | 58 | | /// </item> |
| | | 59 | | /// </list> |
| | | 60 | | /// </summary> |
| | | 61 | | /// <param name="newChannel"> |
| | | 62 | | /// Channel that will replace the channel of the matching type. |
| | | 63 | | /// </param> |
| | | 64 | | /// <param name="force"> |
| | | 65 | | /// If set to <c>true</c>, the channel will be replaced regardless of whether <see cref="ChannelWriter{T}.Comple |
| | | 66 | | /// </param> |
| | | 67 | | /// <returns> |
| | | 68 | | /// Any data remaining in the channel being replaced. |
| | | 69 | | /// </returns> |
| | | 70 | | /// <exception cref="ChannelNotClosedException"> |
| | | 71 | | /// If the <see cref="Channel"/> being replaced is not complete. |
    /// This can be overridden by setting <paramref name="force"/> to <c>true</c>.
| | | 73 | | /// </exception> |
| | | 74 | | public IEnumerable<T1> ReplaceChannel( IBroadcastChannelAddReaderProvider<T1> newChannel, bool force = false ) { |
| | | 75 | | if ( force || this._input1.IsComplete ) { |
| | | 76 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| | | 77 | | this.resetOneChannel( this._input1 ); |
| | | 78 | | var oldMuxInput = Interlocked.Exchange( ref _input1, new ChannelMuxInput<T1>( newChannel, this ) ); |
| | | 79 | | oldMuxInput.Dispose(); |
| | | 80 | | return oldMuxInput; |
| | | 81 | | } |
| | | 82 | | return ChannelNotClosedException.Throw<IEnumerable<T1>>(); |
| | | 83 | | } |
| | | 84 | | |
| | | 85 | | /// <inheritdoc cref="M:BroadcastChannelMux.ChannelMux`2.ReplaceChannel(BroadcastChannel.BroadcastChannelWriter{`0,B |
| | | 86 | | public IEnumerable<T2> ReplaceChannel( IBroadcastChannelAddReaderProvider<T2> newChannel, bool force = false ) { |
| | | 87 | | if ( force || this._input2.IsComplete ) { |
| | | 88 | | ArgumentNullException.ThrowIfNull( newChannel ); |
| | | 89 | | this.resetOneChannel( this._input2 ); |
| | | 90 | | var oldMuxInput = Interlocked.Exchange( ref _input2, new ChannelMuxInput<T2>( newChannel, this ) ); |
| | | 91 | | oldMuxInput.Dispose(); |
| | | 92 | | return oldMuxInput; |
| | | 93 | | } |
| | | 94 | | return ChannelNotClosedException.Throw<IEnumerable<T2>>(); |
| | | 95 | | } |
| | | 96 | | |
| | | 97 | | /* |
| | | 98 | | * IDisposable implementation |
| | | 99 | | */ |
| | | 100 | | |
| | | 101 | | private bool _isDisposed = false; |
| | | 102 | | |
| | | 103 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| | | 104 | | public void Dispose( ) { |
| | | 105 | | Dispose( true ); |
| | | 106 | | GC.SuppressFinalize( this ); |
| | | 107 | | } |
| | | 108 | | |
| | | 109 | | /// <inheritdoc cref="IDisposable.Dispose" /> |
| | | 110 | | [ SuppressMessage( "ReSharper", "InconsistentNaming" ) ] |
| | | 111 | | protected virtual void Dispose( bool disposing ) { |
| | | 112 | | if ( !_isDisposed ) { |
| | | 113 | | if ( disposing ) { |
| | | 114 | | _input1.Dispose(); |
| | | 115 | | _input2.Dispose(); |
| | | 116 | | } |
| | | 117 | | // TODO: do I need to null out _inputX here? |
| | | 118 | | // _input1 = null |
| | | 119 | | _isDisposed = true; |
| | | 120 | | } |
| | | 121 | | } |
| | | 122 | | } |
| | | 123 | | |
/// <inheritdoc cref="ChannelMux{T1,T2}"/>
public class ChannelMux<T1, T2, T3> : ChannelMux<T1, T2> {
    private ChannelMuxInput<T3> _input;
    private bool                _isDisposed;

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    public ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3
    ) : this( channel1, channel2, channel3, totalChannels: 3 ) { }

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    /// <remarks>
    /// For construction by subclasses; <paramref name="totalChannels"/> is forwarded unchanged to the base constructor.
    /// </remarks>
    /// <exception cref="ArgumentNullException">If <paramref name="channel3"/> is <c>null</c>.</exception>
    protected ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        int                                    totalChannels
    ) : base( channel1, channel2, totalChannels: totalChannels ) {
        ArgumentNullException.ThrowIfNull( channel3 );
        _input = new ChannelMuxInput<T3>( channel3, this );
    }

    /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/>
    public bool TryRead( [ MaybeNullWhen( false ) ] out T3 item ) {
        return _input.TryRead( out item );
    }

    /// <summary>
    /// Replace the <typeparamref name="T3"/> input of this mux with a new input built from <paramref name="newChannel"/>.
    /// </summary>
    /// <param name="newChannel">Channel that will replace the channel of the matching type.</param>
    /// <param name="force">If <c>true</c>, the channel is replaced even if the current input is not complete.</param>
    /// <returns>The replaced (and already disposed) input, enumerable for any data remaining in it.</returns>
    /// <exception cref="ArgumentNullException">If the replacement proceeds and <paramref name="newChannel"/> is <c>null</c>.</exception>
    /// <exception cref="ChannelNotClosedException">
    /// If the channel being replaced is not complete and <paramref name="force"/> is <c>false</c>.
    /// </exception>
    public IEnumerable<T3> ReplaceChannel( IBroadcastChannelAddReaderProvider<T3> newChannel, bool force = false ) {
        // A channel that has not completed is only replaced when the caller forces it.
        if ( !force && !this._input.IsComplete ) {
            return ChannelNotClosedException.Throw<IEnumerable<T3>>();
        }

        ArgumentNullException.ThrowIfNull( newChannel );
        this.resetOneChannel( this._input );

        // Build the replacement first, then swap it in atomically and retire the previous input.
        ChannelMuxInput<T3> replacement = new ChannelMuxInput<T3>( newChannel, this );
        ChannelMuxInput<T3> previous    = Interlocked.Exchange( ref _input, replacement );
        previous.Dispose();
        return previous;
    }

    /*
     * Disposal
     */

    /// <summary>
    /// Dispose this level's input (first call only, and only when <paramref name="disposing"/> is <c>true</c>),
    /// then always hand over to the base class.
    /// </summary>
    protected override void Dispose( bool disposing ) {
        bool firstCall = !_isDisposed;
        if ( firstCall && disposing ) {
            _input.Dispose();
        }
        _isDisposed = true;
        base.Dispose( disposing );
    }
}
| | | 178 | | |
/// <inheritdoc cref="ChannelMux{T1,T2}"/>
public class ChannelMux<T1, T2, T3, T4> : ChannelMux<T1, T2, T3> {
    private ChannelMuxInput<T4> _input;
    private bool                _isDisposed;

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    public ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4
    ) : this( channel1, channel2, channel3, channel4, totalChannels: 4 ) { }

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    /// <remarks>
    /// For construction by subclasses; <paramref name="totalChannels"/> is forwarded unchanged to the base constructor.
    /// </remarks>
    /// <exception cref="ArgumentNullException">If <paramref name="channel4"/> is <c>null</c>.</exception>
    protected ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4,
        int                                    totalChannels
    ) : base( channel1, channel2, channel3, totalChannels: totalChannels ) {
        ArgumentNullException.ThrowIfNull( channel4 );
        _input = new ChannelMuxInput<T4>( channel4, this );
    }

    /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/>
    public bool TryRead( [ MaybeNullWhen( false ) ] out T4 item ) {
        return _input.TryRead( out item );
    }

    /// <summary>
    /// Replace the <typeparamref name="T4"/> input of this mux with a new input built from <paramref name="newChannel"/>.
    /// </summary>
    /// <param name="newChannel">Channel that will replace the channel of the matching type.</param>
    /// <param name="force">If <c>true</c>, the channel is replaced even if the current input is not complete.</param>
    /// <returns>The replaced (and already disposed) input, enumerable for any data remaining in it.</returns>
    /// <exception cref="ArgumentNullException">If the replacement proceeds and <paramref name="newChannel"/> is <c>null</c>.</exception>
    /// <exception cref="ChannelNotClosedException">
    /// If the channel being replaced is not complete and <paramref name="force"/> is <c>false</c>.
    /// </exception>
    public IEnumerable<T4> ReplaceChannel( IBroadcastChannelAddReaderProvider<T4> newChannel, bool force = false ) {
        // A channel that has not completed is only replaced when the caller forces it.
        if ( !force && !this._input.IsComplete ) {
            return ChannelNotClosedException.Throw<IEnumerable<T4>>();
        }

        ArgumentNullException.ThrowIfNull( newChannel );
        this.resetOneChannel( this._input );

        // Build the replacement first, then swap it in atomically and retire the previous input.
        ChannelMuxInput<T4> replacement = new ChannelMuxInput<T4>( newChannel, this );
        ChannelMuxInput<T4> previous    = Interlocked.Exchange( ref _input, replacement );
        previous.Dispose();
        return previous;
    }

    /*
     * Disposal
     */

    /// <summary>
    /// Dispose this level's input (first call only, and only when <paramref name="disposing"/> is <c>true</c>),
    /// then always hand over to the base class.
    /// </summary>
    protected override void Dispose( bool disposing ) {
        bool firstCall = !_isDisposed;
        if ( firstCall && disposing ) {
            _input.Dispose();
        }
        _isDisposed = true;
        base.Dispose( disposing );
    }
}
| | | 235 | | |
/// <inheritdoc cref="ChannelMux{T1,T2}"/>
public class ChannelMux<T1, T2, T3, T4, T5> : ChannelMux<T1, T2, T3, T4> {
    private ChannelMuxInput<T5> _input;
    private bool                _isDisposed;

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    public ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4,
        IBroadcastChannelAddReaderProvider<T5> channel5
    ) : this( channel1, channel2, channel3, channel4, channel5, totalChannels: 5 ) { }

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    /// <remarks>
    /// For construction by subclasses; <paramref name="totalChannels"/> is forwarded unchanged to the base constructor.
    /// </remarks>
    /// <exception cref="ArgumentNullException">If <paramref name="channel5"/> is <c>null</c>.</exception>
    protected ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4,
        IBroadcastChannelAddReaderProvider<T5> channel5,
        int                                    totalChannels
    ) : base( channel1, channel2, channel3, channel4, totalChannels: totalChannels ) {
        ArgumentNullException.ThrowIfNull( channel5 );
        _input = new ChannelMuxInput<T5>( channel5, this );
    }

    /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/>
    public bool TryRead( [ MaybeNullWhen( false ) ] out T5 item ) {
        return _input.TryRead( out item );
    }

    /// <summary>
    /// Replace the <typeparamref name="T5"/> input of this mux with a new input built from <paramref name="newChannel"/>.
    /// </summary>
    /// <param name="newChannel">Channel that will replace the channel of the matching type.</param>
    /// <param name="force">If <c>true</c>, the channel is replaced even if the current input is not complete.</param>
    /// <returns>The replaced (and already disposed) input, enumerable for any data remaining in it.</returns>
    /// <exception cref="ArgumentNullException">If the replacement proceeds and <paramref name="newChannel"/> is <c>null</c>.</exception>
    /// <exception cref="ChannelNotClosedException">
    /// If the channel being replaced is not complete and <paramref name="force"/> is <c>false</c>.
    /// </exception>
    public IEnumerable<T5> ReplaceChannel( IBroadcastChannelAddReaderProvider<T5> newChannel, bool force = false ) {
        // A channel that has not completed is only replaced when the caller forces it.
        if ( !force && !this._input.IsComplete ) {
            return ChannelNotClosedException.Throw<IEnumerable<T5>>();
        }

        ArgumentNullException.ThrowIfNull( newChannel );
        this.resetOneChannel( this._input );

        // Build the replacement first, then swap it in atomically and retire the previous input.
        ChannelMuxInput<T5> replacement = new ChannelMuxInput<T5>( newChannel, this );
        ChannelMuxInput<T5> previous    = Interlocked.Exchange( ref _input, replacement );
        previous.Dispose();
        return previous;
    }

    /*
     * Disposal
     */

    /// <summary>
    /// Dispose this level's input (first call only, and only when <paramref name="disposing"/> is <c>true</c>),
    /// then always hand over to the base class.
    /// </summary>
    protected override void Dispose( bool disposing ) {
        bool firstCall = !_isDisposed;
        if ( firstCall && disposing ) {
            _input.Dispose();
        }
        _isDisposed = true;
        base.Dispose( disposing );
    }
}
| | | 294 | | |
/// <inheritdoc cref="ChannelMux{T1,T2}"/>
public class ChannelMux<T1, T2, T3, T4, T5, T6> : ChannelMux<T1, T2, T3, T4, T5> {
    private ChannelMuxInput<T6> _input;
    private bool                _isDisposed;

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    public ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4,
        IBroadcastChannelAddReaderProvider<T5> channel5,
        IBroadcastChannelAddReaderProvider<T6> channel6
    ) : this( channel1, channel2, channel3, channel4, channel5, channel6, totalChannels: 6 ) { }

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    /// <remarks>
    /// For construction by subclasses; <paramref name="totalChannels"/> is forwarded unchanged to the base constructor.
    /// </remarks>
    /// <exception cref="ArgumentNullException">If <paramref name="channel6"/> is <c>null</c>.</exception>
    protected ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4,
        IBroadcastChannelAddReaderProvider<T5> channel5,
        IBroadcastChannelAddReaderProvider<T6> channel6,
        int                                    totalChannels
    ) : base( channel1, channel2, channel3, channel4, channel5, totalChannels: totalChannels ) {
        ArgumentNullException.ThrowIfNull( channel6 );
        _input = new ChannelMuxInput<T6>( channel6, this );
    }

    /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/>
    public bool TryRead( [ MaybeNullWhen( false ) ] out T6 item ) {
        return _input.TryRead( out item );
    }

    /// <summary>
    /// Replace the <typeparamref name="T6"/> input of this mux with a new input built from <paramref name="newChannel"/>.
    /// </summary>
    /// <param name="newChannel">Channel that will replace the channel of the matching type.</param>
    /// <param name="force">If <c>true</c>, the channel is replaced even if the current input is not complete.</param>
    /// <returns>The replaced (and already disposed) input, enumerable for any data remaining in it.</returns>
    /// <exception cref="ArgumentNullException">If the replacement proceeds and <paramref name="newChannel"/> is <c>null</c>.</exception>
    /// <exception cref="ChannelNotClosedException">
    /// If the channel being replaced is not complete and <paramref name="force"/> is <c>false</c>.
    /// </exception>
    public IEnumerable<T6> ReplaceChannel( IBroadcastChannelAddReaderProvider<T6> newChannel, bool force = false ) {
        // A channel that has not completed is only replaced when the caller forces it.
        if ( !force && !this._input.IsComplete ) {
            return ChannelNotClosedException.Throw<IEnumerable<T6>>();
        }

        ArgumentNullException.ThrowIfNull( newChannel );
        this.resetOneChannel( this._input );

        // Build the replacement first, then swap it in atomically and retire the previous input.
        ChannelMuxInput<T6> replacement = new ChannelMuxInput<T6>( newChannel, this );
        ChannelMuxInput<T6> previous    = Interlocked.Exchange( ref _input, replacement );
        previous.Dispose();
        return previous;
    }

    /*
     * Disposal
     */

    /// <summary>
    /// Dispose this level's input (first call only, and only when <paramref name="disposing"/> is <c>true</c>),
    /// then always hand over to the base class.
    /// </summary>
    protected override void Dispose( bool disposing ) {
        bool firstCall = !_isDisposed;
        if ( firstCall && disposing ) {
            _input.Dispose();
        }
        _isDisposed = true;
        base.Dispose( disposing );
    }
}
| | | 355 | | |
/// <inheritdoc cref="ChannelMux{T1,T2}"/>
public class ChannelMux<T1, T2, T3, T4, T5, T6, T7> : ChannelMux<T1, T2, T3, T4, T5, T6> {
    private ChannelMuxInput<T7> _input;
    private bool                _isDisposed;

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    public ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4,
        IBroadcastChannelAddReaderProvider<T5> channel5,
        IBroadcastChannelAddReaderProvider<T6> channel6,
        IBroadcastChannelAddReaderProvider<T7> channel7
    ) : this( channel1, channel2, channel3, channel4, channel5, channel6, channel7, totalChannels: 7 ) { }

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    /// <remarks>
    /// For construction by subclasses; <paramref name="totalChannels"/> is forwarded unchanged to the base constructor.
    /// </remarks>
    /// <exception cref="ArgumentNullException">If <paramref name="channel7"/> is <c>null</c>.</exception>
    protected ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4,
        IBroadcastChannelAddReaderProvider<T5> channel5,
        IBroadcastChannelAddReaderProvider<T6> channel6,
        IBroadcastChannelAddReaderProvider<T7> channel7,
        int                                    totalChannels
    ) : base( channel1, channel2, channel3, channel4, channel5, channel6, totalChannels: totalChannels ) {
        ArgumentNullException.ThrowIfNull( channel7 );
        _input = new ChannelMuxInput<T7>( channel7, this );
    }

    /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/>
    public bool TryRead( [ MaybeNullWhen( false ) ] out T7 item ) {
        return _input.TryRead( out item );
    }

    /// <summary>
    /// Replace the <typeparamref name="T7"/> input of this mux with a new input built from <paramref name="newChannel"/>.
    /// </summary>
    /// <param name="newChannel">Channel that will replace the channel of the matching type.</param>
    /// <param name="force">If <c>true</c>, the channel is replaced even if the current input is not complete.</param>
    /// <returns>The replaced (and already disposed) input, enumerable for any data remaining in it.</returns>
    /// <exception cref="ArgumentNullException">If the replacement proceeds and <paramref name="newChannel"/> is <c>null</c>.</exception>
    /// <exception cref="ChannelNotClosedException">
    /// If the channel being replaced is not complete and <paramref name="force"/> is <c>false</c>.
    /// </exception>
    public IEnumerable<T7> ReplaceChannel( IBroadcastChannelAddReaderProvider<T7> newChannel, bool force = false ) {
        // A channel that has not completed is only replaced when the caller forces it.
        if ( !force && !this._input.IsComplete ) {
            return ChannelNotClosedException.Throw<IEnumerable<T7>>();
        }

        ArgumentNullException.ThrowIfNull( newChannel );
        this.resetOneChannel( this._input );

        // Build the replacement first, then swap it in atomically and retire the previous input.
        ChannelMuxInput<T7> replacement = new ChannelMuxInput<T7>( newChannel, this );
        ChannelMuxInput<T7> previous    = Interlocked.Exchange( ref _input, replacement );
        previous.Dispose();
        return previous;
    }

    /*
     * Disposal
     */

    /// <summary>
    /// Dispose this level's input (first call only, and only when <paramref name="disposing"/> is <c>true</c>),
    /// then always hand over to the base class.
    /// </summary>
    protected override void Dispose( bool disposing ) {
        bool firstCall = !_isDisposed;
        if ( firstCall && disposing ) {
            _input.Dispose();
        }
        _isDisposed = true;
        base.Dispose( disposing );
    }
}
| | | 418 | | |
/// <inheritdoc cref="ChannelMux{T1,T2}"/>
public class ChannelMux<T1, T2, T3, T4, T5, T6, T7, T8> : ChannelMux<T1, T2, T3, T4, T5, T6, T7> {
    private ChannelMuxInput<T8> _input;
    private bool                _isDisposed;

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    public ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4,
        IBroadcastChannelAddReaderProvider<T5> channel5,
        IBroadcastChannelAddReaderProvider<T6> channel6,
        IBroadcastChannelAddReaderProvider<T7> channel7,
        IBroadcastChannelAddReaderProvider<T8> channel8
    ) : this( channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8, totalChannels: 8 ) { }

    /// <inheritdoc cref="ChannelMux{T1,T2}"/>
    /// <remarks>
    /// For construction by subclasses; <paramref name="totalChannels"/> is forwarded unchanged to the base constructor.
    /// </remarks>
    /// <exception cref="ArgumentNullException">If <paramref name="channel8"/> is <c>null</c>.</exception>
    // ReSharper disable once MemberCanBePrivate.Global
    protected ChannelMux(
        IBroadcastChannelAddReaderProvider<T1> channel1,
        IBroadcastChannelAddReaderProvider<T2> channel2,
        IBroadcastChannelAddReaderProvider<T3> channel3,
        IBroadcastChannelAddReaderProvider<T4> channel4,
        IBroadcastChannelAddReaderProvider<T5> channel5,
        IBroadcastChannelAddReaderProvider<T6> channel6,
        IBroadcastChannelAddReaderProvider<T7> channel7,
        IBroadcastChannelAddReaderProvider<T8> channel8,
        int                                    totalChannels
    ) : base( channel1, channel2, channel3, channel4, channel5, channel6, channel7, totalChannels: totalChannels ) {
        ArgumentNullException.ThrowIfNull( channel8 );
        _input = new ChannelMuxInput<T8>( channel8, this );
    }

    /// <inheritdoc cref="System.Threading.Channels.ChannelReader{T}.TryRead"/>
    public bool TryRead( [ MaybeNullWhen( false ) ] out T8 item ) {
        return _input.TryRead( out item );
    }

    /// <summary>
    /// Replace the <typeparamref name="T8"/> input of this mux with a new input built from <paramref name="newChannel"/>.
    /// </summary>
    /// <param name="newChannel">Channel that will replace the channel of the matching type.</param>
    /// <param name="force">If <c>true</c>, the channel is replaced even if the current input is not complete.</param>
    /// <returns>The replaced (and already disposed) input, enumerable for any data remaining in it.</returns>
    /// <exception cref="ArgumentNullException">If the replacement proceeds and <paramref name="newChannel"/> is <c>null</c>.</exception>
    /// <exception cref="ChannelNotClosedException">
    /// If the channel being replaced is not complete and <paramref name="force"/> is <c>false</c>.
    /// </exception>
    public IEnumerable<T8> ReplaceChannel( IBroadcastChannelAddReaderProvider<T8> newChannel, bool force = false ) {
        // A channel that has not completed is only replaced when the caller forces it.
        if ( !force && !this._input.IsComplete ) {
            return ChannelNotClosedException.Throw<IEnumerable<T8>>();
        }

        ArgumentNullException.ThrowIfNull( newChannel );
        this.resetOneChannel( this._input );

        // Build the replacement first, then swap it in atomically and retire the previous input.
        ChannelMuxInput<T8> replacement = new ChannelMuxInput<T8>( newChannel, this );
        ChannelMuxInput<T8> previous    = Interlocked.Exchange( ref _input, replacement );
        previous.Dispose();
        return previous;
    }

    /*
     * Disposal
     */

    /// <summary>
    /// Dispose this level's input (first call only, and only when <paramref name="disposing"/> is <c>true</c>),
    /// then always hand over to the base class.
    /// </summary>
    protected override void Dispose( bool disposing ) {
        bool firstCall = !_isDisposed;
        if ( firstCall && disposing ) {
            _input.Dispose();
        }
        _isDisposed = true;
        base.Dispose( disposing );
    }
}