- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
ASP.NET Core中有很多RateLimit组件,.NET 7甚至推出了官方版本。不过这些组件的主要目标是限制客户端访问服务的频率,在HTTP服务器崩溃前主动拒绝部分请求。如果请求没有被拒绝服务会尽可能调用资源尽快处理。 现在有一个问题,有什么办法限制响应的发送速率吗?这在一些需要长时间传输流式数据的情况时很有用,避免少量请求耗尽网络带宽,尽可能同时服务更多请求.
本文节选自我的新书《C#与.NET6 开发从入门到实践》12.11 流量控制。实现方式偏向知识讲解和教学,不保证组件稳定性,不建议直接在产品中使用。有关新书的更多介绍欢迎查看 《C#与.NET6 开发从入门到实践》预售,作者亲自来打广告了! 。
用过百度网盘的人应该都深有体会,如果没有会员,下载速度会非常慢。实现这种效果的方法有两种:控制TCP协议的滑动窗口大小;控制响应流的写入大小和频率。偏向系统底层的流量控制软件因为无法干涉软件中的流,所以一般会直接控制内核TCP协议的滑动窗口大小;而下载软件等客户端应用通常直接控制流的写入和读取,此时TCP协议的拥塞控制算法会自动调整滑动窗口大小。这种流量控制对提供大型多媒体资源的应用(例如在线视频网站)非常重要,能防止一个请求的响应占用太多带宽影响其他请求的响应发送。 ASP.NET Core并没有原生提供相关功能,Nuget上也没有找到相关的程序包(截止截稿)。但其实利用ASP.NET Core提供的接口,是可以实现这个功能的。笔者以ASP.NET Core的响应压缩中间件为蓝本,实现了一个简单的响应限流中间件.
using System;
namespace AccessControlElementary;
/// <summary>
/// 支持流量控制的流
/// </summary>
public class ThrottlingStream : Stream
{
/// <summary>
/// 用于指定每秒可传输的无限字节数的常数。
/// </summary>
public const long Infinite = 0;
#region Private members
/// <summary>
/// 基础流
/// </summary>
private readonly Stream _baseStream;
/// <summary>
/// 每秒可通过基础流传输的最大字节数。
/// </summary>
private long _maximumBytesPerSecond;
/// <summary>
/// 自上次限制以来已传输的字节数。
/// </summary>
private long _byteCount;
/// <summary>
/// 最后一次限制的开始时间(毫秒)。
/// </summary>
private long _start;
#endregion
#region Properties
/// <summary>
/// 获取当前毫秒数。
/// </summary>
/// <value>当前毫秒数。</value>
protected long CurrentMilliseconds => Environment.TickCount;
/// <summary>
/// 获取或设置每秒可通过基础流传输的最大字节数。
/// </summary>
/// <value>每秒最大字节数。</value>
public long MaximumBytesPerSecond
{
get => _maximumBytesPerSecond;
set
{
if (MaximumBytesPerSecond != value)
{
_maximumBytesPerSecond = value;
Reset();
}
}
}
/// <summary>
/// 获取一个值,该值指示当前流是否支持读取。
/// </summary>
/// <returns>如果流支持读取,则为true;否则为false。</returns>
public override bool CanRead => _baseStream.CanRead;
/// <summary>
/// 获取估算的流当前的比特率(单位:bps)。
/// </summary>
public long CurrentBitsPerSecond { get; protected set; }
/// <summary>
/// 获取一个值,该值指示当前流是否支持定位。
/// </summary>
/// <value></value>
/// <returns>如果流支持定位,则为true;否则为false。</returns>
public override bool CanSeek => _baseStream.CanSeek;
/// <summary>
/// 获取一个值,该值指示当前流是否支持写入。
/// </summary>
/// <value></value>
/// <returns>如果流支持写入,则为true;否则为false。</returns>
public override bool CanWrite => _baseStream.CanWrite;
/// <summary>
/// 获取流的长度(以字节为单位)。
/// </summary>
/// <value></value>
/// <returns>一个long值,表示流的长度(字节)。</returns>
/// <exception cref="T:System.NotSupportedException">基础流不支持定位。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
public override long Length => _baseStream.Length;
/// <summary>
/// 获取或设置当前流中的位置。
/// </summary>
/// <value></value>
/// <returns>流中的当前位置。</returns>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持定位。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
public override long Position
{
get => _baseStream.Position;
set => _baseStream.Position = value;
}
#endregion
#region Ctor
/// <summary>
/// 使用每秒可传输无限字节数的常数初始化 <see cref="T:ThrottlingStream"/> 类的新实例。
/// </summary>
/// <param name="baseStream">基础流。</param>
public ThrottlingStream(Stream baseStream)
: this(baseStream, Infinite) { }
/// <summary>
/// 初始化 <see cref="T:ThrottlingStream"/> 类的新实例。
/// </summary>
/// <param name="baseStream">基础流。</param>
/// <param name="maximumBytesPerSecond">每秒可通过基础流传输的最大字节数。</param>
/// <exception cref="ArgumentNullException">当 <see cref="baseStream"/> 是null引用时抛出。</exception>
/// <exception cref="ArgumentOutOfRangeException">当 <see cref="maximumBytesPerSecond"/> 是负数时抛出.</exception>
public ThrottlingStream(Stream baseStream, long maximumBytesPerSecond)
{
if (maximumBytesPerSecond < 0)
{
throw new ArgumentOutOfRangeException(nameof(maximumBytesPerSecond),
maximumBytesPerSecond, "The maximum number of bytes per second can't be negatie.");
}
_baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
_maximumBytesPerSecond = maximumBytesPerSecond;
_start = CurrentMilliseconds;
_byteCount = 0;
}
#endregion
#region Public methods
/// <summary>
/// 清除此流的所有缓冲区,并将所有缓冲数据写入基础设备。
/// </summary>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
public override void Flush() => _baseStream.Flush();
/// <summary>
/// 清除此流的所有缓冲区,并将所有缓冲数据写入基础设备。
/// </summary>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
public override Task FlushAsync(CancellationToken cancellationToken) => _baseStream.FlushAsync(cancellationToken);
/// <summary>
/// 从当前流中读取字节序列,并将流中的位置前进读取的字节数。
/// </summary>
/// <param name="buffer">字节数组。当此方法返回时,缓冲区包含指定的字节数组,其值介于offset和(offset+count-1)之间,由从当前源读取的字节替换。</param>
/// <param name="offset">缓冲区中从零开始的字节偏移量,开始存储从当前流中读取的数据。</param>
/// <param name="count">从当前流中读取的最大字节数。</param>
/// <returns>
/// 读入缓冲区的字节总数。如果许多字节当前不可用,则该值可以小于请求的字节数;如果已到达流的结尾,则该值可以小于零(0)。
/// </returns>
/// <exception cref="T:System.ArgumentException">偏移量和计数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持读取。 </exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或读取的最大字节数为负。</exception>
public override int Read(byte[] buffer, int offset, int count)
{
Throttle(count);
return _baseStream.Read(buffer, offset, count);
}
/// <summary>
/// 从当前流中读取字节序列,并将流中的位置前进读取的字节数。
/// </summary>
/// <param name="buffer">字节数组。当此方法返回时,缓冲区包含指定的字节数组,其值介于offset和(offset+count-1)之间,由从当前源读取的字节替换。</param>
/// <param name="offset">缓冲区中从零开始的字节偏移量,开始存储从当前流中读取的数据。</param>
/// <param name="count">从当前流中读取的最大字节数。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>
/// 读入缓冲区的字节总数。如果许多字节当前不可用,则该值可以小于请求的字节数;如果已到达流的结尾,则该值可以小于零(0)。
/// </returns>
/// <exception cref="T:System.ArgumentException">偏移量和计数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持读取。 </exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或读取的最大字节数为负。</exception>
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return await ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
}
/// <summary>
/// 从当前流中读取字节序列,并将流中的位置前进读取的字节数。
/// </summary>
/// <param name="buffer">内存缓冲区。当此方法返回时,缓冲区包含读取的数据。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>
/// 读入缓冲区的字节总数。如果许多字节当前不可用,则该值可以小于请求的字节数;如果已到达流的结尾,则该值可以小于零(0)。
/// </returns>
/// <exception cref="T:System.ArgumentException">偏移量和计数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持读取。 </exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或读取的最大字节数为负。</exception>
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
await ThrottleAsync(buffer.Length, cancellationToken);
return await _baseStream.ReadAsync(buffer, cancellationToken);
}
/// <summary>
/// 设置当前流中的位置。
/// </summary>
/// <param name="offset">相对于参考点的字节偏移量。</param>
/// <param name="origin">类型为<see cref="T:System.IO.SeekOrigin"/>的值,指示用于获取新位置的参考点。</param>
/// <returns>
/// 当前流中的新位置。
/// </returns>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持定位,例如流是从管道或控制台输出构造的。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
public override long Seek(long offset, SeekOrigin origin)
{
return _baseStream.Seek(offset, origin);
}
/// <summary>
/// 设置当前流的长度。
/// </summary>
/// <param name="value">当前流的所需长度(字节)。</param>
/// <exception cref="T:System.NotSupportedException">基础流不支持写入和定位,例如流是从管道或控制台输出构造的。</exception>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
public override void SetLength(long value)
{
_baseStream.SetLength(value);
}
/// <summary>
/// 将字节序列写入当前流,并按写入的字节数前进此流中的当前位置。
/// </summary>
/// <param name="buffer">字节数组。此方法将要写入当前流的字节从缓冲区复制到当前流。</param>
/// <param name="offset">缓冲区中从零开始向当前流复制字节的字节偏移量。</param>
/// <param name="count">要写入当前流的字节数。</param>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持写入。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.ArgumentException">偏移量和写入字节数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或写入字节数为负。</exception>
public override void Write(byte[] buffer, int offset, int count)
{
Throttle(count);
_baseStream.Write(buffer, offset, count);
}
/// <summary>
/// 将字节序列写入当前流,并按写入的字节数前进此流中的当前位置。
/// </summary>
/// <param name="buffer">字节数组。此方法将要写入当前流的字节从缓冲区复制到当前流。</param>
/// <param name="offset">缓冲区中从零开始向当前流复制字节的字节偏移量。</param>
/// <param name="count">要写入当前流的字节数。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持写入。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.ArgumentException">偏移量和写入字节数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或写入字节数为负。</exception>
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
}
/// <summary>
/// 将内存缓冲区写入当前流,并按写入的字节数前进此流中的当前位置。
/// </summary>
/// <param name="buffer">内存缓冲区。此方法将要写入当前流的字节从缓冲区复制到当前流。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持写入。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.ArgumentException">偏移量和写入字节数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或写入字节数为负。</exception>
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
await ThrottleAsync(buffer.Length, cancellationToken);
await _baseStream.WriteAsync(buffer, cancellationToken);
}
/// <summary>
/// 返回一个表示当前<see cref="T:System.Object" />的<see cref="T:System.String" />。
/// </summary>
/// <returns>
/// 表示当前<see cref="T:System.Object" />的<see cref="T:System.String" />。
/// </returns>
public override string ToString()
{
return _baseStream.ToString()!;
}
#endregion
#region Protected methods
/// <summary>
/// 如果比特率大于最大比特率,尝试限流
/// </summary>
/// <param name="bufferSizeInBytes">缓冲区大小(字节)。</param>
protected void Throttle(int bufferSizeInBytes)
{
var toSleep = CaculateThrottlingMilliseconds(bufferSizeInBytes);
if (toSleep > 1)
{
try
{
Thread.Sleep(toSleep);
}
catch (ThreadAbortException)
{
// 忽略ThreadAbortException。
}
// 睡眠已经完成,重置限流
Reset();
}
}
/// <summary>
/// 如果比特率大于最大比特率,尝试限流。
/// </summary>
/// <param name="bufferSizeInBytes">缓冲区大小(字节)。</param>
/// <param name="cancellationToken">取消令牌。</param>
protected async Task ThrottleAsync(int bufferSizeInBytes, CancellationToken cancellationToken)
{
var toSleep = CaculateThrottlingMilliseconds(bufferSizeInBytes);
if (toSleep > 1)
{
try
{
await Task.Delay(toSleep, cancellationToken);
}
catch (TaskCanceledException)
{
// 忽略TaskCanceledException。
}
// 延迟已经完成,重置限流。
Reset();
}
}
/// <summary>
/// 计算在操作流之前应当延迟的时间(单位:毫秒)。
/// 更新流当前的比特率。
/// </summary>
/// <param name="bufferSizeInBytes">缓冲区大小(字节)。</param>
/// <returns>应当延迟的时间(毫秒)。</returns>
protected int CaculateThrottlingMilliseconds(int bufferSizeInBytes)
{
int toSleep = 0;
// 确保缓冲区不为null
if (bufferSizeInBytes <= 0)
{
CurrentBitsPerSecond = 0;
}
else
{
_byteCount += bufferSizeInBytes;
long elapsedMilliseconds = CurrentMilliseconds - _start;
if (elapsedMilliseconds > 0)
{
// 计算当前瞬时比特率
var bp = _byteCount * 1000L;
var bps = bp / elapsedMilliseconds;
var avgBps = bps;
//如果bps大于最大bps,返回应当延迟的时间。
if (_maximumBytesPerSecond > 0 && bps > _maximumBytesPerSecond)
{
// 计算延迟时间
long wakeElapsed = bp / _maximumBytesPerSecond;
var result = (int)(wakeElapsed - elapsedMilliseconds);
// 计算平均比特率
var div = result / 1000.0;
avgBps = (long)(bps / (div == 0 ? 1 : div));
if (result > 1)
{
toSleep = result; ;
}
}
// 更新当前(平均)比特率
CurrentBitsPerSecond = (long)(avgBps / 8);
}
}
return toSleep;
}
/// <summary>
/// 将字节数重置为0,并将开始时间重置为当前时间。
/// </summary>
protected void Reset()
{
long difference = CurrentMilliseconds - _start;
// 只有在已知历史记录可用时间超过1秒时才重置计数器。
if (difference > 1000)
{
_byteCount = 0;
_start = CurrentMilliseconds;
}
}
#endregion
}
CaculateThrottleMilliseconds 、Throttle和ThrottleAsync是这个流的核心。CaculateThrottleMilliseconds方法负责计算在写入或读取流之前应该延迟多久和更新流当前的传输速率,Throttle和ThrottleAsync方法负责同步和异步延迟.
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Options;
using System.IO.Pipelines;
using System;
namespace AccessControlElementary;
// 自定义的HTTP功能接口,提供获取限流速率设置和当前速率的获取能力
public interface IHttpResponseThrottlingFeature
{
public long? MaximumBytesPerSecond { get; }
public long? CurrentBitsPerSecond { get; }
}
// 限流响应正文的实现类,实现了自定义的功能接口
public class ThrottlingResponseBody : Stream, IHttpResponseBodyFeature, IHttpResponseThrottlingFeature
{
private readonly IHttpResponseBodyFeature _innerBodyFeature;
private readonly IOptionsSnapshot<ResponseThrottlingOptions> _options;
private readonly HttpContext _httpContext;
private readonly Stream _innerStream;
private ThrottlingStream? _throttlingStream;
private PipeWriter? _pipeAdapter;
private bool _throttlingChecked;
private bool _complete;
private int _throttlingRefreshCycleCount;
public ThrottlingResponseBody(IHttpResponseBodyFeature innerBodyFeature, HttpContext httpContext, IOptionsSnapshot<ResponseThrottlingOptions> options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_httpContext = httpContext ?? throw new ArgumentNullException(nameof(httpContext));
_innerBodyFeature = innerBodyFeature ?? throw new ArgumentNullException(nameof(innerBodyFeature));
_innerStream = innerBodyFeature.Stream;
_throttlingRefreshCycleCount = 0;
}
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => _innerStream.CanWrite;
public override long Length => _innerStream.Length;
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public Stream Stream => this;
public PipeWriter Writer
{
get
{
if (_pipeAdapter == null)
{
_pipeAdapter = PipeWriter.Create(Stream, new StreamPipeWriterOptions(leaveOpen: true));
if (_complete)
{
_pipeAdapter.Complete();
}
}
return _pipeAdapter;
}
}
public long? MaximumBytesPerSecond => _throttlingStream?.MaximumBytesPerSecond;
public long? CurrentBitsPerSecond => _throttlingStream?.CurrentBitsPerSecond;
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count)
{
OnWriteAsync().ConfigureAwait(false).GetAwaiter().GetResult();
if (_throttlingStream != null)
{
_throttlingStream.Write(buffer, offset, count);
_throttlingStream.Flush();
}
else
{
_innerStream.Write(buffer, offset, count);
}
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
}
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
await OnWriteAsync();
if (_throttlingStream != null)
{
await _throttlingStream.WriteAsync(buffer, cancellationToken);
await _throttlingStream.FlushAsync(cancellationToken);
}
else
{
await _innerStream.WriteAsync(buffer, cancellationToken);
}
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
var tcs = new TaskCompletionSource(state: state, TaskCreationOptions.RunContinuationsAsynchronously);
InternalWriteAsync(buffer, offset, count, callback, tcs);
return tcs.Task;
}
private async void InternalWriteAsync(byte[] buffer, int offset, int count, AsyncCallback? callback, TaskCompletionSource tcs)
{
try
{
await WriteAsync(buffer.AsMemory(offset, count));
tcs.TrySetResult();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
if (callback != null)
{
// Offload callbacks to avoid stack dives on sync completions.
var ignored = Task.Run(() =>
{
try
{
callback(tcs.Task);
}
catch (Exception)
{
// Suppress exceptions on background threads.
}
});
}
}
public override void EndWrite(IAsyncResult asyncResult)
{
if (asyncResult == null)
{
throw new ArgumentNullException(nameof(asyncResult));
}
var task = (Task)asyncResult;
task.GetAwaiter().GetResult();
}
public async Task CompleteAsync()
{
if (_complete)
{
return;
}
await FinishThrottlingAsync(); // Sets _complete
await _innerBodyFeature.CompleteAsync();
}
public void DisableBuffering()
{
_innerBodyFeature?.DisableBuffering();
}
public override void Flush()
{
if (!_throttlingChecked)
{
OnWriteAsync().ConfigureAwait(false).GetAwaiter().GetResult();
// Flush the original stream to send the headers. Flushing the compression stream won't
// flush the original stream if no data has been written yet.
_innerStream.Flush();
return;
}
if (_throttlingStream != null)
{
_throttlingStream.Flush();
}
else
{
_innerStream.Flush();
}
}
public override async Task FlushAsync(CancellationToken cancellationToken)
{
if (!_throttlingChecked)
{
await OnWriteAsync();
// Flush the original stream to send the headers. Flushing the compression stream won't
// flush the original stream if no data has been written yet.
await _innerStream.FlushAsync(cancellationToken);
return;
}
if (_throttlingStream != null)
{
await _throttlingStream.FlushAsync(cancellationToken);
return;
}
await _innerStream.FlushAsync(cancellationToken);
}
public async Task SendFileAsync(string path, long offset, long? count, CancellationToken cancellationToken)
{
await OnWriteAsync();
if (_throttlingStream != null)
{
await SendFileFallback.SendFileAsync(Stream, path, offset, count, cancellationToken);
return;
}
await _innerBodyFeature.SendFileAsync(path, offset, count, cancellationToken);
}
public async Task StartAsync(CancellationToken cancellationToken = default)
{
await OnWriteAsync();
await _innerBodyFeature.StartAsync(cancellationToken);
}
internal async Task FinishThrottlingAsync()
{
if (_complete)
{
return;
}
_complete = true;
if (_pipeAdapter != null)
{
await _pipeAdapter.CompleteAsync();
}
if (_throttlingStream != null)
{
await _throttlingStream.DisposeAsync();
}
}
private async Task OnWriteAsync()
{
if (!_throttlingChecked)
{
_throttlingChecked = true;
var maxValue = await _options.Value.ThrottlingProvider.Invoke(_httpContext);
_throttlingStream = new ThrottlingStream(_innerStream, maxValue < 0 ? 0 : maxValue);
}
if (_throttlingStream != null && _options?.Value?.ThrottlingRefreshCycle > 0)
{
if (_throttlingRefreshCycleCount >= _options.Value.ThrottlingRefreshCycle)
{
_throttlingRefreshCycleCount = 0;
var maxValue = await _options.Value.ThrottlingProvider.Invoke(_httpContext);
_throttlingStream.MaximumBytesPerSecond = maxValue < 0 ? 0 : maxValue;
}
else
{
_throttlingRefreshCycleCount++;
}
}
}
}
自定义的响应正文类必须实现IHttpResponseBodyFeature接口才能作为应用的底层响应流使用,设计和实现参考ASP.NET Core的ResponseCompressionBody.
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Options;
using Timer = System.Timers.Timer;
namespace AccessControlElementary;
public class ResponseThrottlingMiddleware
{
private readonly RequestDelegate _next;
public ResponseThrottlingMiddleware(RequestDelegate next)
{
_next = next;
}
public async Task Invoke(HttpContext context, IOptionsSnapshot<ResponseThrottlingOptions> options, ILogger<ResponseThrottlingMiddleware> logger)
{
ThrottlingResponseBody throttlingBody = null;
IHttpResponseBodyFeature originalBodyFeature = null;
var shouldThrottling = await options?.Value?.ShouldThrottling?.Invoke(context);
if (shouldThrottling == true)
{
//获取原始输出Body
originalBodyFeature = context.Features.Get<IHttpResponseBodyFeature>();
//初始化限流Body
throttlingBody = new ThrottlingResponseBody(originalBodyFeature, context, options);
//设置成限流Body
context.Features.Set<IHttpResponseBodyFeature>(throttlingBody);
context.Features.Set<IHttpResponseThrottlingFeature>(throttlingBody);
// 用定时器定期向外汇报信息,这可能导致性能下降,仅用于演示目的
var timer = new Timer(1000);
timer.AutoReset = true;
long? currentBitsPerSecond = null;
var traceIdentifier = context.TraceIdentifier;
timer.Elapsed += (sender, arg) =>
{
if (throttlingBody.CurrentBitsPerSecond != currentBitsPerSecond)
{
currentBitsPerSecond = throttlingBody.CurrentBitsPerSecond;
var bps = (double)(throttlingBody.CurrentBitsPerSecond ?? 0);
var (unitBps, unit) = bps switch
{
< 1000 => (bps, "bps"),
< 1000_000 => (bps / 1000, "kbps"),
_ => (bps / 1000_000, "mbps"),
};
logger.LogDebug("请求:{RequestTraceIdentifier} 当前响应发送速率:{CurrentBitsPerSecond} {Unit}。", traceIdentifier, unitBps, unit);
}
};
// 开始发送响应后启动定时器
context.Response.OnStarting(async () =>
{
logger.LogInformation("请求:{RequestTraceIdentifier} 开始发送响应。", traceIdentifier);
timer.Start();
});
// 响应发送完成后销毁定时器
context.Response.OnCompleted(async () =>
{
logger.LogInformation("请求:{RequestTraceIdentifier} 响应发送完成。", traceIdentifier);
timer.Stop();
timer?.Dispose();
});
// 请求取消后销毁定时器
context.RequestAborted.Register(() =>
{
logger.LogInformation("请求:{RequestTraceIdentifier} 已中止。", traceIdentifier);
timer.Stop();
timer?.Dispose();
});
}
try
{
await _next(context);
if (shouldThrottling == true)
{
// 刷新响应流,确保所有数据都发送到网卡
await throttlingBody.FinishThrottlingAsync();
}
}
finally
{
if (shouldThrottling == true)
{
//限流发生错误,恢复原始Body
context.Features.Set(originalBodyFeature);
}
}
}
}
中间件负责把基础响应流替换为限流响应流,并为每个请求重新读取选项,使每个请求都能够独立控制限流的速率,然后在响应发送启动后记录响应的发送速率.
namespace AccessControlElementary;
public class ResponseThrottlingOptions
{
/// <summary>
/// 获取或设置流量限制的值的刷新周期,刷新时会重新调用<see cref="ThrottlingProvider"/>设置限制值。
/// 值越大刷新间隔越久,0或负数表示永不刷新。
/// </summary>
public int ThrottlingRefreshCycle { get; set; }
/// <summary>
/// 获取或设置指示是否应该启用流量控制的委托
/// </summary>
public Func<HttpContext, Task<bool>> ShouldThrottling { get; set; }
/// <summary>
/// 获取或设置指示流量限制大小的委托(单位:Byte/s)
/// </summary>
public Func<HttpContext, Task<int>> ThrottlingProvider { get; set; }
}
namespace AccessControlElementary;
// 配置中间件用的辅助类和扩展方法
public static class ResponseThrottlingMiddlewareExtensions
{
public static IApplicationBuilder UseResponseThrottling(this IApplicationBuilder app)
{
return app.UseMiddleware<ResponseThrottlingMiddleware>();
}
}
// 注册中间件需要的服务的辅助类和扩展方法
public static class ResponseThrottlingServicesExtensions
{
public static IServiceCollection AddResponseThrottling(this IServiceCollection services, Action<ResponseThrottlingOptions> configureOptions = null)
{
services.Configure(configureOptions);
return services;
}
}
Startup启动配置 。
namespace AccessControlElementary;
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
// 注册限流服务和选项
services.AddResponseThrottling(options =>
{
options.ThrottlingRefreshCycle = 100;
options.ShouldThrottling = static async _ => true;
options.ThrottlingProvider = static async _ => 100 * 1024; // 100KB/s
});
services.AddRazorPages();
}
public void Configure(IApplicationBuilder app)
{
// 配置响应限流中间件
app.UseResponseThrottling();
app.UseStaticFiles();
app.UseRouting();
app.UseAuthentication();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapRazorPages();
});
}
}
示例展示了如何配置和启用响应限流。ThrottlingRefreshCycle设置为每100次响应流写入周期刷新一次流量限制的值,使限流值能在响应发送中动态调整;ShouldThrottling设置为无条件启用限流;ThrottlingProvider设置为限速100 KB/s。 请求只有在UseResponseThrottling之前配置的短路中间件处被处理时不会受影响,请求没有被短路的话,只要经过限流中间件,基础响应流就被替换了。如果同时使用了响应压缩,会变成限流响应包裹压缩响应(或者相反),压缩响应(或者限流响应)又包裹基础响应的嵌套结构.
本书在介绍.NET 6基础知识时会尽可能使用具有现实意义的示例避免学习和实践脱节,本文就是其中之一,如果本文对您有价值,欢迎继续了解和购买本书。 《C#与.NET6 开发从入门到实践》预售,作者亲自来打广告了! 。
本文地址: https://www.cnblogs.com/coredx/p/17195492.html 。
最后此篇关于ASP.NETCore中如何限制响应发送速率(不是调用频率)的文章就讲到这里了,如果你想了解更多关于ASP.NETCore中如何限制响应发送速率(不是调用频率)的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
在一个新的 ASP.NET Core RC2 类库中,我有以下 project.json 文件,我试图在其中遵循 How to trim your package dependencies 上的文档.
我在本地提要上将 nuget 包从 2.2.x 更新到 3.1.0。 然后,在构建机器上尝试构建项目,但是: Project.csproj(0,0): Error NU1102: Unable to
快递100的物流信息查询接口,官方提供了一些demo;还好官方提供的代码是.netcore版本写的,不过写的有点low;根据官方提供的代码,我按照.netcore 的风格重构了代码;核心代码如下:
1、docker介绍 docker是用go语言编写基于linux操作系统的一些特性开发的,其提供了操作系统级别的抽象,是一种容器管理技术,它隔离了应用程序对基础架构(操作系统等)的依赖。相较于虚
demo运行在windows的docker中,系统是win10,所以需要先下载docker for windows,安装完毕后系统会重启,然后桌面上可以找到docker for windows的快捷
NetCore WebSocket 即时通讯示例,供大家参考,具体内容如下 1.新建Netcore Web项目 2.创建简易通讯协议 ?
需求场景: 我需要部署的项目是在Windows上开发的,目标框架为.net core 6.0 因此我们需要先在kylin上部署项目运行所需要的环境。 借助百
我正在 .NET Core 中重写一个调用外部 Web 服务的控制台应用程序。 我目前收到以下错误: One or more errors occurred. (The HTTP request is
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 6 年前。 Improve th
我有一组库,我想将其从 PCL 转移到 netcore。通过此举,我想简化 DI 系统并更新一些内部工作方式。 我想添加的其中一件事是内部对象的配置,就像在 Asp.Net Core 中一样(即 se
注:本文隶属于《理解ASP.NET Core》系列文章,请查看置顶博客或 点击此处查看全文目录 概述 在微服务化的架构设计中,网关扮演着重要的看门人角色,它所提供的功能之一
对于有多个应用系统的企业来说,每一个应用系统都有自己的用户体系,这就造成用户在切换不同应用系统时,就要多次输入账号密码,导致体验非常不好,也造成使用上非常不便。 针对这个问题,我们就可以采用单
就像 Web Api 接口可以对入参进行验证,避免用户传入非法的或者不符合我们预期的参数一样,选项也可以对配置源的内容进行验证,避免配置中的值与选项类中的属性不对应或者不满足预期,毕竟大部分配置
.NET Core 选项系统的主要实现在 Microsoft.Extensions.Options 和 Microsoft.Extensions.Options.ConfigurationExten
漏洞说明: 跨站脚本攻击(Cross Site Scripting),为了不和层叠样式表(Cascading Style Sheets, CSS)的缩写混淆,故将跨站脚本攻击缩写为XSS。恶意攻击
分布式缓存是由多个应用服务器共享的缓存,通常作为访问它的应用服务器的外部服务进行维护。 分布式缓存可以提高 ASP.NET Core 应用的性能和可伸缩性,尤其是当应用由云服务或服务器场托管时。
一个应用要运行起来,往往需要读取很多的预设好的配置信息,根据约定好的信息或方式执行一定的行为。 配置的本质就是软件运行的参数,在一个软件实现中需要的参数非常多,如果我们以 Hard Code(
2. 配置添加 配置系统可以读取到配置文件中的信息,那必然有某个地方可以将配置文件添加到配置系统中。之前的文章中讲到 ASP.NET Core 入口文件中,builder(WebApplica
3. 配置提供程序 上面提到,通过 IConfigurationBuilder 的实现类对象,我们可以自由地往配置系统中添加不同的配置提供程序,从而获取不同来源的配置信息。.NET Core
4. 自定义配置提供程序 在 .NET Core 配置系统中封装一个配置提供程序关键在于提供相应的 IconfigurationSource 实现和 IConfigurationProvide
我是一名优秀的程序员,十分优秀!