detect/detect.device/DeviceClientSocket.cs

139 lines
4.0 KiB
C#
Raw Normal View History

2024-11-13 17:09:15 +08:00
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Text;
using detect.device.Utils;
using Serilog;
namespace detect.device;
using TcpClient = NetCoreServer.TcpClient;
/// <summary>
/// TCP client that exchanges UTF-8 JSON messages with a device.
/// Incoming messages whose <c>Type</c> is <c>"event"</c> are raised through
/// <see cref="DeviceEvent"/>; every other message is treated as the response to
/// the pending request that carries the same request id.
/// Unless <see cref="DisConnectAsync"/> was called, the client asks to reconnect
/// about one second after each disconnect notification.
/// </summary>
public class DeviceClientSocket : TcpClient, IDeviceClient
{
    // Set by DisConnectAsync to suppress the automatic reconnect in OnDisconnected.
    // volatile because the writer and the reader may run on different threads.
    private volatile bool _stop;

    // Requests that were sent and are still waiting for their response, keyed by request id.
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pendingRequests = new();

    // Stateful decoder: keeps the leading bytes of a multi-byte UTF-8 character
    // when that character is split across two reads.
    private readonly Decoder _utf8Decoder = Encoding.UTF8.GetDecoder();

    // Reassembles complete JSON objects from the received character stream.
    private readonly JsonObjectFramer _framer = new();

    /// <summary>Raised for connection state changes and for event messages sent by the device.</summary>
    public event EventHandler<DeviceEvent>? DeviceEvent;

    /// <summary>Creates a client for the given device endpoint.</summary>
    /// <param name="address">Device address passed to the base TCP client.</param>
    /// <param name="port">Device port passed to the base TCP client; defaults to 13000.</param>
    public DeviceClientSocket(string address, int port = 13000) : base(address, port)
    {
    }

    /// <summary>Returns the base client's <c>IsConnected</c> flag.</summary>
    public bool Connected()
    {
        return IsConnected;
    }

    /// <summary>
    /// Disables automatic reconnection, requests a disconnect and spins until the
    /// base client no longer reports itself as connected.
    /// </summary>
    /// <returns>The value returned by the base <c>DisconnectAsync</c> call.</returns>
    public bool DisConnectAsync()
    {
        _stop = true;
        var flag = DisconnectAsync();
        while (IsConnected)
        {
            Thread.Yield();
        }

        return flag;
    }

    /// <summary>Starts an asynchronous connect by delegating to the base client.</summary>
    /// <returns>The value returned by the base <c>ConnectAsync</c> call.</returns>
    public new bool ConnectAsync()
    {
        return base.ConnectAsync();
    }

    /// <summary>Logs that a connection attempt is starting.</summary>
    protected override void OnConnecting()
    {
        Log.Information("({address}:{port})OnConnecting...", Address, Port);
    }

    /// <summary>Logs the new connection and publishes the "device connected" event.</summary>
    protected override void OnConnected()
    {
        Log.Information("({address}:{port})OnConnected.", Address, Port);
        PublishEvent(new DeviceEvent
        {
            Name = detect.device.DeviceEvent.EventDeviceConnected
        });
    }

    /// <summary>
    /// Logs the disconnect, publishes the "device disconnected" event and, unless
    /// <see cref="DisConnectAsync"/> was called, requests a reconnect after one second.
    /// </summary>
    protected override void OnDisconnected()
    {
        Log.Information("({address}:{port})OnDisconnected", Address, Port);

        // Any half-received message belongs to the connection that just ended;
        // drop it so it cannot be glued onto data from the next connection.
        _utf8Decoder.Reset();
        _framer.Reset();

        PublishEvent(new DeviceEvent
        {
            Name = detect.device.DeviceEvent.EventDeviceDisConnected
        });

        // Pause before reconnecting so repeated failures do not turn into a tight loop.
        Thread.Sleep(1000);
        if (!_stop)
        {
            ConnectAsync();
        }
    }

    /// <summary>Logs a socket error reported by the base client.</summary>
    /// <param name="error">The socket error code.</param>
    protected override void OnError(SocketError error)
    {
        Log.Error("({address}:{port})OnError:{error}", Address, Port, error);
    }

    /// <summary>Raises <see cref="DeviceEvent"/> with this client as the sender.</summary>
    /// <param name="deviceEvent">The event payload handed to subscribers.</param>
    public void PublishEvent(DeviceEvent deviceEvent)
    {
        DeviceEvent?.Invoke(this, deviceEvent);
    }

    /// <summary>
    /// Decodes the received bytes as UTF-8, appends them to the stream framer and
    /// processes every JSON object that became complete.
    /// </summary>
    /// <param name="buffer">Buffer that contains the received bytes.</param>
    /// <param name="offset">Index of the first received byte in <paramref name="buffer"/>.</param>
    /// <param name="size">Number of received bytes.</param>
    protected override void OnReceived(byte[] buffer, long offset, long size)
    {
        // A TCP read is an arbitrary slice of the byte stream: it can end in the
        // middle of a message or even in the middle of a character, so both the
        // decoder and the framer carry their state over to the next call.
        var chars = new char[_utf8Decoder.GetCharCount(buffer, (int)offset, (int)size, false)];
        var charCount = _utf8Decoder.GetChars(buffer, (int)offset, (int)size, chars, 0, false);
        var received = new string(chars, 0, charCount);
        Log.Debug("({address}:{port})received: {content}", Address, Port, received);

        _framer.Append(received, ProcessReceived);
    }

    /// <summary>
    /// Handles one complete JSON message: event messages are published, other
    /// messages complete the pending request with the same request id.
    /// </summary>
    /// <param name="content">The JSON text of a single message.</param>
    private void ProcessReceived(string content)
    {
        try
        {
            var result = JsonUtil.DeserializeObject<DeviceClientResponse<object>>(content);
            if (result is null)
            {
                Log.Warning("({address}:{port})ignored message that deserialized to null: {content}", Address, Port, content);
                return;
            }

            if ("event".Equals(result.Type))
            {
                // content already is the JSON text; serializing it again would only
                // wrap it in a JSON string literal that cannot be read as a DeviceEvent.
                var deviceEvent = JsonUtil.DeserializeObject<DeviceEvent>(content);
                PublishEvent(deviceEvent);
                return;
            }

            if (string.IsNullOrEmpty(result.RequestId))
            {
                Log.Warning("({address}:{port})ignored response without request id: {content}", Address, Port, content);
                return;
            }

            if (_pendingRequests.TryRemove(result.RequestId, out var pending))
            {
                pending.TrySetResult(content);
            }
            else
            {
                // Nobody is waiting for this id; dropping it keeps the map from growing without bound.
                Log.Warning("({address}:{port})ignored response for unknown request {requestId}", Address, Port, result.RequestId);
            }
        }
        catch (Exception ex)
        {
            // One malformed message (or a failing subscriber) must not prevent the
            // remaining messages of this read from being processed.
            Log.Error(ex, "({address}:{port})failed to process message: {content}", Address, Port, content);
        }
    }

    /// <summary>
    /// Sends the request produced by <paramref name="builder"/> and waits for the
    /// response that carries the same request id.
    /// </summary>
    /// <typeparam name="T">Payload type the response is deserialized with.</typeparam>
    /// <param name="builder">Supplies the request id and the data to send.</param>
    /// <returns>The deserialized response.</returns>
    /// <remarks>
    /// There is no timeout: the returned task stays incomplete until a response
    /// with the request id is received.
    /// </remarks>
    public async Task<DeviceClientResponse<T>> RequestAction<T>(DeviceClientRequestBuilder builder)
    {
        var requestId = builder.RequestId;

        // Run continuations asynchronously so the awaiting caller does not resume
        // inline inside the receive callback that completes the request.
        var completion = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Register before sending so that even an immediate response finds its
        // waiter; this also replaces any stale entry left under the same id.
        _pendingRequests[requestId] = completion;
        try
        {
            Send(builder.Build());
            var response = await completion.Task;
            return JsonUtil.DeserializeObject<DeviceClientResponse<T>>(response);
        }
        finally
        {
            _pendingRequests.TryRemove(requestId, out _);
        }
    }

    /// <summary>Synchronous request variant; not implemented.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public DeviceClientResponse<object> RequestActionSync(DeviceClientRequestBuilder builder)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Incrementally cuts a character stream into top-level JSON objects by
    /// tracking brace depth while skipping braces that occur inside string
    /// literals. Characters between objects are discarded.
    /// </summary>
    private sealed class JsonObjectFramer
    {
        // Text of the object that is currently open but not yet closed.
        private readonly StringBuilder _partial = new();

        // Brace nesting depth; 0 means "between objects".
        private int _depth;

        // True while the scan position is inside a JSON string literal.
        private bool _inString;

        // True when the previous character was a backslash inside a string literal.
        private bool _escaped;

        /// <summary>Forgets any partially received object.</summary>
        public void Reset()
        {
            _partial.Clear();
            _depth = 0;
            _inString = false;
            _escaped = false;
        }

        /// <summary>
        /// Feeds the next piece of the stream and invokes <paramref name="onObject"/>
        /// once for every object completed by it, in stream order.
        /// </summary>
        /// <param name="text">Newly received characters.</param>
        /// <param name="onObject">Receives the text of each completed object.</param>
        public void Append(string text, Action<string> onObject)
        {
            // Index in text where the currently open object's characters begin;
            // 0 when the object was opened by an earlier call.
            var segmentStart = 0;

            for (var i = 0; i < text.Length; i++)
            {
                var c = text[i];

                if (_depth == 0)
                {
                    if (c == '{')
                    {
                        _depth = 1;
                        segmentStart = i;
                    }

                    continue;
                }

                if (_inString)
                {
                    if (_escaped)
                    {
                        _escaped = false;
                    }
                    else if (c == '\\')
                    {
                        _escaped = true;
                    }
                    else if (c == '"')
                    {
                        _inString = false;
                    }

                    continue;
                }

                switch (c)
                {
                    case '"':
                        _inString = true;
                        break;
                    case '{':
                        _depth++;
                        break;
                    case '}':
                        _depth--;
                        if (_depth == 0)
                        {
                            _partial.Append(text, segmentStart, i - segmentStart + 1);
                            var json = _partial.ToString();
                            _partial.Clear();
                            onObject(json);
                        }

                        break;
                }
            }

            // Keep the unfinished tail for the next call.
            if (_depth > 0)
            {
                _partial.Append(text, segmentStart, text.Length - segmentStart);
            }
        }
    }
}