Added auto-reconnect system, hoping it will help with connection shutdown issues

README: added Troubleshooting guide
This commit is contained in:
Wizou 2021-09-16 04:47:15 +02:00
parent dcd384ed27
commit b17349bd75
3 changed files with 224 additions and 162 deletions

View file

@ -21,6 +21,7 @@ namespace WTelegram
{
public event Action<ITLObject> Update;
public Config TLConfig { get; private set; }
public int MaxAutoReconnects { get; set; } = 5; // number of automatic reconnections on connection/reactor failure
private readonly Func<string, string> _config;
private readonly int _apiId;
@ -37,7 +38,7 @@ namespace WTelegram
private Task _reactorTask;
private long _bareRequest;
private readonly Dictionary<long, (Type type, TaskCompletionSource<object> tcs)> _pendingRequests = new();
private readonly SemaphoreSlim _sendSemaphore = new(1);
private SemaphoreSlim _sendSemaphore = new(0);
private CancellationTokenSource _cts;
/// <summary>Welcome to WTelegramClient! 😀</summary>
@ -86,6 +87,7 @@ namespace WTelegram
public void Dispose()
{
Helpers.Log(2, "Disposing the client");
if (CheckMsgsToAck() is MsgsAck msgsAck)
SendAsync(MakeFunction(msgsAck), false).Wait(1000);
_cts?.Cancel();
@ -93,12 +95,15 @@ namespace WTelegram
_tcpClient?.Dispose();
}
public void Reset() // disconnect and reset session (forget server address, current user and authkey)
// disconnect and eventually reset session (forget server address, current user and authkey)
public void Reset(bool resetSession = true)
{
_cts?.Cancel();
_sendSemaphore = new(0);
_reactorTask = null;
_tcpClient?.Dispose();
_session.Reset();
if (resetSession)
_session.Reset();
}
/// <summary>Establish connection to Telegram servers. Config callback is queried for: server_address</summary>
@ -116,7 +121,8 @@ namespace WTelegram
_networkStream = _tcpClient.GetStream();
_frame_seqTx = _frame_seqRx = 0;
_cts = new();
_reactorTask = Reactor(_cts.Token);
_reactorTask = Reactor(_networkStream, _cts.Token);
_sendSemaphore.Release();
if (_session.AuthKey == null)
await CreateAuthorizationKey(this, _session);
@ -130,6 +136,7 @@ namespace WTelegram
Config("lang_pack"),
Config("lang_code"),
Help_GetConfig));
Helpers.Log(2, $"Connected to {(TLConfig.test_mode ? "Test DC" : "DC")} {TLConfig.this_dc}... {TLConfig.flags & (Config.Flags)~0xE00}");
}
private async Task MigrateDCAsync(int dcId)
@ -139,9 +146,7 @@ namespace WTelegram
if (_session.User != null)
exported = await Auth_ExportAuthorization(dcId);
var prevFamily = _tcpClient.Client.RemoteEndPoint.AddressFamily;
_cts.Cancel();
_reactorTask = null;
_tcpClient.Dispose();
Reset(false);
var dcOptions = TLConfig.dc_options.Where(dc => dc.id == dcId && (dc.flags & (DcOption.Flags.media_only | DcOption.Flags.cdn)) == 0);
if (prevFamily == AddressFamily.InterNetworkV6) // try to stay in the same connectivity
dcOptions = dcOptions.OrderByDescending(dc => dc.flags & DcOption.Flags.ipv6); // list ipv6 first
@ -159,14 +164,185 @@ namespace WTelegram
}
}
private static ITLFunction MakeFunction(ITLObject msg)
=> writer =>
private async Task Reactor(NetworkStream stream, CancellationToken ct)
{
int reconnects = 0;
while (!ct.IsCancellationRequested)
{
writer.WriteTLObject(msg);
return msg.GetType().Name;
ITLObject obj;
try
{
obj = await RecvAsync(stream, ct);
}
catch (Exception ex) // an exception in RecvAsync is always fatal
{
if (ct.IsCancellationRequested) return;
Helpers.Log(5, $"An exception occured in the reactor: {ex}");
var oldSemaphore = _sendSemaphore;
await oldSemaphore.WaitAsync(ct); // prevent any sending while we reconnect
try
{
lock (_pendingRequests) // abort all pending requests
{
foreach (var (_, tcs) in _pendingRequests.Values)
_ = Task.Run(() => tcs.SetException(ex), default);
_pendingRequests.Clear();
_bareRequest = 0;
}
reconnects = (reconnects + 1) % MaxAutoReconnects;
if (reconnects != 0)
{
Reset(false);
await ConnectAsync(); // start a new reactor
}
else
OnUpdate(new ReactorError { Exception = ex });
}
finally
{
oldSemaphore.Release();
}
return; // always stop the reactor
}
if (obj != null)
await HandleMessageAsync(obj);
}
}
internal async Task<ITLObject> RecvAsync(NetworkStream stream, CancellationToken ct)
{
var data = await RecvFrameAsync(stream, ct);
if (data.Length == 4 && data[3] == 0xFF)
{
int error_code = -BinaryPrimitives.ReadInt32LittleEndian(data);
throw new RpcException(error_code, TransportError(error_code));
}
if (data.Length < 24) // authKeyId+msgId+length+ctorNb | authKeyId+msgKey
throw new ApplicationException($"Packet payload too small: {data.Length}");
long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data);
if (authKeyId != _session.AuthKeyID)
throw new ApplicationException($"Received a packet encrypted with unexpected key {authKeyId:X}");
if (authKeyId == 0) // Unencrypted message
{
using var reader = new TL.BinaryReader(new MemoryStream(data, 8, data.Length - 8), this);
long msgId = _lastRecvMsgId = reader.ReadInt64();
if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}");
int length = reader.ReadInt32();
if (length != data.Length - 20) throw new ApplicationException($"Unexpected unencrypted length {length} != {data.Length - 20}");
var obj = reader.ReadTLObject();
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {_session.MsgIdToStamp(msgId):u} {((msgId & 2) == 0 ? "" : "NAR")} unencrypted");
return obj;
}
else
{
#if MTPROTO1
byte[] msgKeyLarge = data[4..24];
#else
byte[] msgKeyLarge = data[0..24];
#endif
byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24), false, _session.AuthKey, msgKeyLarge);
if (decrypted_data.Length < 36) // header below+ctorNb
throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}");
using var reader = new TL.BinaryReader(new MemoryStream(decrypted_data), this);
var serverSalt = reader.ReadInt64(); // int64 salt
var sessionId = reader.ReadInt64(); // int64 session_id
var msgId = _lastRecvMsgId = reader.ReadInt64();// int64 message_id
var seqno = reader.ReadInt32(); // int32 msg_seqno
var length = reader.ReadInt32(); // int32 message_data_length
var msgStamp = _session.MsgIdToStamp(msgId);
if (serverSalt != _session.Salt)
{
Helpers.Log(2, $"Server salt has changed: {_session.Salt:X8} -> {serverSalt:X8}");
_session.Salt = serverSalt;
if (++_unexpectedSaltChange >= 30)
throw new ApplicationException($"Server salt changed unexpectedly more than 30 times during this session");
}
if (sessionId != _session.Id) throw new ApplicationException($"Unexpected session ID {_session.Id} != {_session.Id}");
if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}");
if ((seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msgId);
if ((msgStamp - DateTime.UtcNow).Ticks / TimeSpan.TicksPerSecond is > 30 or < -300)
return null;
#if MTPROTO1
if (decrypted_data.Length - 32 - length is < 0 or > 15) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}");
if (!data.AsSpan(8, 16).SequenceEqual(Sha1Recv.ComputeHash(decrypted_data, 0, 32 + length).AsSpan(4)))
throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1");
#else
if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}");
Sha256Recv.Initialize();
Sha256Recv.TransformBlock(_session.AuthKey, 96, 32, null, 0);
Sha256Recv.TransformFinalBlock(decrypted_data, 0, decrypted_data.Length);
if (!data.AsSpan(8, 16).SequenceEqual(Sha256Recv.Hash.AsSpan(8, 16)))
throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1");
#endif
var ctorNb = reader.ReadUInt32();
if (ctorNb == Schema.MsgContainer)
{
Helpers.Log(1, $"Receiving {"MsgContainer",-50} {msgStamp:u} (svc)");
return ReadMsgContainer(reader);
}
else if (ctorNb == Schema.RpcResult)
{
Helpers.Log(1, $"Receiving {"RpcResult",-50} {msgStamp:u}");
return ReadRpcResult(reader);
}
else
{
var obj = reader.ReadTLObject(ctorNb);
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {msgStamp:u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}");
return obj;
}
}
static string TransportError(int error_code) => error_code switch
{
404 => "Auth key not found",
429 => "Transport flood",
_ => ((HttpStatusCode)error_code).ToString(),
};
}
private async Task<byte[]> RecvFrameAsync(NetworkStream stream, CancellationToken ct)
{
byte[] frame = new byte[8];
if (await FullReadAsync(stream, frame, 8, ct) != 8)
throw new ApplicationException("Could not read frame prefix : Connection shut down");
int length = BinaryPrimitives.ReadInt32LittleEndian(frame) - 12;
if (length <= 0 || length >= 0x10000)
throw new ApplicationException("Invalid frame_len");
int seqno = BinaryPrimitives.ReadInt32LittleEndian(frame.AsSpan(4));
if (seqno != _frame_seqRx++)
{
Trace.TraceWarning($"Unexpected frame_seq received: {seqno} instead of {_frame_seqRx}");
_frame_seqRx = seqno + 1;
}
var payload = new byte[length];
if (await FullReadAsync(stream, payload, length, ct) != length)
throw new ApplicationException("Could not read frame data : Connection shut down");
uint crc32 = Compat.UpdateCrc32(0, frame, 0, 8);
crc32 = Compat.UpdateCrc32(crc32, payload, 0, payload.Length);
if (await FullReadAsync(stream, frame, 4, ct) != 4)
throw new ApplicationException("Could not read frame CRC : Connection shut down");
if (crc32 != BinaryPrimitives.ReadUInt32LittleEndian(frame))
throw new ApplicationException("Invalid envelope CRC32");
return payload;
}
#pragma warning disable CA1835 // necessary for .NET Standard 2.0 compilation
private static async Task<int> FullReadAsync(Stream stream, byte[] buffer, int length, CancellationToken ct = default)
{
for (int offset = 0; offset != length;)
{
var read = await stream.ReadAsync(buffer, offset, length - offset, ct);
if (read == 0) return offset;
offset += read;
}
return length;
}
private async Task<long> SendAsync(ITLFunction func, bool isContent)
{
if (_session.AuthKeyID != 0 && isContent && CheckMsgsToAck() is MsgsAck msgsAck)
@ -252,139 +428,14 @@ namespace WTelegram
}
return msgId;
}
private static async Task<int> FullReadAsync(Stream stream, byte[] buffer, int length, CancellationToken ct = default)
{
for (int offset = 0; offset != length;)
{
var read = await stream.ReadAsync(buffer, offset, length - offset, ct);
if (read == 0) return offset;
offset += read;
}
return length;
}
#pragma warning restore CA1835
private async Task<byte[]> RecvFrameAsync(CancellationToken ct)
{
byte[] frame = new byte[8];
if (await FullReadAsync(_networkStream, frame, 8, ct) != 8)
throw new ApplicationException("Could not read frame prefix : Connection shut down");
int length = BinaryPrimitives.ReadInt32LittleEndian(frame) - 12;
if (length <= 0 || length >= 0x10000)
throw new ApplicationException("Invalid frame_len");
int seqno = BinaryPrimitives.ReadInt32LittleEndian(frame.AsSpan(4));
if (seqno != _frame_seqRx++)
private static ITLFunction MakeFunction(ITLObject msg)
=> writer =>
{
Trace.TraceWarning($"Unexpected frame_seq received: {seqno} instead of {_frame_seqRx}");
_frame_seqRx = seqno + 1;
}
var payload = new byte[length];
if (await FullReadAsync(_networkStream, payload, length, ct) != length)
throw new ApplicationException("Could not read frame data : Connection shut down");
uint crc32 = Compat.UpdateCrc32(0, frame, 0, 8);
crc32 = Compat.UpdateCrc32(crc32, payload, 0, payload.Length);
if (await FullReadAsync(_networkStream, frame, 4, ct) != 4)
throw new ApplicationException("Could not read frame CRC : Connection shut down");
if (crc32 != BinaryPrimitives.ReadUInt32LittleEndian(frame))
throw new ApplicationException("Invalid envelope CRC32");
return payload;
}
internal async Task<ITLObject> RecvAsync(CancellationToken ct)
{
var data = await RecvFrameAsync(ct);
if (data.Length == 4 && data[3] == 0xFF)
{
int error_code = -BinaryPrimitives.ReadInt32LittleEndian(data);
throw new RpcException(error_code, TransportError(error_code));
}
if (data.Length < 24) // authKeyId+msgId+length+ctorNb | authKeyId+msgKey
throw new ApplicationException($"Packet payload too small: {data.Length}");
long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data);
if (authKeyId != _session.AuthKeyID)
throw new ApplicationException($"Received a packet encrypted with unexpected key {authKeyId:X}");
if (authKeyId == 0) // Unencrypted message
{
using var reader = new TL.BinaryReader(new MemoryStream(data, 8, data.Length - 8), this);
long msgId = _lastRecvMsgId = reader.ReadInt64();
if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}");
int length = reader.ReadInt32();
if (length != data.Length - 20) throw new ApplicationException($"Unexpected unencrypted length {length} != {data.Length - 20}");
var obj = reader.ReadTLObject();
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {_session.MsgIdToStamp(msgId):u} {((msgId & 2) == 0 ? "": "NAR")} unencrypted");
return obj;
}
else
{
#if MTPROTO1
byte[] msgKeyLarge = data[4..24];
#else
byte[] msgKeyLarge = data[0..24];
#endif
byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24), false, _session.AuthKey, msgKeyLarge);
if (decrypted_data.Length < 36) // header below+ctorNb
throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}");
using var reader = new TL.BinaryReader(new MemoryStream(decrypted_data), this);
var serverSalt = reader.ReadInt64(); // int64 salt
var sessionId = reader.ReadInt64(); // int64 session_id
var msgId = _lastRecvMsgId = reader.ReadInt64();// int64 message_id
var seqno = reader.ReadInt32(); // int32 msg_seqno
var length = reader.ReadInt32(); // int32 message_data_length
var msgStamp = _session.MsgIdToStamp(msgId);
if (serverSalt != _session.Salt)
{
Helpers.Log(3, $"Server salt has changed: {_session.Salt:X8} -> {serverSalt:X8}");
_session.Salt = serverSalt;
if (++_unexpectedSaltChange >= 30)
throw new ApplicationException($"Server salt changed unexpectedly more than 30 times during this session");
}
if (sessionId != _session.Id) throw new ApplicationException($"Unexpected session ID {_session.Id} != {_session.Id}");
if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}");
if ((seqno & 1) != 0) lock(_msgsToAck) _msgsToAck.Add(msgId);
if ((msgStamp - DateTime.UtcNow).Ticks / TimeSpan.TicksPerSecond is > 30 or < -300)
return null;
#if MTPROTO1
if (decrypted_data.Length - 32 - length is < 0 or > 15) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}");
if (!data.AsSpan(8, 16).SequenceEqual(Sha1Recv.ComputeHash(decrypted_data, 0, 32 + length).AsSpan(4)))
throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1");
#else
if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}");
Sha256Recv.Initialize();
Sha256Recv.TransformBlock(_session.AuthKey, 96, 32, null, 0);
Sha256Recv.TransformFinalBlock(decrypted_data, 0, decrypted_data.Length);
if (!data.AsSpan(8, 16).SequenceEqual(Sha256Recv.Hash.AsSpan(8, 16)))
throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1");
#endif
var ctorNb = reader.ReadUInt32();
if (ctorNb == Schema.MsgContainer)
{
Helpers.Log(1, $"Receiving {"MsgContainer",-50} {msgStamp:u} (svc)");
return ReadMsgContainer(reader);
}
else if (ctorNb == Schema.RpcResult)
{
Helpers.Log(1, $"Receiving {"RpcResult",-50} {msgStamp:u}");
return ReadRpcResult(reader);
}
else
{
var obj = reader.ReadTLObject(ctorNb);
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {msgStamp:u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}");
return obj;
}
}
static string TransportError(int error_code) => error_code switch
{
404 => "Auth key not found",
429 => "Transport flood",
_ => ((HttpStatusCode)error_code).ToString(),
writer.WriteTLObject(msg);
return msg.GetType().Name;
};
}
internal MsgContainer ReadMsgContainer(TL.BinaryReader reader)
{
@ -464,6 +515,7 @@ namespace WTelegram
internal async Task<X> CallBareAsync<X>(ITLFunction request)
{
if (_bareRequest != 0) throw new ApplicationException("A bare request is already undergoing");
var msgId = await SendAsync(request, false);
var tcs = new TaskCompletionSource<object>();
lock (_pendingRequests)
@ -545,27 +597,6 @@ namespace WTelegram
return "as MsgContainer";
};
private async Task Reactor(CancellationToken ct)
{
try
{
while (!ct.IsCancellationRequested)
{
var obj = await RecvAsync(ct);
if (obj == null) continue; // ignored message :|
await HandleMessageAsync(obj);
}
}
catch (OperationCanceledException)
{ }
catch (Exception ex)
{
if (!ct.IsCancellationRequested)
Helpers.Log(5, $"An exception occured in the reactor: {ex}");
}
}
private async Task HandleMessageAsync(ITLObject obj)
{
switch (obj)

View file

@ -287,6 +287,11 @@ namespace TL
public RpcException(int code, string message) : base(message) => Code = code;
}
public class ReactorError : ITLObject
{
public Exception Exception;
}
// Below TL types are commented "parsed manually" from https://github.com/telegramdesktop/tdesktop/blob/dev/Telegram/Resources/tl/mtproto.tl
[TLDef(0xF35C6D01)] //rpc_result#f35c6d01 req_msg_id:long result:Object = RpcResult