找回密码
 立即注册
首页 业界区 业界 http流量镜像

http流量镜像

膏包 2025-7-5 06:58:32
http流量镜像

“流量镜像”是指将网络中的数据流量复制一份,并将这份复制流量发送到另一个目的地(如监控、分析或安全检测系统)。这项技术常用于网络安全、故障排查、业务灰度发布等场景。
主要应用场景

  • 安全监控与威胁检测
    将生产环境的流量镜像到安全分析设备(如IDS/IPS),用于实时监控和威胁检测。
  • 性能分析与故障排查
    将流量镜像到分析平台,对网络异常、延迟、丢包等问题进行实时排查和定位。
  • 灰度发布和A/B测试
    将真实用户流量镜像到新版本服务,进行灰度环境验证和兼容性测试,不影响真实用户体验。
  • 合规与审计
    对重要业务流量进行行为记录,以满足合规和审计要求。
VKProxy 目前只支持 http流量镜像, 需注意由于会再一次发送http 请求,请求body会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别body很大的请求
设置

大家可以在Metadata中设置缓存, 具体设置项如下

  • MirrorCluster
    镜像流量发送到的集群id
配置示例:
  1. {
  2.   "ReverseProxy": {
  3.     "Routes": {
  4.       "a": {
  5.         "Order": 0,
  6.         "Match": {
  7.             "Hosts": [ "api.com" ],
  8.             "Paths": [ "*" ]
  9.         },
  10.         "ClusterId": "apidemo",
  11.         "Metadata": {
  12.           "MirrorCluster": "apidemoMirror"
  13.         }
  14.       }
  15.     },
  16.     "Clusters": {
  17.       "apidemo": {
  18.         "LoadBalancingPolicy": "Hash",
  19.         "Metadata": {
  20.           "HashBy": "header",
  21.           "Key": "X-forwarded-For"
  22.         },
  23.         "Destinations": [
  24.           {
  25.             "Address": "https://xxx.lt"
  26.           }
  27.         ]
  28.       },
  29.       "apidemoMirror": {
  30.         "LoadBalancingPolicy": "Hash",
  31.         "Metadata": {
  32.           "HashBy": "header",
  33.           "Key": "X-forwarded-For"
  34.         },
  35.         "Destinations": [
  36.           {
  37.             "Address": "http://xxx.org/"
  38.           }
  39.         ]
  40.       }
  41.     }
  42.   }
  43. }
复制代码
具体实现

首先需要缓存body 内容,这里实现一个简单的 ReadBufferingStream
  1. public class ReadBufferingStream : Stream, IDisposable
  2. {
  3.     private readonly SparseBufferWriter<byte> bufferWriter;
  4.     protected Stream innerStream;
  5.     public ReadBufferingStream(Stream innerStream)
  6.     {
  7.         this.innerStream = innerStream;
  8.         bufferWriter = new SparseBufferWriter<byte>();
  9.     }
  10.     public override bool CanRead => innerStream.CanRead;
  11.     public override bool CanSeek => innerStream.CanSeek;
  12.     public override bool CanWrite => innerStream.CanWrite;
  13.     public override long Length => innerStream.Length;
  14.     public override long Position
  15.     {
  16.         get => innerStream.Position;
  17.         set => innerStream.Position = value;
  18.     }
  19.     public override int WriteTimeout
  20.     {
  21.         get => innerStream.WriteTimeout;
  22.         set => innerStream.WriteTimeout = value;
  23.     }
  24.     public Stream BufferingStream => bufferWriter.AsStream(true);
  25.     public override void Flush()
  26.     {
  27.         innerStream.Flush();
  28.     }
  29.     public override Task FlushAsync(CancellationToken cancellationToken)
  30.     {
  31.         return innerStream.FlushAsync(cancellationToken);
  32.     }
  33.     public override int Read(byte[] buffer, int offset, int count)
  34.     {
  35.         var res = innerStream.Read(buffer, offset, count);
  36.         // Zero-byte reads (where the passed in buffer has 0 length) can occur when using PipeReader, we don't want to accidentally complete the RequestBody logging in this case
  37.         if (count == 0)
  38.         {
  39.             return res;
  40.         }
  41.         bufferWriter.Write(buffer.AsSpan(offset, res));
  42.         return res;
  43.     }
  44.     public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  45.     {
  46.         var res = await innerStream.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
  47.         if (count == 0)
  48.         {
  49.             return res;
  50.         }
  51.         bufferWriter.Write(buffer.AsSpan(offset, res));
  52.         return res;
  53.     }
  54.     public override long Seek(long offset, SeekOrigin origin)
  55.     {
  56.         return innerStream.Seek(offset, origin);
  57.     }
  58.     public override void SetLength(long value)
  59.     {
  60.         innerStream.SetLength(value);
  61.     }
  62.     public override void Write(byte[] buffer, int offset, int count)
  63.     {
  64.         innerStream.Write(buffer, offset, count);
  65.     }
  66.     public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  67.     {
  68.         return innerStream.WriteAsync(buffer, offset, count, cancellationToken);
  69.     }
  70.     public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
  71.     {
  72.         return innerStream.WriteAsync(buffer, cancellationToken);
  73.     }
  74.     public override void Write(ReadOnlySpan<byte> buffer)
  75.     {
  76.         innerStream.Write(buffer);
  77.     }
  78.     public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
  79.     {
  80.         return innerStream.BeginRead(buffer, offset, count, callback, state);
  81.     }
  82.     public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
  83.     {
  84.         return innerStream.BeginWrite(buffer, offset, count, callback, state);
  85.     }
  86.     public override int EndRead(IAsyncResult asyncResult)
  87.     {
  88.         return innerStream.EndRead(asyncResult);
  89.     }
  90.     public override void EndWrite(IAsyncResult asyncResult)
  91.     {
  92.         innerStream.EndWrite(asyncResult);
  93.     }
  94.     public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
  95.     {
  96.         var res = await innerStream.ReadAsync(buffer, cancellationToken);
  97.         if (buffer.IsEmpty)
  98.         {
  99.             return res;
  100.         }
  101.         bufferWriter.Write(buffer.Slice(0, res).Span);
  102.         return res;
  103.     }
  104.     public override ValueTask DisposeAsync()
  105.     {
  106.         return innerStream.DisposeAsync();
  107.     }
  108.     protected override void Dispose(bool disposing)
  109.     {
  110.         if (disposing)
  111.         {
  112.             bufferWriter.Dispose();
  113.         }
  114.     }
  115. }
复制代码
然后利用中间件进行镜像处理
  1. public class MirrorFunc : IHttpFunc
  2. {
  3.     private readonly IServiceProvider serviceProvider;
  4.     private readonly IHttpForwarder forwarder;
  5.     private readonly ILoadBalancingPolicyFactory loadBalancing;
  6.     private readonly IForwarderHttpClientFactory forwarderHttpClientFactory;
  7.     private readonly ProxyLogger logger;
  8.     public int Order => int.MinValue;
  9.     public MirrorFunc(IServiceProvider serviceProvider, IHttpForwarder forwarder, ILoadBalancingPolicyFactory loadBalancing, IForwarderHttpClientFactory forwarderHttpClientFactory, ProxyLogger logger)
  10.     {
  11.         this.serviceProvider = serviceProvider;
  12.         this.forwarder = forwarder;
  13.         this.loadBalancing = loadBalancing;
  14.         this.forwarderHttpClientFactory = forwarderHttpClientFactory;
  15.         this.logger = logger;
  16.     }
  17.     public RequestDelegate Create(RouteConfig config, RequestDelegate next)
  18.     {
  19.         if (config.Metadata == null || !config.Metadata.TryGetValue("MirrorCluster", out var mirrorCluster) || string.IsNullOrWhiteSpace(mirrorCluster)) return next;
  20.         return c => Mirror(c, mirrorCluster, next);
  21.     }
  22.     private async Task Mirror(HttpContext c, string mirrorCluster, RequestDelegate next)
  23.     {
  24.         var config = serviceProvider.GetRequiredService<IConfigSource<IProxyConfig>>();
  25.         if (config.CurrentSnapshot == null || config.CurrentSnapshot.Clusters == null || !config.CurrentSnapshot.Clusters.TryGetValue(mirrorCluster, out var cluster) || cluster == null)
  26.         {
  27.             await next(c);
  28.             return;
  29.         }
  30.         var originBody = c.Request.Body;
  31.         using var buffer = new ReadBufferingStream(originBody);
  32.         c.Request.Body = buffer;
  33.         try
  34.         {
  35.             await next(c);
  36.         }
  37.         finally
  38.         {
  39.             c.Request.Body = buffer.BufferingStream;
  40.             try
  41.             {
  42.                 var proxyFeature = c.Features.GetRequiredFeature<IReverseProxyFeature>();
  43.                 var origin = proxyFeature.SelectedDestination;
  44.                 var selectedDestination = loadBalancing.PickDestination(proxyFeature, cluster);
  45.                 proxyFeature.SelectedDestination = origin;
  46.                 if (selectedDestination != null)
  47.                 {
  48.                     cluster.InitHttp(forwarderHttpClientFactory);
  49.                     await forwarder.SendAsync(c, proxyFeature, selectedDestination, cluster, new NonHttpTransformer(proxyFeature.Route.Transformer));
  50.                 }
  51.             }
  52.             catch (Exception ex)
  53.             {
  54.                 logger.LogWarning(ex, "Mirror failed");
  55.             }
  56.             finally
  57.             {
  58.                 c.Request.Body = originBody;
  59.             }
  60.         }
  61.     }
  62. }
复制代码
所以说会再一次发送http 请求,请求body会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别body很大的请求
VKProxy 是使用c#开发的基于 Kestrel 实现 L4/L7的代理(感兴趣的同学烦请点个github小赞赞呢)

来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除

相关推荐

您需要登录后才可以回帖 登录 | 立即注册