LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

.NET WebSocket高并发通信阻塞问题

freeflydom
2024年9月5日 11:59 本文热度 741

项目上遇到使用WebSocket超时问题,具体情况是这样的,OTA升级过程中,解压zip文件会有解压进度事件,将解压进度通过进程通信传给另一进程,通信提示超时异常

小伙伴堂园发现大文件使用Zip解压,解压进度事件间隔竟然是1ms,简直超大频率啊

但是,解压事件超频也不应该通信异常啊,于是我通过1ms定时发送通信事件,测试了下进程间通信流程。

WebSocketSharp

当前进程间通信组件是基于kaistseo/UnitySocketIO-WebSocketSharp实现,主机内设置一服务端,多个客户端连接服务端,客户端通信由服务端转发数据。客户端A发送给B后,客户端B会将执行结果反馈给客户A。

那在定位中发现,各个链路发送延时都是正常的,包括服务端发送反馈数据给到客户端A,但客户端A接收数据延时很大,下面是部分返回数据:

并且通信时间久了之后,延时会越来越大

 

这里是WebSocketSharp.WebSocket对外事件OnMessage: 

private void WebSocketOnMessage(object sender, MessageEventArgs e)

    {

        if (!e.IsText)

        {

            //暂时不支持

            return;

        }

        Debug.WriteLine($"{DateTime.Now.ToString("HH:mm:ss fff")},{e.Data}");


        var receivedMessage = JsonConvertSlim.Decode<ChannelServerMessage>(e.Data);

        xxxxx

    }

我们继续往下看,OnMessage是由WebSocket.message()触发,从_messageEventQueue队列中获取数据: 

private void message ()

    {

      MessageEventArgs e = null;

      lock (_forMessageEventQueue) {

        if (_inMessage || _messageEventQueue.Count == 0 || _readyState != WebSocketState.Open)

          return;


        _inMessage = true;

        e = _messageEventQueue.Dequeue ();

      }


      _message (e);

    }

循环接收数据是这样拿的: 

private void startReceiving ()

    {

      xxxx

      _receivingExited = new ManualResetEvent (false);

      Action receive = () => WebSocketFrame.ReadFrameAsync (_stream, false,

            frame => {

              if (!processReceivedFrame (frame) || _readyState == WebSocketState.Closed) {

                var exited = _receivingExited;

                if (exited != null)

                  exited.Set ();

                return;

              }

              // Receive next asap because the Ping or Close needs a response to it.

              receive ();

              xxxx

              message ();

            },

            xxxx

          );

      receive ();

    }


这里我看到了ManualResetEvent。。。数据量那么大,这里搞个同步信号锁,肯定会堵住咯

为何设置线程同步锁呢?我们往下看

WebSocketSharp数据发送是基于TCPClient实现的:

     _tcpClient = new TcpClient (_proxyUri.DnsSafeHost, _proxyUri.Port);

    _stream = _tcpClient.GetStream ();

初始化后通过_stream.Write (bytes, 0, bytes.Length);发送数据

接收数据,也是通过_stream读取,可以看上方的startReceiving()方法里,WebSocketFrame.ReadFrameAsync (_stream, false,...)

我们知道,TCP是面向连接,提供可靠、顺序的数据流传输。用于一对一的通信,即一个TCP连接只能有一个发送方和一个接收方。具体的可以看我之前写的文章:.NET TCP、UDP、Socket、WebSocket

但接收时在高并发场景下,适当的同步措施依然是必需的。我们可以使用lock也可以用SemaphoreSlim来实现复杂的同步需求,这里使用的是信号锁ManualResetEvent

我们再看看发送端代码,也是用了lock一个object来限制并发操作:

private bool send (Opcode opcode, Stream stream)

    {

      lock (_forSend) {

        var src = stream;

        var compressed = false;

        var sent = false;

        xxxxx

        sent = send (opcode, stream, compressed);

        xxxxx

        return sent;

      }

    }


所以WebSocketSharp在高并发场景下是存在通信阻塞问题的。当然,WebSocketSharp已经实现的很好了,正常的话几ms都不会遇到阻塞问题,如下设置3ms定时超频发送、发送一段时间后:

客户端A发送消息,由服务端转发至客户B,再将客户端B的反馈结果由服务端转发回客户端A,真正延时才0-2ms!

 

所以上方项目中遇到的ZIP文件解压进度超快1ms,只能要ZIP解压处优化下,设置并发操作10ms内保留最后一个操作,可以参考 .NET异步并发操作,只保留最后一次操作,即10ms最多触发一次解压进度事件。确实也应该这么优化,通信即使撑住这种高并发,UI刷新这么高帧率也有点浪费CPU/GPU资源。

WebSocket

我们再看看原生的WebSocket,写个WebSocket通信Demo kybs00/WebSocketDemo (github.com)

服务端定时1ms使劲往客户端发送Message消息,结果竟然是:

System.InvalidOperationException:“There is already one outstanding 'SendAsync' call for this WebSocket instance. ReceiveAsync and SendAsync can be called simultaneously, but at most one outstanding operation for each of them is allowed at the same time.”

看来发送事件外部也要处理好高并发的场景,1ms真的是太猛了

private SemaphoreSlim _sendLock = new SemaphoreSlim(1);

    private async void Timer_Elapsed(object sender, ElapsedEventArgs e)

    {

        var message = $"{DateTime.Now.ToString("HH:mm:ss fff")},hello from server";


        await _sendLock.WaitAsync();

        await BroadcastAsync("test", message);

        _sendLock.Release();

        Console.WriteLine(message);

    }


加完信号量同步,服务端就能正常发送了。下面是10分钟后客户端接收数据打印,传输几乎无延时:

另外,也尝试了单独在客户端接收添加信号量同步,依然是提示服务端发送不支持并行的异常。

所以原生WebSocket在发送端加个需要串行处理比如上面的SemaphoreSlim信号量,保证完整的写入完数据、执行_stream.FlushAsync()。

 

作者:唐宋元明清2188

出处:http://www.cnblogs.com/kybs0/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。



该文章在 2024/9/5 12:01:31 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2024 ClickSun All Rights Reserved