Gracefully handle a disconnection from an idle secondary DC

(don't reconnect)
This commit is contained in:
Wizou 2021-11-21 00:43:39 +01:00
parent e8a98a5799
commit 934895a81c
4 changed files with 20 additions and 18 deletions

View file

@ -63,6 +63,7 @@ namespace WTelegram
private CancellationTokenSource _cts;
private int _reactorReconnects = 0;
private const int FilePartSize = 512 * 1024;
private const string ConnectionShutDown = "Could not read payload length : Connection shut down";
private readonly SemaphoreSlim _parallelTransfers = new(10); // max parallel part uploads/downloads
#if MTPROTO1
private readonly SHA1 _sha1 = SHA1.Create();
@ -391,7 +392,7 @@ namespace WTelegram
try
{
if (await FullReadAsync(stream, data, 4, cts.Token) != 4)
throw new ApplicationException("Could not read payload length : Connection shut down");
throw new ApplicationException(ConnectionShutDown);
int payloadLen = BinaryPrimitives.ReadInt32LittleEndian(data);
if (payloadLen > data.Length)
data = new byte[payloadLen];
@ -405,17 +406,20 @@ namespace WTelegram
catch (Exception ex) // an exception in RecvAsync is always fatal
{
if (cts.IsCancellationRequested) return;
Helpers.Log(5, $"An exception occured in the reactor: {ex}");
Helpers.Log(5, $"{_dcSession.DcID}>An exception occured in the reactor: {ex}");
var oldSemaphore = _sendSemaphore;
await oldSemaphore.WaitAsync(cts.Token); // prevent any sending while we reconnect
var reactorError = new ReactorError { Exception = ex };
try
{
lock (_msgsToAck) _msgsToAck.Clear();
Reset(false, false);
_reactorReconnects = (_reactorReconnects + 1) % MaxAutoReconnects;
if (!IsMainDC && _pendingRequests.Count <= 1 && ex is ApplicationException { Message: ConnectionShutDown } or IOException { InnerException: SocketException })
if (_pendingRequests.Values.FirstOrDefault() is var (type, tcs) && (type is null || type == typeof(Pong)))
_reactorReconnects = 0;
if (_reactorReconnects != 0)
{
lock (_msgsToAck) _msgsToAck.Clear();
Reset(false, false);
await Task.Delay(5000);
await ConnectAsync(); // start a new reactor after 5 secs
lock (_pendingRequests) // retry all pending requests
@ -450,7 +454,6 @@ namespace WTelegram
{
oldSemaphore.Release();
}
cts.Cancel(); // always stop the reactor
}
if (obj != null)
await HandleMessageAsync(obj);
@ -584,8 +587,8 @@ namespace WTelegram
writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted)
writer.Write(msgId); // int64 message_id
writer.Write(0); // int32 message_data_length (to be patched)
writer.WriteTLObject(msg); // bytes message_data
Helpers.Log(1, $"{_dcSession.DcID}>Sending {msg.GetType().Name.TrimEnd('_')}...");
writer.WriteTLObject(msg); // bytes message_data
BinaryPrimitives.WriteInt32LittleEndian(memStream.GetBuffer().AsSpan(20), (int)memStream.Length - 24); // patch message_data_length
}
else
@ -603,11 +606,11 @@ namespace WTelegram
clearWriter.Write(msgId); // int64 message_id
clearWriter.Write(seqno); // int32 msg_seqno
clearWriter.Write(0); // int32 message_data_length (to be patched)
clearWriter.WriteTLObject(msg); // bytes message_data
if ((seqno & 1) != 0)
Helpers.Log(1, $"{_dcSession.DcID}>Sending {msg.GetType().Name.TrimEnd('_'),-40} #{(short)msgId.GetHashCode():X4}");
else
Helpers.Log(1, $"{_dcSession.DcID}>Sending {msg.GetType().Name.TrimEnd('_'),-40} {MsgIdToStamp(msgId):u} (svc)");
clearWriter.WriteTLObject(msg); // bytes message_data
int clearLength = (int)clearStream.Length - prepend; // length before padding (= 32 + message_data_length)
int padding = (0x7FFFFFF0 - clearLength) % 16;
#if !MTPROTO1

View file

@ -178,9 +178,9 @@ namespace TL
writer.Write(0); // patched below
writer.WriteTLObject(msg.body);
if ((msg.seqno & 1) != 0)
WTelegram.Helpers.Log(1, $" Sending → {msg.body.GetType().Name.TrimEnd('_'),-40} #{(short)msg.msg_id.GetHashCode():X4}");
WTelegram.Helpers.Log(1, $" → {msg.body.GetType().Name.TrimEnd('_'),-38} #{(short)msg.msg_id.GetHashCode():X4}");
else
WTelegram.Helpers.Log(1, $" Sending → {msg.body.GetType().Name.TrimEnd('_'),-40}");
WTelegram.Helpers.Log(1, $" → {msg.body.GetType().Name.TrimEnd('_'),-38}");
writer.BaseStream.Position = patchPos;
writer.Write((int)(writer.BaseStream.Length - patchPos - 4)); // patch bytes field
writer.Seek(0, SeekOrigin.End);