From 35786ef02aec76c8ef4b8083448c8feaae6cffad Mon Sep 17 00:00:00 2001 From: Wizou Date: Thu, 23 Sep 2021 09:27:52 +0200 Subject: [PATCH] Support multi-DC sessions --- src/Client.cs | 102 ++++++++++++++++++++++++++++------------------ src/Encryption.cs | 3 +- src/Session.cs | 43 +++++++++++-------- 3 files changed, 89 insertions(+), 59 deletions(-) diff --git a/src/Client.cs b/src/Client.cs index 3ba48bd..bd47a56 100644 --- a/src/Client.cs +++ b/src/Client.cs @@ -30,6 +30,7 @@ namespace WTelegram private readonly int _apiId; private readonly string _apiHash; private readonly Session _session; + private Session.DCSession DCSession => _session.CurrentDCSession; private static readonly byte[] IntermediateHeader = new byte[4] { 0xee, 0xee, 0xee, 0xee }; private TcpClient _tcpClient; private NetworkStream _networkStream; @@ -99,27 +100,29 @@ namespace WTelegram _tcpClient?.Dispose(); } - // disconnect and eventually reset session (forget server address, current user and authkey) - public void Reset(bool resetSession = true) + // disconnect and eventually reset sessions (forget servers, current user) + public void Reset(bool resetSessions = true) { _cts?.Cancel(); _sendSemaphore = new(0); _reactorTask = null; _tcpClient?.Dispose(); - if (resetSession) - _session.Reset(); + if (resetSessions) + { + _session.DCSessions.Clear(); + _session.User = null; + } } /// Establish connection to Telegram servers. Config callback is queried for: server_address /// Most methods of this class are async Task, so please use - public async Task ConnectAsync() + public async Task ConnectAsync(int dc = default) { if (_reactorTask != null) throw new ApplicationException("Already connected!"); - var endpoint = _session.DataCenter == null ? Compat.IPEndPoint_Parse(Config("server_address")) - : new IPEndPoint(IPAddress.Parse(_session.DataCenter.ip_address), _session.DataCenter.port); + _session.ChangeDC(dc); + var endpoint = DCSession?.EndPoint ?? Compat.IPEndPoint_Parse(Config("server_address")); Helpers.Log(2, $"Connecting to {endpoint}..."); - //TODO: maintain different connections/sessions to different DCs for main API/file download/file upload? _tcpClient = new TcpClient(endpoint.AddressFamily); await _tcpClient.ConnectAsync(endpoint.Address, endpoint.Port); _networkStream = _tcpClient.GetStream(); @@ -128,28 +131,46 @@ namespace WTelegram _reactorTask = Reactor(_networkStream, _cts); _sendSemaphore.Release(); - if (_session.AuthKey == null) - await CreateAuthorizationKey(this, _session); + try + { + if (DCSession.AuthKeyID == 0) + await CreateAuthorizationKey(this, DCSession); - var keepAliveTask = KeepAlive(_cts.Token); - TLConfig = await this.InvokeWithLayer(Layer.Version, - Schema.InitConnection(_apiId, - Config("device_model"), - Config("system_version"), - Config("app_version"), - Config("system_lang_code"), - Config("lang_pack"), - Config("lang_code"), - Schema.Help_GetConfig)); + var keepAliveTask = KeepAlive(_cts.Token); + TLConfig = await this.InvokeWithLayer(Layer.Version, + Schema.InitConnection(_apiId, + Config("device_model"), + Config("system_version"), + Config("app_version"), + Config("system_lang_code"), + Config("lang_pack"), + Config("lang_code"), + Schema.Help_GetConfig)); + if (DCSession.DataCenter == null) + { + DCSession.DataCenter = TLConfig.dc_options.Where(dc => dc.id == TLConfig.this_dc) + .OrderByDescending(dc => dc.ip_address == endpoint.Address.ToString()) + .ThenByDescending(dc => dc.port == endpoint.Port).First(); + _session.DCSessions[TLConfig.this_dc] = DCSession; + } + if (_session.MainDC == 0) _session.MainDC = TLConfig.this_dc; + } + finally + { + _session.Save(); + } Helpers.Log(2, $"Connected to {(TLConfig.test_mode ? "Test DC" : "DC")} {TLConfig.this_dc}... {TLConfig.flags & (Config.Flags)~0xE00}"); } - private async Task MigrateDCAsync(int dcId) + public async Task MigrateDCAsync(int dcId) { + if (DCSession.DataCenter?.id == dcId) return; Helpers.Log(2, $"Migrate to DC {dcId}..."); Auth_ExportedAuthorization exported = null; - if (_session.User != null) + if (_session.User != null && DCSession.DataCenter.id == _session.MainDC) exported = await this.Auth_ExportAuthorization(dcId); + if (CheckMsgsToAck() is MsgsAck msgsAck) + await SendAsync(MakeFunction(msgsAck), false); var prevFamily = _tcpClient.Client.RemoteEndPoint.AddressFamily; Reset(false); var dcOptions = TLConfig.dc_options.Where(dc => dc.id == dcId && (dc.flags & (DcOption.Flags.media_only | DcOption.Flags.cdn)) == 0); @@ -157,9 +178,9 @@ namespace WTelegram dcOptions = dcOptions.OrderByDescending(dc => dc.flags & DcOption.Flags.ipv6); // list ipv6 first else dcOptions = dcOptions.OrderBy(dc => dc.flags & DcOption.Flags.ipv6); // list ipv4 first - var dcOption = dcOptions.FirstOrDefault(); - _session.Reset(dcOption ?? throw new ApplicationException($"Could not find adequate dcOption for DC {dcId}")); - await ConnectAsync(); + var dcOption = dcOptions.FirstOrDefault() ?? throw new ApplicationException($"Could not find adequate dcOption for DC {dcId}"); + _session.DCSessions.GetOrCreate(dcId).DataCenter = dcOption; + await ConnectAsync(dcId); if (exported != null) { var authorization = await this.Auth_ImportAuthorization(exported.id, exported.bytes); @@ -246,7 +267,7 @@ namespace WTelegram throw new ApplicationException($"Packet payload too small: {dataLen}"); long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data); - if (authKeyId != _session.AuthKeyID) + if (authKeyId != DCSession.AuthKeyID) throw new ApplicationException($"Received a packet encrypted with unexpected key {authKeyId:X}"); if (authKeyId == 0) // Unencrypted message { @@ -262,7 +283,7 @@ namespace WTelegram } else { - byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, dataLen - 24), false, _session.AuthKey, data, 8); + byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, dataLen - 24), false, DCSession.AuthKey, data, 8); if (decrypted_data.Length < 36) // header below+ctorNb throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}"); using var reader = new TL.BinaryReader(new MemoryStream(decrypted_data), this); @@ -273,10 +294,10 @@ namespace WTelegram var length = reader.ReadInt32(); // int32 message_data_length var msgStamp = _session.MsgIdToStamp(msgId); - if (serverSalt != _session.Salt) + if (serverSalt != DCSession.Salt) { - Helpers.Log(2, $"Server salt has changed: {_session.Salt:X8} -> {serverSalt:X8}"); - _session.Salt = serverSalt; + Helpers.Log(2, $"Server salt has changed: {DCSession.Salt:X} -> {serverSalt:X}"); + DCSession.Salt = serverSalt; if (++_unexpectedSaltChange >= 30) throw new ApplicationException($"Server salt changed unexpectedly more than 30 times during this session"); } @@ -292,7 +313,7 @@ namespace WTelegram #else if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}"); Sha256Recv.Initialize(); - Sha256Recv.TransformBlock(_session.AuthKey, 96, 32, null, 0); + Sha256Recv.TransformBlock(DCSession.AuthKey, 96, 32, null, 0); Sha256Recv.TransformFinalBlock(decrypted_data, 0, decrypted_data.Length); if (!data.AsSpan(8, 16).SequenceEqual(Sha256Recv.Hash.AsSpan(8, 16))) throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1"); @@ -337,14 +358,14 @@ namespace WTelegram private async Task SendAsync(ITLFunction func, bool isContent) { - if (_session.AuthKeyID != 0 && isContent && CheckMsgsToAck() is MsgsAck msgsAck) + if (DCSession.AuthKeyID != 0 && isContent && CheckMsgsToAck() is MsgsAck msgsAck) { var ackMsg = _session.NewMsg(false); var mainMsg = _session.NewMsg(true); await SendAsync(MakeContainer((MakeFunction(msgsAck), ackMsg), (func, mainMsg)), false); return mainMsg.msgId; } - (long msgId, int seqno) = _session.NewMsg(isContent && _session.AuthKeyID != 0); + (long msgId, int seqno) = _session.NewMsg(isContent && DCSession.AuthKeyID != 0); await _sendSemaphore.WaitAsync(); try { @@ -352,7 +373,7 @@ namespace WTelegram using var writer = new BinaryWriter(memStream, Encoding.UTF8); writer.Write(0); // int32 payload_len (to be patched with payload length) - if (_session.AuthKeyID == 0) // send unencrypted message + if (DCSession.AuthKeyID == 0) // send unencrypted message { writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted) writer.Write(msgId); // int64 message_id @@ -369,9 +390,9 @@ namespace WTelegram const int prepend = 0; #else const int prepend = 32; - clearWriter.Write(_session.AuthKey, 88, prepend); + clearWriter.Write(DCSession.AuthKey, 88, prepend); #endif - clearWriter.Write(_session.Salt); // int64 salt + clearWriter.Write(DCSession.Salt); // int64 salt clearWriter.Write(_session.Id); // int64 session_id clearWriter.Write(msgId); // int64 message_id clearWriter.Write(seqno); // int32 msg_seqno @@ -397,9 +418,9 @@ namespace WTelegram var msgKeyLarge = Sha256.ComputeHash(clearBuffer, 0, prepend + clearLength + padding); const int msgKeyOffset = 8; // msg_key = middle 128-bits of SHA256(authkey_part+plaintext+padding) #endif - byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(prepend, clearLength + padding), true, _session.AuthKey, msgKeyLarge, msgKeyOffset); + byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(prepend, clearLength + padding), true, DCSession.AuthKey, msgKeyLarge, msgKeyOffset); - writer.Write(_session.AuthKeyID); // int64 auth_key_id + writer.Write(DCSession.AuthKeyID); // int64 auth_key_id writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key writer.Write(encrypted_data); // bytes encrypted_data } @@ -529,6 +550,7 @@ namespace WTelegram if (rpcError.error_code == 303 && ((number = rpcError.error_message.IndexOf("_MIGRATE_")) > 0)) { number = int.Parse(rpcError.error_message[(number + 9)..]); + if (!rpcError.error_message.StartsWith("FILE_")) _session.MainDC = number; await MigrateDCAsync(number); goto retry; } @@ -600,8 +622,8 @@ namespace WTelegram await HandleMessageAsync(msgCopy.orig_message.body); break; case BadServerSalt badServerSalt: - _session.Salt = badServerSalt.new_server_salt; - if (badServerSalt.bad_msg_id == _session.LastSentMsgId) + DCSession.Salt = badServerSalt.new_server_salt; + if (badServerSalt.bad_msg_id == DCSession.LastSentMsgId) { var newMsgId = await SendAsync(_lastSentMsg, true); lock (_pendingRequests) diff --git a/src/Encryption.cs b/src/Encryption.cs index edd7b88..c0859ec 100644 --- a/src/Encryption.cs +++ b/src/Encryption.cs @@ -24,7 +24,7 @@ namespace WTelegram #endif private static readonly Dictionary PublicKeys = new(); - internal static async Task CreateAuthorizationKey(Client client, Session session) + internal static async Task CreateAuthorizationKey(Client client, Session.DCSession session) { if (PublicKeys.Count == 0) LoadDefaultPublicKeys(); @@ -172,7 +172,6 @@ namespace WTelegram session.AuthKeyID = BinaryPrimitives.ReadInt64LittleEndian(authKeyHash.AsSpan(12)); session.AuthKey = authKey; session.Salt = BinaryPrimitives.ReadInt64LittleEndian(pqInnerData.new_nonce.raw) ^ BinaryPrimitives.ReadInt64LittleEndian(resPQ.server_nonce.raw); - session.Save(); static (byte[] key, byte[] iv) ConstructTmpAESKeyIV(Int128 server_nonce, Int256 new_nonce) { diff --git a/src/Session.cs b/src/Session.cs index d07fd09..04d92ba 100644 --- a/src/Session.cs +++ b/src/Session.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.IO; using System.Linq; +using System.Net; using System.Security.Cryptography; using System.Text.Json; @@ -8,17 +10,26 @@ namespace WTelegram { internal class Session { - public long AuthKeyID; - public byte[] AuthKey; // 2048-bit = 256 bytes - public long Salt; - public long Id; - public int Seqno; - public long ServerTicksOffset; - public long LastSentMsgId; - public TL.DcOption DataCenter; public TL.User User; + public int MainDC; + public Dictionary DCSessions = new(); + public long Id; + + public class DCSession + { + public long AuthKeyID; + public byte[] AuthKey; // 2048-bit = 256 bytes + public long Salt; + public int Seqno; + public long ServerTicksOffset; + public long LastSentMsgId; + public TL.DcOption DataCenter; + + internal IPEndPoint EndPoint => DataCenter == null ? null : new(IPAddress.Parse(DataCenter.ip_address), DataCenter.port); + } public DateTime SessionStart => _sessionStart; + internal DCSession CurrentDCSession; private readonly DateTime _sessionStart = DateTime.UtcNow; private string _pathname; private byte[] _apiHash; // used as AES key for encryption of session file @@ -80,27 +91,25 @@ namespace WTelegram internal (long msgId, int seqno) NewMsg(bool isContent) { int seqno; - long msgId = DateTime.UtcNow.Ticks + ServerTicksOffset - 621355968000000000L; + long msgId = DateTime.UtcNow.Ticks + CurrentDCSession.ServerTicksOffset - 621355968000000000L; msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4 lock (this) { - if (msgId <= LastSentMsgId) msgId = LastSentMsgId += 4; else LastSentMsgId = msgId; + if (msgId <= CurrentDCSession.LastSentMsgId) msgId = CurrentDCSession.LastSentMsgId += 4; else CurrentDCSession.LastSentMsgId = msgId; - seqno = isContent ? Seqno++ * 2 + 1 : Seqno * 2; + seqno = isContent ? CurrentDCSession.Seqno++ * 2 + 1 : CurrentDCSession.Seqno * 2; Save(); } return (msgId, seqno); } internal DateTime MsgIdToStamp(long serverMsgId) - => new((serverMsgId >> 32) * 10000000 - ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc); + => new((serverMsgId >> 32) * 10000000 - CurrentDCSession.ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc); - internal void Reset(TL.DcOption newDC = null) + internal void ChangeDC(int dc) { - DataCenter = newDC; - AuthKeyID = Salt = Seqno = 0; - AuthKey = null; - User = null; + if (dc == 0) dc = MainDC; + CurrentDCSession = dc != 0 ? DCSessions[dc] : new(); } } } \ No newline at end of file