MULTIPLE-CONNECTION! First version that implement parallel active connections to DCs (through Client instances dependent of the main Client instance)

Also improved on:
- reconnection/retry/resent strategy
- start of multiple parallel downloads triggering a new DC connection
This commit is contained in:
Wizou 2021-09-28 16:12:20 +02:00
parent 66757ccd0b
commit da5098e8d5
3 changed files with 203 additions and 131 deletions

View file

@ -25,12 +25,13 @@ namespace WTelegram
public event Action<ITLObject> Update; public event Action<ITLObject> Update;
public Config TLConfig { get; private set; } public Config TLConfig { get; private set; }
public int MaxAutoReconnects { get; set; } = 5; // number of automatic reconnections on connection/reactor failure public int MaxAutoReconnects { get; set; } = 5; // number of automatic reconnections on connection/reactor failure
public bool IsMainDC => (_dcSession?.DataCenter?.id ?? 0) == _session.MainDC;
private readonly Func<string, string> _config; private readonly Func<string, string> _config;
private readonly int _apiId; private readonly int _apiId;
private readonly string _apiHash; private readonly string _apiHash;
private readonly Session _session; private readonly Session _session;
private Session.DCSession DCSession => _session.CurrentDCSession; private Session.DCSession _dcSession;
private static readonly byte[] IntermediateHeader = new byte[4] { 0xee, 0xee, 0xee, 0xee }; private static readonly byte[] IntermediateHeader = new byte[4] { 0xee, 0xee, 0xee, 0xee };
private TcpClient _tcpClient; private TcpClient _tcpClient;
private NetworkStream _networkStream; private NetworkStream _networkStream;
@ -43,6 +44,8 @@ namespace WTelegram
private long _bareRequest; private long _bareRequest;
private readonly Dictionary<long, (Type type, TaskCompletionSource<object> tcs)> _pendingRequests = new(); private readonly Dictionary<long, (Type type, TaskCompletionSource<object> tcs)> _pendingRequests = new();
private SemaphoreSlim _sendSemaphore = new(0); private SemaphoreSlim _sendSemaphore = new(0);
private readonly SemaphoreSlim _semaphore = new(1);
private Task _connecting;
private CancellationTokenSource _cts; private CancellationTokenSource _cts;
private int _reactorReconnects = 0; private int _reactorReconnects = 0;
@ -54,6 +57,18 @@ namespace WTelegram
_apiId = int.Parse(Config("api_id")); _apiId = int.Parse(Config("api_id"));
_apiHash = Config("api_hash"); _apiHash = Config("api_hash");
_session = Session.LoadOrCreate(Config("session_pathname"), Convert.FromHexString(_apiHash)); _session = Session.LoadOrCreate(Config("session_pathname"), Convert.FromHexString(_apiHash));
if (_session.MainDC != 0) _session.DCSessions.TryGetValue(_session.MainDC, out _dcSession);
_dcSession ??= new() { Id = Helpers.RandomLong() };
_dcSession.Client = this;
}
private Client(Client cloneOf, Session.DCSession dcSession)
{
_config = cloneOf._config;
_apiId = cloneOf._apiId;
_apiHash = cloneOf._apiHash;
_session = cloneOf._session;
_dcSession = dcSession;
} }
public string Config(string config) public string Config(string config)
@ -92,36 +107,44 @@ namespace WTelegram
public void Dispose() public void Dispose()
{ {
Helpers.Log(2, "Disposing the client"); Helpers.Log(2, $"{_dcSession.DcID}>Disposing the client");
if (CheckMsgsToAck() is MsgsAck msgsAck) Reset(IsMainDC);
SendAsync(MakeFunction(msgsAck), false).Wait(1000);
_cts?.Cancel();
_reactorTask = null;
_tcpClient?.Dispose();
} }
// disconnect and eventually reset sessions (forget servers, current user) // disconnect and eventually reset sessions (forget servers, current user)
public void Reset(bool resetSessions = true) public void Reset(bool resetSessions = true)
{ {
if (CheckMsgsToAck() is MsgsAck msgsAck)
SendAsync(MakeFunction(msgsAck), false).Wait(1000);
_cts?.Cancel(); _cts?.Cancel();
_sendSemaphore = new(0); _sendSemaphore = new(0);
_reactorTask = null; _reactorTask = null;
_tcpClient?.Dispose(); _tcpClient?.Dispose();
_connecting = null;
if (resetSessions) if (resetSessions)
{ {
_session.DCSessions.Clear(); foreach (var altSession in _session.DCSessions.Values)
if (altSession.Client != null && altSession.Client != this)
{
altSession.Client.Dispose();
altSession.Client = null;
}
_session.User = null; _session.User = null;
} }
} }
/// <summary>Establish connection to Telegram servers. Config callback is queried for: server_address</summary> /// <summary>Establish connection to Telegram servers. Config callback is queried for: server_address</summary>
/// <returns>Most methods of this class are async Task, so please use <see langword="await"/></returns> /// <returns>Most methods of this class are async Task, so please use <see langword="await"/></returns>
public async Task ConnectAsync(int dc = default) public async Task ConnectAsync()
{ {
if (_reactorTask != null) lock (this)
throw new ApplicationException("Already connected!"); _connecting ??= DoConnectAsync();
_session.ChangeDC(dc); await _connecting;
var endpoint = DCSession?.EndPoint ?? Compat.IPEndPoint_Parse(Config("server_address")); }
private async Task DoConnectAsync()
{
var endpoint = _dcSession?.EndPoint ?? Compat.IPEndPoint_Parse(Config("server_address"));
Helpers.Log(2, $"Connecting to {endpoint}..."); Helpers.Log(2, $"Connecting to {endpoint}...");
_tcpClient = new TcpClient(endpoint.AddressFamily); _tcpClient = new TcpClient(endpoint.AddressFamily);
await _tcpClient.ConnectAsync(endpoint.Address, endpoint.Port); await _tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
@ -134,8 +157,8 @@ namespace WTelegram
try try
{ {
if (DCSession.AuthKeyID == 0) if (_dcSession.AuthKeyID == 0)
await CreateAuthorizationKey(this, DCSession); await CreateAuthorizationKey(this, _dcSession);
var keepAliveTask = KeepAlive(_cts.Token); var keepAliveTask = KeepAlive(_cts.Token);
TLConfig = await this.InvokeWithLayer<Config>(Layer.Version, TLConfig = await this.InvokeWithLayer<Config>(Layer.Version,
@ -148,12 +171,12 @@ namespace WTelegram
Config("lang_code"), Config("lang_code"),
Schema.Help_GetConfig)); Schema.Help_GetConfig));
_saltChangeCounter = 0; _saltChangeCounter = 0;
if (DCSession.DataCenter == null) if (_dcSession.DataCenter == null)
{ {
DCSession.DataCenter = TLConfig.dc_options.Where(dc => dc.id == TLConfig.this_dc) _dcSession.DataCenter = TLConfig.dc_options.Where(dc => dc.id == TLConfig.this_dc)
.OrderByDescending(dc => dc.ip_address == endpoint.Address.ToString()) .OrderByDescending(dc => dc.ip_address == endpoint.Address.ToString())
.ThenByDescending(dc => dc.port == endpoint.Port).First(); .ThenByDescending(dc => dc.port == endpoint.Port).First();
_session.DCSessions[TLConfig.this_dc] = DCSession; _session.DCSessions[TLConfig.this_dc] = _dcSession;
} }
if (_session.MainDC == 0) _session.MainDC = TLConfig.this_dc; if (_session.MainDC == 0) _session.MainDC = TLConfig.this_dc;
} }
@ -164,34 +187,71 @@ namespace WTelegram
Helpers.Log(2, $"Connected to {(TLConfig.test_mode ? "Test DC" : "DC")} {TLConfig.this_dc}... {TLConfig.flags & (Config.Flags)~0xE00}"); Helpers.Log(2, $"Connected to {(TLConfig.test_mode ? "Test DC" : "DC")} {TLConfig.this_dc}... {TLConfig.flags & (Config.Flags)~0xE00}");
} }
public async Task MigrateDCAsync(int dcId = 0) public async Task<Client> GetClientForDC(int dcId, bool connect = true)
{ {
if (dcId == 0) dcId = _session.MainDC; if (_dcSession.DataCenter?.id == dcId) return this;
if (DCSession.DataCenter?.id == dcId) return; Session.DCSession altSession;
Helpers.Log(2, $"Migrate to DC {dcId}..."); lock (_session)
Auth_ExportedAuthorization exported = null; {
if (_session.User != null && DCSession.DataCenter.id == _session.MainDC && _session.DCSessions.GetValueOrDefault(dcId)?.UserId != _session.User.id) altSession = GetOrCreateDCSession(dcId);
exported = await this.Auth_ExportAuthorization(dcId); altSession.Client ??= new Client(this, altSession);
if (CheckMsgsToAck() is MsgsAck msgsAck) }
await SendAsync(MakeFunction(msgsAck), false); Helpers.Log(2, $"Requested connection to DC {dcId}...");
if (connect)
{
await _semaphore.WaitAsync();
try
{
Auth_ExportedAuthorization exported = null;
if (_session.User != null && IsMainDC && altSession.UserId != _session.User.id)
exported = await this.Auth_ExportAuthorization(dcId);
await altSession.Client.ConnectAsync();
if (exported != null)
{
var authorization = await altSession.Client.Auth_ImportAuthorization(exported.id, exported.bytes);
if (authorization is not Auth_Authorization { user: User user })
throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name);
_session.User = user;
altSession.UserId = user.id;
}
}
finally
{
_semaphore.Release();
}
}
return altSession.Client;
}
private Session.DCSession GetOrCreateDCSession(int dcId)
{
if (_session.DCSessions.TryGetValue(dcId, out var dcSession))
return dcSession;
var prevFamily = _tcpClient.Client.RemoteEndPoint.AddressFamily; var prevFamily = _tcpClient.Client.RemoteEndPoint.AddressFamily;
Reset(false);
var dcOptions = TLConfig.dc_options.Where(dc => dc.id == dcId && (dc.flags & (DcOption.Flags.media_only | DcOption.Flags.cdn)) == 0); var dcOptions = TLConfig.dc_options.Where(dc => dc.id == dcId && (dc.flags & (DcOption.Flags.media_only | DcOption.Flags.cdn)) == 0);
if (prevFamily == AddressFamily.InterNetworkV6) // try to stay in the same connectivity if (prevFamily == AddressFamily.InterNetworkV6) // try to stay in the same connectivity
dcOptions = dcOptions.OrderByDescending(dc => dc.flags & DcOption.Flags.ipv6); // list ipv6 first dcOptions = dcOptions.OrderByDescending(dc => dc.flags & DcOption.Flags.ipv6); // list ipv6 first
else else
dcOptions = dcOptions.OrderBy(dc => dc.flags & DcOption.Flags.ipv6); // list ipv4 first dcOptions = dcOptions.OrderBy(dc => dc.flags & DcOption.Flags.ipv6); // list ipv4 first
var dcOption = dcOptions.FirstOrDefault() ?? throw new ApplicationException($"Could not find adequate dcOption for DC {dcId}"); var dcOption = dcOptions.FirstOrDefault() ?? throw new ApplicationException($"Could not find adequate dc_option for DC {dcId}");
_session.DCSessions.GetOrCreate(dcId).DataCenter = dcOption; return dcSession = _session.DCSessions[dcId] = new Session.DCSession { DataCenter = dcOption, Id = Helpers.RandomLong() };
await ConnectAsync(dcId); }
if (exported != null)
internal DateTime MsgIdToStamp(long serverMsgId)
=> new((serverMsgId >> 32) * 10000000 - _dcSession.ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc);
internal (long msgId, int seqno) NewMsgId(bool isContent)
{
int seqno;
long msgId = DateTime.UtcNow.Ticks + _dcSession.ServerTicksOffset - 621355968000000000L;
msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4
lock (_session)
{ {
var authorization = await this.Auth_ImportAuthorization(exported.id, exported.bytes); if (msgId <= _dcSession.LastSentMsgId) msgId = _dcSession.LastSentMsgId += 4; else _dcSession.LastSentMsgId = msgId;
if (authorization is not Auth_Authorization { user: User user }) seqno = isContent ? _dcSession.Seqno++ * 2 + 1 : _dcSession.Seqno * 2;
throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name); _session.Save();
_session.User = user;
DCSession.UserId = user.id;
} }
return (msgId, seqno);
} }
private async Task KeepAlive(CancellationToken ct) private async Task KeepAlive(CancellationToken ct)
@ -236,7 +296,28 @@ namespace WTelegram
Helpers.Log(5, $"An exception occured in the reactor: {ex}"); Helpers.Log(5, $"An exception occured in the reactor: {ex}");
var oldSemaphore = _sendSemaphore; var oldSemaphore = _sendSemaphore;
await oldSemaphore.WaitAsync(cts.Token); // prevent any sending while we reconnect await oldSemaphore.WaitAsync(cts.Token); // prevent any sending while we reconnect
var reactorError = new ReactorError { Exception = ex };
try try
{
_reactorReconnects = (_reactorReconnects + 1) % MaxAutoReconnects;
if (_reactorReconnects != 0)
{
lock (_msgsToAck) _msgsToAck.Clear();
Reset(false);
await Task.Delay(5000);
await ConnectAsync(); // start a new reactor after 5 secs
lock (_pendingRequests) // retry all pending requests
{
foreach (var (_, tcs) in _pendingRequests.Values)
tcs.SetResult(reactorError);
_pendingRequests.Clear();
_bareRequest = 0;
}
}
else
throw;
}
catch
{ {
lock (_pendingRequests) // abort all pending requests lock (_pendingRequests) // abort all pending requests
{ {
@ -245,14 +326,7 @@ namespace WTelegram
_pendingRequests.Clear(); _pendingRequests.Clear();
_bareRequest = 0; _bareRequest = 0;
} }
_reactorReconnects = (_reactorReconnects + 1) % MaxAutoReconnects; OnUpdate(reactorError);
if (_reactorReconnects != 0)
{
Reset(false);
await ConnectAsync(); // start a new reactor
}
else
OnUpdate(new ReactorError { Exception = ex });
} }
finally finally
{ {
@ -276,7 +350,7 @@ namespace WTelegram
throw new ApplicationException($"Packet payload too small: {dataLen}"); throw new ApplicationException($"Packet payload too small: {dataLen}");
long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data); long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data);
if (authKeyId != DCSession.AuthKeyID) if (authKeyId != _dcSession.AuthKeyID)
throw new ApplicationException($"Received a packet encrypted with unexpected key {authKeyId:X}"); throw new ApplicationException($"Received a packet encrypted with unexpected key {authKeyId:X}");
if (authKeyId == 0) // Unencrypted message if (authKeyId == 0) // Unencrypted message
{ {
@ -287,12 +361,12 @@ namespace WTelegram
if (length != dataLen - 20) throw new ApplicationException($"Unexpected unencrypted length {length} != {dataLen - 20}"); if (length != dataLen - 20) throw new ApplicationException($"Unexpected unencrypted length {length} != {dataLen - 20}");
var obj = reader.ReadTLObject(); var obj = reader.ReadTLObject();
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {_session.MsgIdToStamp(msgId):u} {((msgId & 2) == 0 ? "" : "NAR")} unencrypted"); Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {MsgIdToStamp(msgId):u} clear{((msgId & 2) == 0 ? "" : " NAR")}");
return obj; return obj;
} }
else else
{ {
byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, dataLen - 24), false, DCSession.AuthKey, data, 8); byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, dataLen - 24), false, _dcSession.AuthKey, data, 8);
if (decrypted_data.Length < 36) // header below+ctorNb if (decrypted_data.Length < 36) // header below+ctorNb
throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}"); throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}");
using var reader = new TL.BinaryReader(new MemoryStream(decrypted_data), this); using var reader = new TL.BinaryReader(new MemoryStream(decrypted_data), this);
@ -301,17 +375,17 @@ namespace WTelegram
var msgId = _lastRecvMsgId = reader.ReadInt64();// int64 message_id var msgId = _lastRecvMsgId = reader.ReadInt64();// int64 message_id
var seqno = reader.ReadInt32(); // int32 msg_seqno var seqno = reader.ReadInt32(); // int32 msg_seqno
var length = reader.ReadInt32(); // int32 message_data_length var length = reader.ReadInt32(); // int32 message_data_length
var msgStamp = _session.MsgIdToStamp(msgId); var msgStamp = MsgIdToStamp(msgId);
if (serverSalt != DCSession.Salt) // salt change happens every 30 min if (serverSalt != _dcSession.Salt) // salt change happens every 30 min
{ {
Helpers.Log(2, $"Server salt has changed: {DCSession.Salt:X} -> {serverSalt:X}"); Helpers.Log(2, $"{_dcSession.DcID}>Server salt has changed: {_dcSession.Salt:X} -> {serverSalt:X}");
DCSession.Salt = serverSalt; _dcSession.Salt = serverSalt;
_saltChangeCounter += 20; // counter is decreased by KeepAlive every minute (we have margin of 10) _saltChangeCounter += 20; // counter is decreased by KeepAlive every minute (we have margin of 10)
if (_saltChangeCounter >= 30) if (_saltChangeCounter >= 30)
throw new ApplicationException($"Server salt changed too often! Security issue?"); throw new ApplicationException($"Server salt changed too often! Security issue?");
} }
if (sessionId != _session.Id) throw new ApplicationException($"Unexpected session ID {_session.Id} != {_session.Id}"); if (sessionId != _dcSession.Id) throw new ApplicationException($"Unexpected session ID {sessionId} != {_dcSession.Id}");
if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}"); if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}");
if ((seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msgId); if ((seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msgId);
if ((msgStamp - DateTime.UtcNow).Ticks / TimeSpan.TicksPerSecond is > 30 or < -300) if ((msgStamp - DateTime.UtcNow).Ticks / TimeSpan.TicksPerSecond is > 30 or < -300)
@ -323,7 +397,7 @@ namespace WTelegram
#else #else
if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}"); if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}");
Sha256Recv.Initialize(); Sha256Recv.Initialize();
Sha256Recv.TransformBlock(DCSession.AuthKey, 96, 32, null, 0); Sha256Recv.TransformBlock(_dcSession.AuthKey, 96, 32, null, 0);
Sha256Recv.TransformFinalBlock(decrypted_data, 0, decrypted_data.Length); Sha256Recv.TransformFinalBlock(decrypted_data, 0, decrypted_data.Length);
if (!data.AsSpan(8, 16).SequenceEqual(Sha256Recv.Hash.AsSpan(8, 16))) if (!data.AsSpan(8, 16).SequenceEqual(Sha256Recv.Hash.AsSpan(8, 16)))
throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1"); throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1");
@ -331,18 +405,18 @@ namespace WTelegram
var ctorNb = reader.ReadUInt32(); var ctorNb = reader.ReadUInt32();
if (ctorNb == Layer.MsgContainerCtor) if (ctorNb == Layer.MsgContainerCtor)
{ {
Helpers.Log(1, $"Receiving {"MsgContainer",-50} {msgStamp:u} (svc)"); Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"MsgContainer",-40} {msgStamp:u} (svc)");
return ReadMsgContainer(reader); return ReadMsgContainer(reader);
} }
else if (ctorNb == Layer.RpcResultCtor) else if (ctorNb == Layer.RpcResultCtor)
{ {
Helpers.Log(1, $"Receiving {"RpcResult",-50} {msgStamp:u}"); Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"RpcResult",-40} {msgStamp:u}");
return ReadRpcResult(reader); return ReadRpcResult(reader);
} }
else else
{ {
var obj = reader.ReadTLObject(ctorNb); var obj = reader.ReadTLObject(ctorNb);
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {msgStamp:u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}"); Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {msgStamp:u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}");
return obj; return obj;
} }
} }
@ -368,14 +442,14 @@ namespace WTelegram
private async Task<long> SendAsync(ITLFunction func, bool isContent) private async Task<long> SendAsync(ITLFunction func, bool isContent)
{ {
if (DCSession.AuthKeyID != 0 && isContent && CheckMsgsToAck() is MsgsAck msgsAck) if (_dcSession.AuthKeyID != 0 && isContent && CheckMsgsToAck() is MsgsAck msgsAck)
{ {
var ackMsg = _session.NewMsg(false); var ackMsg = NewMsgId(false);
var mainMsg = _session.NewMsg(true); var mainMsg = NewMsgId(true);
await SendAsync(MakeContainer((MakeFunction(msgsAck), ackMsg), (func, mainMsg)), false); await SendAsync(MakeContainer((MakeFunction(msgsAck), ackMsg), (func, mainMsg)), false);
return mainMsg.msgId; return mainMsg.msgId;
} }
(long msgId, int seqno) = _session.NewMsg(isContent && DCSession.AuthKeyID != 0); (long msgId, int seqno) = NewMsgId(isContent && _dcSession.AuthKeyID != 0);
await _sendSemaphore.WaitAsync(); await _sendSemaphore.WaitAsync();
try try
{ {
@ -383,13 +457,13 @@ namespace WTelegram
using var writer = new BinaryWriter(memStream, Encoding.UTF8); using var writer = new BinaryWriter(memStream, Encoding.UTF8);
writer.Write(0); // int32 payload_len (to be patched with payload length) writer.Write(0); // int32 payload_len (to be patched with payload length)
if (DCSession.AuthKeyID == 0) // send unencrypted message if (_dcSession.AuthKeyID == 0) // send unencrypted message
{ {
writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted) writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted)
writer.Write(msgId); // int64 message_id writer.Write(msgId); // int64 message_id
writer.Write(0); // int32 message_data_length (to be patched) writer.Write(0); // int32 message_data_length (to be patched)
var typeName = func(writer); // bytes message_data var typeName = func(writer); // bytes message_data
Helpers.Log(1, $"Sending {typeName}..."); Helpers.Log(1, $"{_dcSession.DcID}>Sending {typeName}...");
BinaryPrimitives.WriteInt32LittleEndian(memStream.GetBuffer().AsSpan(20), (int)memStream.Length - 24); // patch message_data_length BinaryPrimitives.WriteInt32LittleEndian(memStream.GetBuffer().AsSpan(20), (int)memStream.Length - 24); // patch message_data_length
} }
else else
@ -400,18 +474,18 @@ namespace WTelegram
const int prepend = 0; const int prepend = 0;
#else #else
const int prepend = 32; const int prepend = 32;
clearWriter.Write(DCSession.AuthKey, 88, prepend); clearWriter.Write(_dcSession.AuthKey, 88, prepend);
#endif #endif
clearWriter.Write(DCSession.Salt); // int64 salt clearWriter.Write(_dcSession.Salt); // int64 salt
clearWriter.Write(_session.Id); // int64 session_id clearWriter.Write(_dcSession.Id); // int64 session_id
clearWriter.Write(msgId); // int64 message_id clearWriter.Write(msgId); // int64 message_id
clearWriter.Write(seqno); // int32 msg_seqno clearWriter.Write(seqno); // int32 msg_seqno
clearWriter.Write(0); // int32 message_data_length (to be patched) clearWriter.Write(0); // int32 message_data_length (to be patched)
var typeName = func(clearWriter); // bytes message_data var typeName = func(clearWriter); // bytes message_data
if ((seqno & 1) != 0) if ((seqno & 1) != 0)
Helpers.Log(1, $"Sending {typeName,-50} #{(short)msgId.GetHashCode():X4}"); Helpers.Log(1, $"{_dcSession.DcID}>Sending {typeName,-40} #{(short)msgId.GetHashCode():X4}");
else else
Helpers.Log(1, $"Sending {typeName,-50} {_session.MsgIdToStamp(msgId):u} (svc)"); Helpers.Log(1, $"{_dcSession.DcID}>Sending {typeName,-40} {MsgIdToStamp(msgId):u} (svc)");
int clearLength = (int)clearStream.Length - prepend; // length before padding (= 32 + message_data_length) int clearLength = (int)clearStream.Length - prepend; // length before padding (= 32 + message_data_length)
int padding = (0x7FFFFFF0 - clearLength) % 16; int padding = (0x7FFFFFF0 - clearLength) % 16;
#if !MTPROTO1 #if !MTPROTO1
@ -428,9 +502,9 @@ namespace WTelegram
var msgKeyLarge = Sha256.ComputeHash(clearBuffer, 0, prepend + clearLength + padding); var msgKeyLarge = Sha256.ComputeHash(clearBuffer, 0, prepend + clearLength + padding);
const int msgKeyOffset = 8; // msg_key = middle 128-bits of SHA256(authkey_part+plaintext+padding) const int msgKeyOffset = 8; // msg_key = middle 128-bits of SHA256(authkey_part+plaintext+padding)
#endif #endif
byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(prepend, clearLength + padding), true, DCSession.AuthKey, msgKeyLarge, msgKeyOffset); byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(prepend, clearLength + padding), true, _dcSession.AuthKey, msgKeyLarge, msgKeyOffset);
writer.Write(DCSession.AuthKeyID); // int64 auth_key_id writer.Write(_dcSession.AuthKeyID); // int64 auth_key_id
writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key
writer.Write(encrypted_data); // bytes encrypted_data writer.Write(encrypted_data); // bytes encrypted_data
} }
@ -475,13 +549,13 @@ namespace WTelegram
var ctorNb = reader.ReadUInt32(); var ctorNb = reader.ReadUInt32();
if (ctorNb == Layer.RpcResultCtor) if (ctorNb == Layer.RpcResultCtor)
{ {
Helpers.Log(1, $" → {"RpcResult",-48} {_session.MsgIdToStamp(msg.msg_id):u}"); Helpers.Log(1, $" → {"RpcResult",-38} {MsgIdToStamp(msg.msg_id):u}");
msg.body = ReadRpcResult(reader); msg.body = ReadRpcResult(reader);
} }
else else
{ {
var obj = msg.body = reader.ReadTLObject(ctorNb); var obj = msg.body = reader.ReadTLObject(ctorNb);
Helpers.Log(1, $" → {obj.GetType().Name,-48} {_session.MsgIdToStamp(msg.msg_id):u} {((msg.seqno & 1) != 0 ? "" : "(svc)")} {((msg.msg_id & 2) == 0 ? "" : "NAR")}"); Helpers.Log(1, $" → {obj.GetType().Name,-38} {MsgIdToStamp(msg.msg_id):u} {((msg.seqno & 1) != 0 ? "" : "(svc)")} {((msg.msg_id & 2) == 0 ? "" : "NAR")}");
} }
} }
catch (Exception ex) catch (Exception ex)
@ -508,7 +582,7 @@ namespace WTelegram
else else
{ {
result = reader.ReadTLObject(); result = reader.ReadTLObject();
if (_session.MsgIdToStamp(msgId) >= _session.SessionStart) if (MsgIdToStamp(msgId) >= _session.SessionStart)
Log(4, "for unknown msgId "); Log(4, "for unknown msgId ");
else else
Log(1, "for past msgId "); Log(1, "for past msgId ");
@ -518,9 +592,9 @@ namespace WTelegram
void Log(int level, string msgIdprefix) void Log(int level, string msgIdprefix)
{ {
if (result is RpcError rpcError) if (result is RpcError rpcError)
Helpers.Log(4, $" → RpcError {rpcError.error_code,3} {rpcError.error_message,-34} {msgIdprefix}#{(short)msgId.GetHashCode():X4}"); Helpers.Log(4, $" → RpcError {rpcError.error_code,3} {rpcError.error_message,-24} {msgIdprefix}#{(short)msgId.GetHashCode():X4}");
else else
Helpers.Log(level, $" → {result?.GetType().Name,-47} {msgIdprefix}#{(short)msgId.GetHashCode():X4}"); Helpers.Log(level, $" → {result?.GetType().Name,-37} {msgIdprefix}#{(short)msgId.GetHashCode():X4}");
} }
} }
@ -559,10 +633,21 @@ namespace WTelegram
int number; int number;
if (rpcError.error_code == 303 && ((number = rpcError.error_message.IndexOf("_MIGRATE_")) > 0)) if (rpcError.error_code == 303 && ((number = rpcError.error_message.IndexOf("_MIGRATE_")) > 0))
{ {
number = int.Parse(rpcError.error_message[(number + 9)..]); if (!rpcError.error_message.StartsWith("FILE_"))
if (!rpcError.error_message.StartsWith("FILE_")) _session.MainDC = number; {
await MigrateDCAsync(number); number = int.Parse(rpcError.error_message[(number + 9)..]);
goto retry; // this is a hack to migrate _dcSession in-place (staying in same Client):
Session.DCSession dcSession;
lock (_session)
dcSession = GetOrCreateDCSession(number);
Reset(false);
_session.MainDC = number;
_dcSession.Client = null;
_dcSession = dcSession;
_dcSession.Client = this;
await ConnectAsync();
goto retry;
}
} }
else if (rpcError.error_code == 420 && ((number = rpcError.error_message.IndexOf("_WAIT_")) > 0)) else if (rpcError.error_code == 420 && ((number = rpcError.error_message.IndexOf("_WAIT_")) > 0))
{ {
@ -579,6 +664,8 @@ namespace WTelegram
_session.Save(); _session.Save();
} }
throw new RpcException(rpcError.error_code, rpcError.error_message); throw new RpcException(rpcError.error_code, rpcError.error_message);
case ReactorError:
goto retry;
default: default:
throw new ApplicationException($"{request.GetType().Name} call got a result of type {result.GetType().Name} instead of {typeof(X).Name}"); throw new ApplicationException($"{request.GetType().Name} call got a result of type {result.GetType().Name} instead of {typeof(X).Name}");
} }
@ -608,9 +695,9 @@ namespace WTelegram
writer.Write(0); writer.Write(0);
var typeName = func(writer); var typeName = func(writer);
if ((seqno & 1) != 0) if ((seqno & 1) != 0)
Helpers.Log(1, $"Sending → {typeName,-50} #{(short)msgId.GetHashCode():X4}"); Helpers.Log(1, $" Sending → {typeName,-40} #{(short)msgId.GetHashCode():X4}");
else else
Helpers.Log(1, $"Sending → {typeName,-50} {_session.MsgIdToStamp(msgId):u} (svc)"); Helpers.Log(1, $" Sending → {typeName,-40} {MsgIdToStamp(msgId):u} (svc)");
writer.BaseStream.Position = patchPos; writer.BaseStream.Position = patchPos;
writer.Write((int)(writer.BaseStream.Length - patchPos - 4)); // patch bytes field writer.Write((int)(writer.BaseStream.Length - patchPos - 4)); // patch bytes field
writer.Seek(0, SeekOrigin.End); writer.Seek(0, SeekOrigin.End);
@ -632,8 +719,8 @@ namespace WTelegram
await HandleMessageAsync(msgCopy.orig_message.body); await HandleMessageAsync(msgCopy.orig_message.body);
break; break;
case BadServerSalt badServerSalt: case BadServerSalt badServerSalt:
DCSession.Salt = badServerSalt.new_server_salt; _dcSession.Salt = badServerSalt.new_server_salt;
if (badServerSalt.bad_msg_id == DCSession.LastSentMsgId) if (badServerSalt.bad_msg_id == _dcSession.LastSentMsgId)
{ {
var newMsgId = await SendAsync(_lastSentMsg, true); var newMsgId = await SendAsync(_lastSentMsg, true);
lock (_pendingRequests) lock (_pendingRequests)
@ -728,12 +815,13 @@ namespace WTelegram
Helpers.Log(4, $"Error deserializing User! ({ex.Message}) Proceeding to login..."); Helpers.Log(4, $"Error deserializing User! ({ex.Message}) Proceeding to login...");
} }
await this.Auth_LogOut(); await this.Auth_LogOut();
_dcSession.UserId = 0;
} }
var authorization = await this.Auth_ImportBotAuthorization(0, _apiId, _apiHash, botToken); var authorization = await this.Auth_ImportBotAuthorization(0, _apiId, _apiHash, botToken);
if (authorization is not Auth_Authorization { user: User user }) if (authorization is not Auth_Authorization { user: User user })
throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name); throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name);
_session.User = user; _session.User = user;
DCSession.UserId = user.id; _dcSession.UserId = user.id;
_session.Save(); _session.Save();
return user; return user;
} }
@ -773,10 +861,10 @@ namespace WTelegram
Helpers.Log(4, $"Error deserializing User! ({ex.Message}) Proceeding to login..."); Helpers.Log(4, $"Error deserializing User! ({ex.Message}) Proceeding to login...");
} }
await this.Auth_LogOut(); await this.Auth_LogOut();
_session.User = null; _dcSession.UserId = 0;
} }
phone_number ??= Config("phone_number"); phone_number ??= Config("phone_number");
var sentCode = await this.Auth_SendCode(phone_number, _apiId, _apiHash, settings ?? new()); var sentCode = await this.Auth_SendCode(phone_number, _apiId, _apiHash, settings ??= new());
Helpers.Log(3, $"A verification code has been sent via {sentCode.type.GetType().Name[17..]}"); Helpers.Log(3, $"A verification code has been sent via {sentCode.type.GetType().Name[17..]}");
var verification_code = Config("verification_code"); var verification_code = Config("verification_code");
Auth_AuthorizationBase authorization; Auth_AuthorizationBase authorization;
@ -806,7 +894,7 @@ namespace WTelegram
throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name); throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name);
//TODO: find better serialization for User not subject to TL changes? //TODO: find better serialization for User not subject to TL changes?
_session.User = user; _session.User = user;
DCSession.UserId = user.id; _dcSession.UserId = user.id;
_session.Save(); _session.Save();
return user; return user;
} }
@ -944,24 +1032,28 @@ namespace WTelegram
const int ChunkSize = 128 * 1024; const int ChunkSize = 128 * 1024;
int fileSize = 0; int fileSize = 0;
Upload_File fileData; Upload_File fileData;
try var client = fileDC == 0 ? this : await GetClientForDC(fileDC);
do
{ {
if (fileDC != 0) await MigrateDCAsync(fileDC); Upload_FileBase fileBase;
do try
{ {
var fileBase = await this.Upload_GetFile(fileLocation, fileSize, ChunkSize); // TODO: speed-up download with multiple parallel getFile (share 10-parallel semaphore with upload)
fileData = fileBase as Upload_File; fileBase = await client.Upload_GetFile(fileLocation, fileSize, ChunkSize);
if (fileData == null) }
throw new ApplicationException("Upload_GetFile returned unsupported " + fileBase.GetType().Name); catch (RpcException ex) when (ex.Code == 303 && ex.Message.StartsWith("FILE_MIGRATE_"))
await outputStream.WriteAsync(fileData.bytes, 0, fileData.bytes.Length); {
fileSize += fileData.bytes.Length; var dcId = int.Parse(ex.Message[13..]);
client = await GetClientForDC(dcId);
} while (fileData.bytes.Length == ChunkSize); fileBase = await client.Upload_GetFile(fileLocation, fileSize, ChunkSize);
} }
finally fileData = fileBase as Upload_File;
{ if (fileData == null)
await MigrateDCAsync(); // migrate back to main DC throw new ApplicationException("Upload_GetFile returned unsupported " + fileBase.GetType().Name);
} await outputStream.WriteAsync(fileData.bytes, 0, fileData.bytes.Length);
fileSize += fileData.bytes.Length;
} while (fileData.bytes.Length == ChunkSize);
await outputStream.FlushAsync();
return fileData.type; return fileData.type;
} }
#endregion #endregion

View file

@ -196,7 +196,7 @@ namespace WTelegram
private static void ValidityChecks(BigInteger p, int g) private static void ValidityChecks(BigInteger p, int g)
{ {
Helpers.Log(2, "Verifying encryption key safety... (this should happen only once)"); Helpers.Log(2, "Verifying encryption key safety... (this should happen only once per DC)");
// check that 2^2047 <= p < 2^2048 // check that 2^2047 <= p < 2^2048
if (p.GetBitLength() != 2048) throw new ApplicationException("p is not 2048-bit number"); if (p.GetBitLength() != 2048) throw new ApplicationException("p is not 2048-bit number");
// check that g generates a cyclic subgroup of prime order (p - 1) / 2, i.e. is a quadratic residue mod p. // check that g generates a cyclic subgroup of prime order (p - 1) / 2, i.e. is a quadratic residue mod p.

View file

@ -5,6 +5,7 @@ using System.Linq;
using System.Net; using System.Net;
using System.Security.Cryptography; using System.Security.Cryptography;
using System.Text.Json; using System.Text.Json;
using System.Threading;
namespace WTelegram namespace WTelegram
{ {
@ -13,10 +14,10 @@ namespace WTelegram
public TL.User User; public TL.User User;
public int MainDC; public int MainDC;
public Dictionary<int, DCSession> DCSessions = new(); public Dictionary<int, DCSession> DCSessions = new();
public long Id;
public class DCSession public class DCSession
{ {
public long Id;
public long AuthKeyID; public long AuthKeyID;
public byte[] AuthKey; // 2048-bit = 256 bytes public byte[] AuthKey; // 2048-bit = 256 bytes
public long UserId; public long UserId;
@ -26,11 +27,13 @@ namespace WTelegram
public long LastSentMsgId; public long LastSentMsgId;
public TL.DcOption DataCenter; public TL.DcOption DataCenter;
internal Client Client;
internal int DcID => DataCenter?.id ?? 0;
internal IPEndPoint EndPoint => DataCenter == null ? null : new(IPAddress.Parse(DataCenter.ip_address), DataCenter.port); internal IPEndPoint EndPoint => DataCenter == null ? null : new(IPAddress.Parse(DataCenter.ip_address), DataCenter.port);
} }
public DateTime SessionStart => _sessionStart; public DateTime SessionStart => _sessionStart;
internal DCSession CurrentDCSession; public readonly SemaphoreSlim _sem = new(1);
private readonly DateTime _sessionStart = DateTime.UtcNow; private readonly DateTime _sessionStart = DateTime.UtcNow;
private string _pathname; private string _pathname;
private byte[] _apiHash; // used as AES key for encryption of session file private byte[] _apiHash; // used as AES key for encryption of session file
@ -60,7 +63,7 @@ namespace WTelegram
throw new ApplicationException($"Exception while reading session file: {ex.Message}\nDelete the file to start a new session", ex); throw new ApplicationException($"Exception while reading session file: {ex.Message}\nDelete the file to start a new session", ex);
} }
} }
return new Session { _pathname = pathname, _apiHash = apiHash, Id = Helpers.RandomLong() }; return new Session { _pathname = pathname, _apiHash = apiHash };
} }
internal static Session Load(string pathname, byte[] apiHash) internal static Session Load(string pathname, byte[] apiHash)
@ -95,28 +98,5 @@ namespace WTelegram
File.Replace(tempPathname, _pathname, null); File.Replace(tempPathname, _pathname, null);
} }
} }
internal (long msgId, int seqno) NewMsg(bool isContent)
{
int seqno;
long msgId = DateTime.UtcNow.Ticks + CurrentDCSession.ServerTicksOffset - 621355968000000000L;
msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4
lock (this)
{
if (msgId <= CurrentDCSession.LastSentMsgId) msgId = CurrentDCSession.LastSentMsgId += 4; else CurrentDCSession.LastSentMsgId = msgId;
seqno = isContent ? CurrentDCSession.Seqno++ * 2 + 1 : CurrentDCSession.Seqno * 2;
Save();
}
return (msgId, seqno);
}
internal DateTime MsgIdToStamp(long serverMsgId)
=> new((serverMsgId >> 32) * 10000000 - CurrentDCSession.ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc);
internal void ChangeDC(int dc)
{
if (dc == 0) dc = MainDC;
CurrentDCSession = dc != 0 ? DCSessions[dc] : new();
}
} }
} }