http流量镜像
“流量镜像”是指将网络中的数据流量复制一份,并将这份复制流量发送到另一个目的地(如监控、分析或安全检测系统)。这项技术常用于网络安全、故障排查、业务灰度发布等场景。
主要应用场景
- 安全监控与威胁检测
将生产环境的流量镜像到安全分析设备(如IDS/IPS),用于实时监控和威胁检测。
- 性能分析与故障排查
将流量镜像到分析平台,对网络异常、延迟、丢包等问题进行实时排查和定位。
- 灰度发布和A/B测试
将真实用户流量镜像到新版本服务,进行灰度环境验证和兼容性测试,不影响真实用户体验。
- 合规与审计
对重要业务流量进行行为记录,以满足合规和审计要求。
VKProxy 目前只支持 http流量镜像, 需注意由于会再一次发送http 请求,请求body会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别body很大的请求
设置
大家可以在Metadata中设置缓存, 具体设置项如下
- MirrorCluster
镜像流量发送到的集群id
配置示例:- {
- "ReverseProxy": {
- "Routes": {
- "a": {
- "Order": 0,
- "Match": {
- "Hosts": [ "api.com" ],
- "Paths": [ "*" ]
- },
- "ClusterId": "apidemo",
- "Metadata": {
- "MirrorCluster": "apidemoMirror"
- }
- }
- },
- "Clusters": {
- "apidemo": {
- "LoadBalancingPolicy": "Hash",
- "Metadata": {
- "HashBy": "header",
- "Key": "X-forwarded-For"
- },
- "Destinations": [
- {
- "Address": "https://xxx.lt"
- }
- ]
- },
- "apidemoMirror": {
- "LoadBalancingPolicy": "Hash",
- "Metadata": {
- "HashBy": "header",
- "Key": "X-forwarded-For"
- },
- "Destinations": [
- {
- "Address": "http://xxx.org/"
- }
- ]
- }
- }
- }
- }
复制代码 具体实现
首先需要缓存body 内容,这里实现一个简单的 ReadBufferingStream- public class ReadBufferingStream : Stream, IDisposable
- {
- private readonly SparseBufferWriter<byte> bufferWriter;
- protected Stream innerStream;
- public ReadBufferingStream(Stream innerStream)
- {
- this.innerStream = innerStream;
- bufferWriter = new SparseBufferWriter<byte>();
- }
- public override bool CanRead => innerStream.CanRead;
- public override bool CanSeek => innerStream.CanSeek;
- public override bool CanWrite => innerStream.CanWrite;
- public override long Length => innerStream.Length;
- public override long Position
- {
- get => innerStream.Position;
- set => innerStream.Position = value;
- }
- public override int WriteTimeout
- {
- get => innerStream.WriteTimeout;
- set => innerStream.WriteTimeout = value;
- }
- public Stream BufferingStream => bufferWriter.AsStream(true);
- public override void Flush()
- {
- innerStream.Flush();
- }
- public override Task FlushAsync(CancellationToken cancellationToken)
- {
- return innerStream.FlushAsync(cancellationToken);
- }
- public override int Read(byte[] buffer, int offset, int count)
- {
- var res = innerStream.Read(buffer, offset, count);
- // Zero-byte reads (where the passed in buffer has 0 length) can occur when using PipeReader, we don't want to accidentally complete the RequestBody logging in this case
- if (count == 0)
- {
- return res;
- }
- bufferWriter.Write(buffer.AsSpan(offset, res));
- return res;
- }
- public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- var res = await innerStream.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
- if (count == 0)
- {
- return res;
- }
- bufferWriter.Write(buffer.AsSpan(offset, res));
- return res;
- }
- public override long Seek(long offset, SeekOrigin origin)
- {
- return innerStream.Seek(offset, origin);
- }
- public override void SetLength(long value)
- {
- innerStream.SetLength(value);
- }
- public override void Write(byte[] buffer, int offset, int count)
- {
- innerStream.Write(buffer, offset, count);
- }
- public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- return innerStream.WriteAsync(buffer, offset, count, cancellationToken);
- }
- public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
- {
- return innerStream.WriteAsync(buffer, cancellationToken);
- }
- public override void Write(ReadOnlySpan<byte> buffer)
- {
- innerStream.Write(buffer);
- }
- public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
- {
- return innerStream.BeginRead(buffer, offset, count, callback, state);
- }
- public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
- {
- return innerStream.BeginWrite(buffer, offset, count, callback, state);
- }
- public override int EndRead(IAsyncResult asyncResult)
- {
- return innerStream.EndRead(asyncResult);
- }
- public override void EndWrite(IAsyncResult asyncResult)
- {
- innerStream.EndWrite(asyncResult);
- }
- public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
- {
- var res = await innerStream.ReadAsync(buffer, cancellationToken);
- if (buffer.IsEmpty)
- {
- return res;
- }
- bufferWriter.Write(buffer.Slice(0, res).Span);
- return res;
- }
- public override ValueTask DisposeAsync()
- {
- return innerStream.DisposeAsync();
- }
- protected override void Dispose(bool disposing)
- {
- if (disposing)
- {
- bufferWriter.Dispose();
- }
- }
- }
复制代码 然后利用中间件进行镜像处理- public class MirrorFunc : IHttpFunc
- {
- private readonly IServiceProvider serviceProvider;
- private readonly IHttpForwarder forwarder;
- private readonly ILoadBalancingPolicyFactory loadBalancing;
- private readonly IForwarderHttpClientFactory forwarderHttpClientFactory;
- private readonly ProxyLogger logger;
- public int Order => int.MinValue;
- public MirrorFunc(IServiceProvider serviceProvider, IHttpForwarder forwarder, ILoadBalancingPolicyFactory loadBalancing, IForwarderHttpClientFactory forwarderHttpClientFactory, ProxyLogger logger)
- {
- this.serviceProvider = serviceProvider;
- this.forwarder = forwarder;
- this.loadBalancing = loadBalancing;
- this.forwarderHttpClientFactory = forwarderHttpClientFactory;
- this.logger = logger;
- }
- public RequestDelegate Create(RouteConfig config, RequestDelegate next)
- {
- if (config.Metadata == null || !config.Metadata.TryGetValue("MirrorCluster", out var mirrorCluster) || string.IsNullOrWhiteSpace(mirrorCluster)) return next;
- return c => Mirror(c, mirrorCluster, next);
- }
- private async Task Mirror(HttpContext c, string mirrorCluster, RequestDelegate next)
- {
- var config = serviceProvider.GetRequiredService<IConfigSource<IProxyConfig>>();
- if (config.CurrentSnapshot == null || config.CurrentSnapshot.Clusters == null || !config.CurrentSnapshot.Clusters.TryGetValue(mirrorCluster, out var cluster) || cluster == null)
- {
- await next(c);
- return;
- }
- var originBody = c.Request.Body;
- using var buffer = new ReadBufferingStream(originBody);
- c.Request.Body = buffer;
- try
- {
- await next(c);
- }
- finally
- {
- c.Request.Body = buffer.BufferingStream;
- try
- {
- var proxyFeature = c.Features.GetRequiredFeature<IReverseProxyFeature>();
- var origin = proxyFeature.SelectedDestination;
- var selectedDestination = loadBalancing.PickDestination(proxyFeature, cluster);
- proxyFeature.SelectedDestination = origin;
- if (selectedDestination != null)
- {
- cluster.InitHttp(forwarderHttpClientFactory);
- await forwarder.SendAsync(c, proxyFeature, selectedDestination, cluster, new NonHttpTransformer(proxyFeature.Route.Transformer));
- }
- }
- catch (Exception ex)
- {
- logger.LogWarning(ex, "Mirror failed");
- }
- finally
- {
- c.Request.Body = originBody;
- }
- }
- }
- }
复制代码 所以说会再一次发送http 请求,请求body会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别body很大的请求
VKProxy 是使用c#开发的基于 Kestrel 实现 L4/L7的代理(感兴趣的同学烦请点个github小赞赞呢)
来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除 |