using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Text;
using detect.device.Utils;
using Serilog;

namespace detect.device;

using TcpClient = NetCoreServer.TcpClient;

/// <summary>
/// TCP client for talking to a device. Builds on NetCoreServer's TcpClient and
/// implements IDeviceClient.
/// </summary>
/// <remarks>
/// Incoming text is decoded as UTF-8 JSON. Messages whose Type is "event" are raised
/// through <see cref="DeviceEvent"/>; every other message is stored in
/// _deviceRequests under its RequestId, where RequestAction polls for it.
/// Unless DisConnectAsync has been called, the client reconnects after a disconnect.
///
/// NOTE(review): several generic type arguments look like they were lost from this
/// copy of the file (the ConcurrentDictionary field, the EventHandler event, the
/// DeserializeObject calls and the Task return type of RequestAction). Confirm
/// against version control before building.
/// </remarks>
public class DeviceClientSocket : TcpClient, IDeviceClient
{
    // Set by DisConnectAsync; checked in OnDisconnected to suppress the automatic reconnect.
    // NOTE(review): written and read from different methods with no visible synchronisation
    // (not volatile) - confirm which threads touch it.
    private bool _stop;

    // Raw response text keyed by request id. Filled by ProcessReceived, polled and
    // cleared by RequestAction.
    private readonly ConcurrentDictionary _deviceRequests = new();

    /// <summary>
    /// Raised on connect, on disconnect, and for every received message whose Type is "event".
    /// </summary>
    public event EventHandler? DeviceEvent;

    /// <summary>
    /// Creates a client for the given endpoint and sets both socket buffer size options.
    /// </summary>
    /// <param name="address">Device address, forwarded to the base constructor.</param>
    /// <param name="port">Device port, forwarded to the base constructor; defaults to 13000.</param>
    public DeviceClientSocket(string address, int port = 13000) : base(address, port)
    {
        // 1024 * 1024 * 64 = 67,108,864; presumably a size in bytes (64 MiB) - confirm
        // against the NetCoreServer option documentation.
        OptionSendBufferSize = 1024 * 1024 * 64;
        OptionReceiveBufferSize = 1024 * 1024 * 64;
    }

    /// <summary>Returns the current value of IsConnected.</summary>
    public bool Connected()
    {
        return IsConnected;
    }

    /// <summary>
    /// Disables automatic reconnection, calls DisconnectAsync, and then waits until
    /// IsConnected becomes false.
    /// </summary>
    /// <returns>The value returned by DisconnectAsync.</returns>
    /// <remarks>
    /// Despite the name, this method blocks the caller: it spins on IsConnected.
    /// NOTE(review): the spin has no upper bound; if IsConnected never turns false the
    /// caller never returns.
    /// </remarks>
    public bool DisConnectAsync()
    {
        // Must be set before disconnecting so OnDisconnected does not reconnect.
        _stop = true;
        var flag = DisconnectAsync();
        while (IsConnected) Thread.Yield();
        return flag;
    }

    /// <summary>Hides the inherited ConnectAsync and forwards to it unchanged.</summary>
    /// <returns>The value returned by the base ConnectAsync.</returns>
    public new bool ConnectAsync()
    {
        return base.ConnectAsync();
    }

    /// <summary>Logs that a connection attempt to Address:Port is starting.</summary>
    protected override void OnConnecting()
    {
        Log.Information("({address}:{port})OnConnecting...", Address, Port);
    }

    /// <summary>Logs the connection and publishes an EventDeviceConnected event.</summary>
    protected override void OnConnected()
    {
        Log.Information("({address}:{port})OnConnected.", Address, Port);
        PublishEvent(new DeviceEvent { Name = detect.device.DeviceEvent.EventDeviceConnected });
    }

    /// <summary>
    /// Logs the disconnect, publishes an EventDeviceDisConnected event, waits one second
    /// and reconnects unless DisConnectAsync requested a stop.
    /// </summary>
    protected override void OnDisconnected()
    {
        Log.Information("({address}:{port})OnDisconnected", Address, Port);
        PublishEvent(new DeviceEvent { Name = detect.device.DeviceEvent.EventDeviceDisConnected });
        // Fixed one-second pause before the reconnect attempt.
        // NOTE(review): this blocks whichever thread invokes the callback - confirm that is acceptable.
        Thread.Sleep(1000);
        if (!_stop) ConnectAsync();
    }

    /// <summary>Logs the socket error; no other handling happens here.</summary>
    /// <param name="error">Socket error passed in by the base class.</param>
    protected override void OnError(SocketError error)
    {
        Log.Error("({address}:{port})OnError:{error}", Address, Port, error);
    }

    /// <summary>Raises <see cref="DeviceEvent"/> with this instance as the sender.</summary>
    /// <param name="deviceEvent">Event payload handed to subscribers.</param>
    public void PublishEvent(DeviceEvent deviceEvent)
    {
        DeviceEvent?.Invoke(this, deviceEvent);
    }

    /// <summary>
    /// Decodes the received bytes as UTF-8, splits the text into individual JSON objects
    /// and passes each one to ProcessReceived.
    /// </summary>
    /// <param name="buffer">Buffer holding the received bytes.</param>
    /// <param name="offset">Start of the received data within <paramref name="buffer"/>.</param>
    /// <param name="size">Number of received bytes.</param>
    /// <remarks>
    /// NOTE(review): framing relies on the literal text "}{" marking the boundary between
    /// two objects. Nothing is carried over between calls, so this presumably assumes each
    /// callback delivers only complete objects and complete UTF-8 sequences; a "}{" inside
    /// a JSON string value would also be split. Confirm against the device protocol.
    /// </remarks>
    protected override void OnReceived(byte[] buffer, long offset, long size)
    {
        var received = Encoding.UTF8.GetString(buffer, (int)offset, (int)size);
        Log.Debug("({address}:{port})received: {content}", Address, Port, received);
        var jsons = received.Split("}{");
        if (jsons.Length > 1)
        {
            for (var i = 0; i < jsons.Length; i++)
            {
                var json = jsons[i];
                // Split removed the separator, so put the braces back: middle pieces lost both.
                var content = "{" + json + "}";
                if (i == 0)
                {
                    // First piece still has its opening brace; only the closing one is missing.
                    content = json + "}";
                }
                else if (i == jsons.Length - 1)
                {
                    // Last piece still has its closing brace; only the opening one is missing.
                    content = "{" + json;
                }
                ProcessReceived(content);
            }
        }
        else
        {
            // No separator found: treat the whole text as one message.
            ProcessReceived(received);
        }
    }

    /// <summary>
    /// Handles one JSON message: raises <see cref="DeviceEvent"/> when its Type is "event",
    /// otherwise stores the raw text under its RequestId for RequestAction to pick up.
    /// </summary>
    /// <param name="content">Text of a single JSON message.</param>
    private void ProcessReceived(string content)
    {
        // NOTE(review): no guard for a null result or for exceptions from JsonUtil -
        // confirm how JsonUtil reports malformed input.
        var result = JsonUtil.DeserializeObject>(content);
        if ("event".Equals(result.Type))
        {
            // NOTE(review): content is already a JSON string, yet it is passed through
            // SerializeObject before being deserialized again - confirm this round-trip
            // is intended (JsonUtil is not visible here).
            var deviceEvent = JsonUtil.DeserializeObject(JsonUtil.SerializeObject(content));
            DeviceEvent?.Invoke(this, deviceEvent);
        }
        else
        {
            // TryAdd keeps an existing entry: a second message with the same id is dropped.
            // NOTE(review): the '!' only silences the nullable warning; a null RequestId is
            // not checked at runtime.
            _deviceRequests.TryAdd(result.RequestId!, content);
        }
    }

    /// <summary>
    /// Sends the request built by <paramref name="builder"/> and waits for a stored
    /// response with the same request id, then deserializes it.
    /// </summary>
    /// <param name="builder">Supplies the request id and the payload to send.</param>
    /// <returns>The deserialized response for the request.</returns>
    /// <remarks>
    /// NOTE(review): there is no timeout or cancellation. If no message with this request
    /// id ever arrives, the polling loop never ends and the returned task never completes.
    /// </remarks>
    public async Task> RequestAction(DeviceClientRequestBuilder builder)
    {
        var requestId = builder.RequestId;
        // Drop any earlier entry stored under this id before sending.
        _deviceRequests.TryRemove(requestId, out _);
        Send(builder.Build());
        var task = Task.Run(() =>
        {
            _deviceRequests.TryGetValue(requestId, out var result);
            // Poll every 100 ms until ProcessReceived has stored a non-empty response.
            while (string.IsNullOrEmpty(result))
            {
                Thread.Sleep(100);
                _deviceRequests.TryGetValue(requestId, out result);
            }
            return result;
        });
        var result = await task;
        // Clear the entry once the response has been read.
        _deviceRequests.TryRemove(requestId, out _);
        return JsonUtil.DeserializeObject>(result);
    }

    /// <summary>Not implemented; always throws.</summary>
    /// <param name="builder">Unused.</param>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public DeviceClientResponse RequestActionSync(DeviceClientRequestBuilder builder)
    {
        throw new NotImplementedException();
    }
}