< Summary - erichiller/mkmrk.Channels coverage

Information
Class: mkmrk.Channels.BroadcastChannelWriter<TData, TResponse>
Assembly: mkmrk.Channels
File(s): /home/runner/work/mkmrk.Channels/mkmrk.Channels/src/mkmrk.Channels/BroadcastChannel/BroadcastChannelWriter.cs
Tag: 161_8859726157
Line coverage
88%
Covered lines: 8
Uncovered lines: 1
Coverable lines: 9
Total lines: 296
Line coverage: 88.8%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Metrics

File(s)

/home/runner/work/mkmrk.Channels/mkmrk.Channels/src/mkmrk.Channels/BroadcastChannel/BroadcastChannelWriter.cs

#LineLine coverage
 1using System;
 2using System.Collections.Generic;
 3using System.Collections.Immutable;
 4using System.ComponentModel;
 5using System.Diagnostics.CodeAnalysis;
 6using System.Linq;
 7using System.Threading;
 8using System.Threading.Channels;
 9using System.Threading.Tasks;
 10
 11using Microsoft.Extensions.Logging;
 12using Microsoft.Extensions.Logging.Abstractions;
 13
 14namespace mkmrk.Channels;
 15
 16/// <inheritdoc cref="IBroadcastChannelWriter{TData,TResponse}" />
 17public class BroadcastChannelWriter<TData, TResponse> : ChannelWriter<TData>, IBroadcastChannelWriter<TData, TResponse>,
 18    private readonly ChannelReader<TResponse>                          _responseReader;
 19    private readonly Channel<TResponse>                                _responseChannel;
 20    private          ImmutableArray<ChannelWriter<TData>>              _outputWriters = ImmutableArray<ChannelWriter<TDa
 21    private readonly object                                            _readersLock   = new object();
 22    private readonly ILoggerFactory?                                   _loggerFactory;
 23    private readonly ILogger<BroadcastChannelWriter<TData, TResponse>> _logger;
 24
 25    /// <inheritdoc />
 26    public int ReaderCount {
 27        get {
 28            lock ( this._readersLock ) {
 29                return this._outputWriters.Length;
 30            }
 31        }
 32    }
 33
 34    /// <summary>
 35    /// Necessary resources to be written to by this <see cref="BroadcastChannelWriter{TData,TResponse}"/>
 36    /// </summary>
 37    /// <param name="DataChannelReader"><see cref="ChannelReader{T}"/> for <c>TData</c> messages</param>
 38    /// <param name="WriterHash">Hash code of writer to use for <paramref name="RemoveOutputWriterCallback"/></param>
 39    /// <param name="RemoveOutputWriterCallback">Deregister / Dispose callback</param>
 40    /// <param name="ResponseChannelWriter"><see cref="ChannelWriter{T}"/> for <c>TResponse</c> messages</param>
 41    /// <param name="Logger"><see cref="BroadcastChannelReader{TData,TResponse}"/> specific Logger</param>
 142    internal sealed record ReaderConfiguration(
 143        ChannelReader<TData>                              DataChannelReader,
 144        int                                               WriterHash,
 145        RemoveWriterByHashCode                            RemoveOutputWriterCallback,
 146        ChannelWriter<TResponse>                          ResponseChannelWriter,
 147        ILogger<BroadcastChannelReader<TData, TResponse>> Logger
 148    );
 49
 50    /// <summary>
 51    /// This is only for Dependency Injection and internal creation by <see cref="BroadcastChannel{TData,TResponse}.Writ
 52    /// and should not be used directly, instead use <see cref="BroadcastChannel{TData,TResponse}.Writer"/>.
 53    /// </summary>
 54    [ EditorBrowsable( EditorBrowsableState.Never ) ]
 55    // ReSharper disable once UnusedParameter.Local
 56    public BroadcastChannelWriter( ILoggerFactory? loggerFactory = null ) {
 57        this._responseChannel = Channel.CreateUnbounded<TResponse>(
 58            new UnboundedChannelOptions() {
 59                SingleReader = true,
 60                SingleWriter = false
 61            }
 62        );
 63        this._loggerFactory  = loggerFactory;
 64        this._logger         = loggerFactory?.CreateLogger<BroadcastChannelWriter<TData, TResponse>>() ?? NullLogger<Bro
 65        this._responseReader = this._responseChannel.Reader;
 66        this._logger.LogTrace( "Constructed: {Writer}", this );
 67    }
 68
 69    /* ************************************************** */
 70
 71    /// <inheritdoc cref="IBroadcastChannelWriter{TData,TResponse}.GetNewReaderConfiguration" />
 72    internal ReaderConfiguration GetNewReaderConfiguration( ) {
 73        Channel<TData> dataChannel = Channel.CreateUnbounded<TData>( new UnboundedChannelOptions() {
 74                                                                         SingleReader = true,
 75                                                                         SingleWriter = true
 76                                                                     } );
 77        lock ( this._readersLock ) {
 78            // TODO: try the below ;; TEST IT! -- use in any other writes to _outputWriters ?
 79            // ImmutableInterlocked.InterlockedExchange( ref this._outputWriters, this._outputWriters.Add( dataChannel.W
 80            this._outputWriters = this._outputWriters.Add( dataChannel.Writer );
 81        }
 82
 83        this._logger.LogTrace( $"{nameof(GetNewReaderConfiguration)} {nameof(ReaderConfiguration.WriterHash)} is {{Write
 84        return new ReaderConfiguration( DataChannelReader: dataChannel.Reader,
 85                                        WriterHash: dataChannel.Writer.GetHashCode(),
 86                                        RemoveOutputWriterCallback: this.removeReader,
 87                                        ResponseChannelWriter: this._responseChannel.Writer,
 88                                        Logger: this._loggerFactory?.CreateLogger<BroadcastChannelReader<TData, TRespons
 89                                                ?? NullLogger<BroadcastChannelReader<TData, TResponse>>.Instance );
 90    }
 91
 92    /// <inheritdoc />
 93    ReaderConfiguration IBroadcastChannelWriter<TData, TResponse>.GetNewReaderConfiguration( ) => this.GetNewReaderConfi
 94
 95    IBroadcastChannelReader<TData, TResponse> IBroadcastChannelWriter<TData, TResponse>.CreateReader( ) {
 96        Channel<TData> dataChannel = Channel.CreateUnbounded<TData>( new UnboundedChannelOptions() {
 97                                                                         SingleReader = true,
 98                                                                         SingleWriter = true
 99                                                                     } );
 100        BroadcastChannelReader<TData, TResponse> reader = new BroadcastChannelReader<TData, TResponse>(
 101            dataChannel.Reader,
 102            dataChannel.Writer.GetHashCode(),
 103            this._responseChannel.Writer,
 104            this.removeReader,
 105            this._loggerFactory?.CreateLogger<BroadcastChannelReader<TData, TResponse>>() ?? NullLogger<BroadcastChannel
 106        this._logger.LogTrace( "Created Reader: {Reader}", reader );
 107        lock ( this._readersLock ) {
 108            this._outputWriters = this._outputWriters.Add( dataChannel.Writer );
 109        }
 110
 111        this._logger.LogTrace( "Created Reader, Reader count is now {Count}", ReaderCount );
 112
 113        return reader;
 114    }
 115
 116    /// <inheritdoc />
 117    RemoveWriterByHashCode IBroadcastChannelAddReaderProvider<TData>.AddReader( ChannelWriter<TData> reader ) => this.Ad
 118
 119    internal RemoveWriterByHashCode AddReader( ChannelWriter<TData> reader ) {
 120        this._logger.LogTrace( "Created Reader: {Reader}", reader.ToString() );
 121        lock ( this._readersLock ) {
 122            this._outputWriters = this._outputWriters.Add( reader );
 123        }
 124
 125        this._logger.LogTrace( "Created Reader, Reader count is now {Count}", ReaderCount );
 126
 127        return this.removeReader;
 128    }
 129
 130
 131    private void removeReader( in int writerHash ) {
 132        ChannelWriter<TData>? writerFound = null;
 133        lock ( this._readersLock ) {
 134            for ( int i = 0 ; i < _outputWriters.Length ; i++ ) {
 135                if ( _outputWriters[ i ].GetHashCode() == writerHash ) {
 136                    writerFound         = _outputWriters[ i ];
 137                    this._outputWriters = this._outputWriters.Remove( writerFound );
 138                    writerFound.TryComplete();
 139                }
 140            }
 141        }
 142        if ( writerFound is null ) {
 143            ThrowHelper.ThrowKeyNotFoundException( $"ChannelWriter with hash {writerHash} was not found." );
 144        }
 145    }
 146
 147
 148    private bool _isDisposed;
 149
 150    /// <inheritdoc />
 151    public void Dispose( ) {
 152        this.Dispose( true );
 153        GC.SuppressFinalize( this );
 154    }
 155
 156    // ReSharper disable once InconsistentNaming
 157    /// <inheritdoc cref="IDisposable.Dispose"/>
 158    protected virtual void Dispose( bool disposing ) {
 159        this._logger.LogTrace( "Dispose({Disposing})", disposing );
 160        if ( this._isDisposed ) {
 161            return;
 162        }
 163
 164        if ( disposing ) {
 165            lock ( this._readersLock ) {
 166                foreach ( var channelWriter in this._outputWriters ) {
 167                    channelWriter.TryComplete();
 168                }
 169            }
 170
 171            this._responseChannel.Writer.TryComplete();
 172        }
 173
 174        this._isDisposed = true;
 175    }
 176
 177    /* **** Response **** */
 178
 179    /// <summary>
 180    /// Return <see cref="ChannelReader{T}"/> for <typeparamref name="TResponse"/>.
 181    /// </summary>
 182    public ChannelReader<TResponse> Responses => this._responseReader;
 183
 184    /// <inheritdoc cref="ChannelReader{T}.ReadAllAsync"/>
 185    public IAsyncEnumerable<TResponse> ReadAllResponsesAsync( CancellationToken ct ) => this._responseReader.ReadAllAsyn
 186
 187    /// <inheritdoc cref="ChannelReader{T}.ReadAsync"/>
 188    public ValueTask<TResponse> ReadResponseAsync( CancellationToken ct ) => this._responseReader.ReadAsync( ct );
 189
 190    /// <inheritdoc cref="ChannelReader{T}.TryPeek"/>
 191    public bool TryPeekResponse( [ MaybeNullWhen( false ) ] out TResponse response ) => this._responseReader.TryPeek( ou
 192
 193    /// <inheritdoc cref="ChannelReader{T}.TryRead"/>
 194    public bool TryReadResponse( [ MaybeNullWhen( false ) ] out TResponse response ) => this._responseReader.TryRead( ou
 195
 196    /// <inheritdoc cref="ChannelReader{T}.WaitToReadAsync"/>
 197    public ValueTask<bool> WaitToReadResponseAsync( CancellationToken ct = default ) => this._responseReader.WaitToReadA
 198
 199
 200    /* **** Data **** */
 201
 202    /// <inheritdoc cref="IBroadcastChannelWriter{TData}.TryComplete"/>
 203    public override bool TryComplete( Exception? error = null ) {
 204        bool result = true;
 205        lock ( this._readersLock ) {
 206            foreach ( ChannelWriter<TData> channelWriter in this._outputWriters ) {
 207                result &= channelWriter.TryComplete( error );
 208            }
 209        }
 210
 211        return result;
 212    }
 213
 214    /// <inheritdoc cref="IBroadcastChannelWriter{TData}.TryWrite(TData)"/>
 215    public override bool TryWrite( TData item ) {
 216        lock ( this._readersLock ) {
 217            if ( this._outputWriters.Length == 1 ) {
 218                return this._outputWriters[ 0 ].TryWrite( item );
 219            }
 220
 221            if ( this._outputWriters.Length == 0 ) { return true; } // this returns true as if it had written regardless
 222
 223            bool result = true;
 224            foreach ( var channelWriter in this._outputWriters ) {
 225                result &= channelWriter.TryWrite( item );
 226            }
 227
 228            return result;
 229        }
 230    }
 231
 232
 233    /// <inheritdoc />
 234    public bool TryWrite( IEnumerable<TData> items ) {
 235        lock ( this._readersLock ) {
 236            TData[] itemsArray = items as TData[] ?? items.ToArray();
 237            bool    result     = true;
 238            this._logger.LogTrace( "Writing {Count} items to {ReadersCount} readers", itemsArray.Length, this._outputWri
 239            if ( this._outputWriters.Length == 1 ) {
 240                foreach ( TData item in itemsArray ) {
 241                    result &= this._outputWriters[ 0 ].TryWrite( item );
 242                }
 243
 244                return result;
 245            }
 246
 247            if ( this._outputWriters.Length == 0 ) { return true; } // this returns true as if it had written regardless
 248
 249            foreach ( var channelWriter in this._outputWriters ) {
 250                foreach ( TData item in itemsArray ) {
 251                    result &= channelWriter.TryWrite( item );
 252                }
 253            }
 254
 255            return result;
 256        }
 257    }
 258
 259    /// <inheritdoc cref="IBroadcastChannelWriter{TData}.WaitToWriteAsync"/>
 260    public override ValueTask<bool> WaitToWriteAsync( CancellationToken cancellationToken = default )
 261        => new ValueTask<bool>( true );
 262
 263    /// <inheritdoc cref="IBroadcastChannelWriter{TData}.WriteAsync"/>
 264    /// <remarks>This runs slower than using <see cref="WaitToWriteAsync"/> and <see cref="TryWrite(TData)"/>.</remarks>
 265    public override ValueTask WriteAsync( TData item, CancellationToken cancellationToken = default ) {
 266        lock ( this._readersLock ) {
 267            if ( this._outputWriters.Length == 0 ) {
 268                return ValueTask.CompletedTask;
 269            }
 270
 271            if ( this._outputWriters.Length == 1 ) {
 272                return this._outputWriters[ 0 ].WriteAsync( item, cancellationToken );
 273            }
 274
 0275            return this._outputWriters.Select( r => r.WriteAsync( item, cancellationToken ) ).ToArray().WhenAll();
 276        }
 277    }
 278
 279    /// <inheritdoc />
 280    public override string ToString( ) {
 281        lock ( this._readersLock ) {
 1282            return $"{this.GetType().GenericTypeShortDescriptor( useShortGenericName: false )} [Hash: {this.GetHashCode(
 283        }
 284    }
 285}
 286
 287/// <inheritdoc />
 288/// <remarks>
 289/// <see cref="BroadcastChannelWriter{TData}"/> with a default Response type of <see cref="IBroadcastChannelResponse"/> 
 290/// </remarks>
 291public class BroadcastChannelWriter<TData> : BroadcastChannelWriter<TData, IBroadcastChannelResponse> {
 292    /// <inheritdoc />
 293    [ EditorBrowsable( EditorBrowsableState.Never ) ]
 294    public BroadcastChannelWriter( ILoggerFactory loggerFactory ) :
 295        base( loggerFactory ) { }
 296}