diff --git a/src/Client.cs b/src/Client.cs index f351a3a..9f71f84 100644 --- a/src/Client.cs +++ b/src/Client.cs @@ -25,12 +25,13 @@ namespace WTelegram public event Action Update; public Config TLConfig { get; private set; } public int MaxAutoReconnects { get; set; } = 5; // number of automatic reconnections on connection/reactor failure + public bool IsMainDC => (_dcSession?.DataCenter?.id ?? 0) == _session.MainDC; private readonly Func _config; private readonly int _apiId; private readonly string _apiHash; private readonly Session _session; - private Session.DCSession DCSession => _session.CurrentDCSession; + private Session.DCSession _dcSession; private static readonly byte[] IntermediateHeader = new byte[4] { 0xee, 0xee, 0xee, 0xee }; private TcpClient _tcpClient; private NetworkStream _networkStream; @@ -43,6 +44,8 @@ namespace WTelegram private long _bareRequest; private readonly Dictionary tcs)> _pendingRequests = new(); private SemaphoreSlim _sendSemaphore = new(0); + private readonly SemaphoreSlim _semaphore = new(1); + private Task _connecting; private CancellationTokenSource _cts; private int _reactorReconnects = 0; @@ -54,6 +57,18 @@ namespace WTelegram _apiId = int.Parse(Config("api_id")); _apiHash = Config("api_hash"); _session = Session.LoadOrCreate(Config("session_pathname"), Convert.FromHexString(_apiHash)); + if (_session.MainDC != 0) _session.DCSessions.TryGetValue(_session.MainDC, out _dcSession); + _dcSession ??= new() { Id = Helpers.RandomLong() }; + _dcSession.Client = this; + } + + private Client(Client cloneOf, Session.DCSession dcSession) + { + _config = cloneOf._config; + _apiId = cloneOf._apiId; + _apiHash = cloneOf._apiHash; + _session = cloneOf._session; + _dcSession = dcSession; } public string Config(string config) @@ -92,36 +107,44 @@ namespace WTelegram public void Dispose() { - Helpers.Log(2, "Disposing the client"); - if (CheckMsgsToAck() is MsgsAck msgsAck) - SendAsync(MakeFunction(msgsAck), false).Wait(1000); - _cts?.Cancel(); - _reactorTask = null; - _tcpClient?.Dispose(); + Helpers.Log(2, $"{_dcSession.DcID}>Disposing the client"); + Reset(IsMainDC); } // disconnect and eventually reset sessions (forget servers, current user) public void Reset(bool resetSessions = true) { + if (CheckMsgsToAck() is MsgsAck msgsAck) + SendAsync(MakeFunction(msgsAck), false).Wait(1000); _cts?.Cancel(); _sendSemaphore = new(0); _reactorTask = null; _tcpClient?.Dispose(); + _connecting = null; if (resetSessions) { - _session.DCSessions.Clear(); + foreach (var altSession in _session.DCSessions.Values) + if (altSession.Client != null && altSession.Client != this) + { + altSession.Client.Dispose(); + altSession.Client = null; + } _session.User = null; } } /// Establish connection to Telegram servers. Config callback is queried for: server_address /// Most methods of this class are async Task, so please use - public async Task ConnectAsync(int dc = default) + public async Task ConnectAsync() { - if (_reactorTask != null) - throw new ApplicationException("Already connected!"); - _session.ChangeDC(dc); - var endpoint = DCSession?.EndPoint ?? Compat.IPEndPoint_Parse(Config("server_address")); + lock (this) + _connecting ??= DoConnectAsync(); + await _connecting; + } + + private async Task DoConnectAsync() + { + var endpoint = _dcSession?.EndPoint ?? Compat.IPEndPoint_Parse(Config("server_address")); Helpers.Log(2, $"Connecting to {endpoint}..."); _tcpClient = new TcpClient(endpoint.AddressFamily); await _tcpClient.ConnectAsync(endpoint.Address, endpoint.Port); @@ -134,8 +157,8 @@ namespace WTelegram try { - if (DCSession.AuthKeyID == 0) - await CreateAuthorizationKey(this, DCSession); + if (_dcSession.AuthKeyID == 0) + await CreateAuthorizationKey(this, _dcSession); var keepAliveTask = KeepAlive(_cts.Token); TLConfig = await this.InvokeWithLayer(Layer.Version, @@ -148,12 +171,12 @@ namespace WTelegram Config("lang_code"), Schema.Help_GetConfig)); _saltChangeCounter = 0; - if (DCSession.DataCenter == null) + if (_dcSession.DataCenter == null) { - DCSession.DataCenter = TLConfig.dc_options.Where(dc => dc.id == TLConfig.this_dc) + _dcSession.DataCenter = TLConfig.dc_options.Where(dc => dc.id == TLConfig.this_dc) .OrderByDescending(dc => dc.ip_address == endpoint.Address.ToString()) .ThenByDescending(dc => dc.port == endpoint.Port).First(); - _session.DCSessions[TLConfig.this_dc] = DCSession; + _session.DCSessions[TLConfig.this_dc] = _dcSession; } if (_session.MainDC == 0) _session.MainDC = TLConfig.this_dc; } @@ -164,34 +187,71 @@ namespace WTelegram Helpers.Log(2, $"Connected to {(TLConfig.test_mode ? "Test DC" : "DC")} {TLConfig.this_dc}... {TLConfig.flags & (Config.Flags)~0xE00}"); } - public async Task MigrateDCAsync(int dcId = 0) + public async Task GetClientForDC(int dcId, bool connect = true) { - if (dcId == 0) dcId = _session.MainDC; - if (DCSession.DataCenter?.id == dcId) return; - Helpers.Log(2, $"Migrate to DC {dcId}..."); - Auth_ExportedAuthorization exported = null; - if (_session.User != null && DCSession.DataCenter.id == _session.MainDC && _session.DCSessions.GetValueOrDefault(dcId)?.UserId != _session.User.id) - exported = await this.Auth_ExportAuthorization(dcId); - if (CheckMsgsToAck() is MsgsAck msgsAck) - await SendAsync(MakeFunction(msgsAck), false); + if (_dcSession.DataCenter?.id == dcId) return this; + Session.DCSession altSession; + lock (_session) + { + altSession = GetOrCreateDCSession(dcId); + altSession.Client ??= new Client(this, altSession); + } + Helpers.Log(2, $"Requested connection to DC {dcId}..."); + if (connect) + { + await _semaphore.WaitAsync(); + try + { + Auth_ExportedAuthorization exported = null; + if (_session.User != null && IsMainDC && altSession.UserId != _session.User.id) + exported = await this.Auth_ExportAuthorization(dcId); + await altSession.Client.ConnectAsync(); + if (exported != null) + { + var authorization = await altSession.Client.Auth_ImportAuthorization(exported.id, exported.bytes); + if (authorization is not Auth_Authorization { user: User user }) + throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name); + _session.User = user; + altSession.UserId = user.id; + } + } + finally + { + _semaphore.Release(); + } + } + return altSession.Client; + } + + private Session.DCSession GetOrCreateDCSession(int dcId) + { + if (_session.DCSessions.TryGetValue(dcId, out var dcSession)) + return dcSession; var prevFamily = _tcpClient.Client.RemoteEndPoint.AddressFamily; - Reset(false); var dcOptions = TLConfig.dc_options.Where(dc => dc.id == dcId && (dc.flags & (DcOption.Flags.media_only | DcOption.Flags.cdn)) == 0); if (prevFamily == AddressFamily.InterNetworkV6) // try to stay in the same connectivity dcOptions = dcOptions.OrderByDescending(dc => dc.flags & DcOption.Flags.ipv6); // list ipv6 first else - dcOptions = dcOptions.OrderBy(dc => dc.flags & DcOption.Flags.ipv6); // list ipv4 first - var dcOption = dcOptions.FirstOrDefault() ?? throw new ApplicationException($"Could not find adequate dcOption for DC {dcId}"); - _session.DCSessions.GetOrCreate(dcId).DataCenter = dcOption; - await ConnectAsync(dcId); - if (exported != null) + dcOptions = dcOptions.OrderBy(dc => dc.flags & DcOption.Flags.ipv6); // list ipv4 first + var dcOption = dcOptions.FirstOrDefault() ?? throw new ApplicationException($"Could not find adequate dc_option for DC {dcId}"); + return dcSession = _session.DCSessions[dcId] = new Session.DCSession { DataCenter = dcOption, Id = Helpers.RandomLong() }; + } + + internal DateTime MsgIdToStamp(long serverMsgId) + => new((serverMsgId >> 32) * 10000000 - _dcSession.ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc); + + internal (long msgId, int seqno) NewMsgId(bool isContent) + { + int seqno; + long msgId = DateTime.UtcNow.Ticks + _dcSession.ServerTicksOffset - 621355968000000000L; + msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4 + lock (_session) { - var authorization = await this.Auth_ImportAuthorization(exported.id, exported.bytes); - if (authorization is not Auth_Authorization { user: User user }) - throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name); - _session.User = user; - DCSession.UserId = user.id; + if (msgId <= _dcSession.LastSentMsgId) msgId = _dcSession.LastSentMsgId += 4; else _dcSession.LastSentMsgId = msgId; + seqno = isContent ? _dcSession.Seqno++ * 2 + 1 : _dcSession.Seqno * 2; + _session.Save(); } + return (msgId, seqno); } private async Task KeepAlive(CancellationToken ct) @@ -236,7 +296,28 @@ namespace WTelegram Helpers.Log(5, $"An exception occured in the reactor: {ex}"); var oldSemaphore = _sendSemaphore; await oldSemaphore.WaitAsync(cts.Token); // prevent any sending while we reconnect + var reactorError = new ReactorError { Exception = ex }; try + { + _reactorReconnects = (_reactorReconnects + 1) % MaxAutoReconnects; + if (_reactorReconnects != 0) + { + lock (_msgsToAck) _msgsToAck.Clear(); + Reset(false); + await Task.Delay(5000); + await ConnectAsync(); // start a new reactor after 5 secs + lock (_pendingRequests) // retry all pending requests + { + foreach (var (_, tcs) in _pendingRequests.Values) + tcs.SetResult(reactorError); + _pendingRequests.Clear(); + _bareRequest = 0; + } + } + else + throw; + } + catch { lock (_pendingRequests) // abort all pending requests { @@ -245,14 +326,7 @@ namespace WTelegram _pendingRequests.Clear(); _bareRequest = 0; } - _reactorReconnects = (_reactorReconnects + 1) % MaxAutoReconnects; - if (_reactorReconnects != 0) - { - Reset(false); - await ConnectAsync(); // start a new reactor - } - else - OnUpdate(new ReactorError { Exception = ex }); + OnUpdate(reactorError); } finally { @@ -276,7 +350,7 @@ namespace WTelegram throw new ApplicationException($"Packet payload too small: {dataLen}"); long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data); - if (authKeyId != DCSession.AuthKeyID) + if (authKeyId != _dcSession.AuthKeyID) throw new ApplicationException($"Received a packet encrypted with unexpected key {authKeyId:X}"); if (authKeyId == 0) // Unencrypted message { @@ -287,12 +361,12 @@ namespace WTelegram if (length != dataLen - 20) throw new ApplicationException($"Unexpected unencrypted length {length} != {dataLen - 20}"); var obj = reader.ReadTLObject(); - Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {_session.MsgIdToStamp(msgId):u} {((msgId & 2) == 0 ? "" : "NAR")} unencrypted"); + Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {MsgIdToStamp(msgId):u} clear{((msgId & 2) == 0 ? "" : " NAR")}"); return obj; } else { - byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, dataLen - 24), false, DCSession.AuthKey, data, 8); + byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, dataLen - 24), false, _dcSession.AuthKey, data, 8); if (decrypted_data.Length < 36) // header below+ctorNb throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}"); using var reader = new TL.BinaryReader(new MemoryStream(decrypted_data), this); @@ -301,17 +375,17 @@ namespace WTelegram var msgId = _lastRecvMsgId = reader.ReadInt64();// int64 message_id var seqno = reader.ReadInt32(); // int32 msg_seqno var length = reader.ReadInt32(); // int32 message_data_length - var msgStamp = _session.MsgIdToStamp(msgId); + var msgStamp = MsgIdToStamp(msgId); - if (serverSalt != DCSession.Salt) // salt change happens every 30 min + if (serverSalt != _dcSession.Salt) // salt change happens every 30 min { - Helpers.Log(2, $"Server salt has changed: {DCSession.Salt:X} -> {serverSalt:X}"); - DCSession.Salt = serverSalt; + Helpers.Log(2, $"{_dcSession.DcID}>Server salt has changed: {_dcSession.Salt:X} -> {serverSalt:X}"); + _dcSession.Salt = serverSalt; _saltChangeCounter += 20; // counter is decreased by KeepAlive every minute (we have margin of 10) if (_saltChangeCounter >= 30) throw new ApplicationException($"Server salt changed too often! Security issue?"); } - if (sessionId != _session.Id) throw new ApplicationException($"Unexpected session ID {_session.Id} != {_session.Id}"); + if (sessionId != _dcSession.Id) throw new ApplicationException($"Unexpected session ID {sessionId} != {_dcSession.Id}"); if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}"); if ((seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msgId); if ((msgStamp - DateTime.UtcNow).Ticks / TimeSpan.TicksPerSecond is > 30 or < -300) @@ -323,7 +397,7 @@ namespace WTelegram #else if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}"); Sha256Recv.Initialize(); - Sha256Recv.TransformBlock(DCSession.AuthKey, 96, 32, null, 0); + Sha256Recv.TransformBlock(_dcSession.AuthKey, 96, 32, null, 0); Sha256Recv.TransformFinalBlock(decrypted_data, 0, decrypted_data.Length); if (!data.AsSpan(8, 16).SequenceEqual(Sha256Recv.Hash.AsSpan(8, 16))) throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1"); @@ -331,18 +405,18 @@ namespace WTelegram var ctorNb = reader.ReadUInt32(); if (ctorNb == Layer.MsgContainerCtor) { - Helpers.Log(1, $"Receiving {"MsgContainer",-50} {msgStamp:u} (svc)"); + Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"MsgContainer",-40} {msgStamp:u} (svc)"); return ReadMsgContainer(reader); } else if (ctorNb == Layer.RpcResultCtor) { - Helpers.Log(1, $"Receiving {"RpcResult",-50} {msgStamp:u}"); + Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"RpcResult",-40} {msgStamp:u}"); return ReadRpcResult(reader); } else { var obj = reader.ReadTLObject(ctorNb); - Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {msgStamp:u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}"); + Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {msgStamp:u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}"); return obj; } } @@ -368,14 +442,14 @@ namespace WTelegram private async Task SendAsync(ITLFunction func, bool isContent) { - if (DCSession.AuthKeyID != 0 && isContent && CheckMsgsToAck() is MsgsAck msgsAck) + if (_dcSession.AuthKeyID != 0 && isContent && CheckMsgsToAck() is MsgsAck msgsAck) { - var ackMsg = _session.NewMsg(false); - var mainMsg = _session.NewMsg(true); + var ackMsg = NewMsgId(false); + var mainMsg = NewMsgId(true); await SendAsync(MakeContainer((MakeFunction(msgsAck), ackMsg), (func, mainMsg)), false); return mainMsg.msgId; } - (long msgId, int seqno) = _session.NewMsg(isContent && DCSession.AuthKeyID != 0); + (long msgId, int seqno) = NewMsgId(isContent && _dcSession.AuthKeyID != 0); await _sendSemaphore.WaitAsync(); try { @@ -383,13 +457,13 @@ namespace WTelegram using var writer = new BinaryWriter(memStream, Encoding.UTF8); writer.Write(0); // int32 payload_len (to be patched with payload length) - if (DCSession.AuthKeyID == 0) // send unencrypted message + if (_dcSession.AuthKeyID == 0) // send unencrypted message { writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted) writer.Write(msgId); // int64 message_id writer.Write(0); // int32 message_data_length (to be patched) var typeName = func(writer); // bytes message_data - Helpers.Log(1, $"Sending {typeName}..."); + Helpers.Log(1, $"{_dcSession.DcID}>Sending {typeName}..."); BinaryPrimitives.WriteInt32LittleEndian(memStream.GetBuffer().AsSpan(20), (int)memStream.Length - 24); // patch message_data_length } else @@ -400,18 +474,18 @@ namespace WTelegram const int prepend = 0; #else const int prepend = 32; - clearWriter.Write(DCSession.AuthKey, 88, prepend); + clearWriter.Write(_dcSession.AuthKey, 88, prepend); #endif - clearWriter.Write(DCSession.Salt); // int64 salt - clearWriter.Write(_session.Id); // int64 session_id + clearWriter.Write(_dcSession.Salt); // int64 salt + clearWriter.Write(_dcSession.Id); // int64 session_id clearWriter.Write(msgId); // int64 message_id clearWriter.Write(seqno); // int32 msg_seqno clearWriter.Write(0); // int32 message_data_length (to be patched) var typeName = func(clearWriter); // bytes message_data if ((seqno & 1) != 0) - Helpers.Log(1, $"Sending {typeName,-50} #{(short)msgId.GetHashCode():X4}"); + Helpers.Log(1, $"{_dcSession.DcID}>Sending {typeName,-40} #{(short)msgId.GetHashCode():X4}"); else - Helpers.Log(1, $"Sending {typeName,-50} {_session.MsgIdToStamp(msgId):u} (svc)"); + Helpers.Log(1, $"{_dcSession.DcID}>Sending {typeName,-40} {MsgIdToStamp(msgId):u} (svc)"); int clearLength = (int)clearStream.Length - prepend; // length before padding (= 32 + message_data_length) int padding = (0x7FFFFFF0 - clearLength) % 16; #if !MTPROTO1 @@ -428,9 +502,9 @@ namespace WTelegram var msgKeyLarge = Sha256.ComputeHash(clearBuffer, 0, prepend + clearLength + padding); const int msgKeyOffset = 8; // msg_key = middle 128-bits of SHA256(authkey_part+plaintext+padding) #endif - byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(prepend, clearLength + padding), true, DCSession.AuthKey, msgKeyLarge, msgKeyOffset); + byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(prepend, clearLength + padding), true, _dcSession.AuthKey, msgKeyLarge, msgKeyOffset); - writer.Write(DCSession.AuthKeyID); // int64 auth_key_id + writer.Write(_dcSession.AuthKeyID); // int64 auth_key_id writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key writer.Write(encrypted_data); // bytes encrypted_data } @@ -475,13 +549,13 @@ namespace WTelegram var ctorNb = reader.ReadUInt32(); if (ctorNb == Layer.RpcResultCtor) { - Helpers.Log(1, $" → {"RpcResult",-48} {_session.MsgIdToStamp(msg.msg_id):u}"); + Helpers.Log(1, $" → {"RpcResult",-38} {MsgIdToStamp(msg.msg_id):u}"); msg.body = ReadRpcResult(reader); } else { var obj = msg.body = reader.ReadTLObject(ctorNb); - Helpers.Log(1, $" → {obj.GetType().Name,-48} {_session.MsgIdToStamp(msg.msg_id):u} {((msg.seqno & 1) != 0 ? "" : "(svc)")} {((msg.msg_id & 2) == 0 ? "" : "NAR")}"); + Helpers.Log(1, $" → {obj.GetType().Name,-38} {MsgIdToStamp(msg.msg_id):u} {((msg.seqno & 1) != 0 ? "" : "(svc)")} {((msg.msg_id & 2) == 0 ? "" : "NAR")}"); } } catch (Exception ex) @@ -508,7 +582,7 @@ namespace WTelegram else { result = reader.ReadTLObject(); - if (_session.MsgIdToStamp(msgId) >= _session.SessionStart) + if (MsgIdToStamp(msgId) >= _session.SessionStart) Log(4, "for unknown msgId "); else Log(1, "for past msgId "); @@ -518,9 +592,9 @@ namespace WTelegram void Log(int level, string msgIdprefix) { if (result is RpcError rpcError) - Helpers.Log(4, $" → RpcError {rpcError.error_code,3} {rpcError.error_message,-34} {msgIdprefix}#{(short)msgId.GetHashCode():X4}"); + Helpers.Log(4, $" → RpcError {rpcError.error_code,3} {rpcError.error_message,-24} {msgIdprefix}#{(short)msgId.GetHashCode():X4}"); else - Helpers.Log(level, $" → {result?.GetType().Name,-47} {msgIdprefix}#{(short)msgId.GetHashCode():X4}"); + Helpers.Log(level, $" → {result?.GetType().Name,-37} {msgIdprefix}#{(short)msgId.GetHashCode():X4}"); } } @@ -559,10 +633,21 @@ namespace WTelegram int number; if (rpcError.error_code == 303 && ((number = rpcError.error_message.IndexOf("_MIGRATE_")) > 0)) { - number = int.Parse(rpcError.error_message[(number + 9)..]); - if (!rpcError.error_message.StartsWith("FILE_")) _session.MainDC = number; - await MigrateDCAsync(number); - goto retry; + if (!rpcError.error_message.StartsWith("FILE_")) + { + number = int.Parse(rpcError.error_message[(number + 9)..]); + // this is a hack to migrate _dcSession in-place (staying in same Client): + Session.DCSession dcSession; + lock (_session) + dcSession = GetOrCreateDCSession(number); + Reset(false); + _session.MainDC = number; + _dcSession.Client = null; + _dcSession = dcSession; + _dcSession.Client = this; + await ConnectAsync(); + goto retry; + } } else if (rpcError.error_code == 420 && ((number = rpcError.error_message.IndexOf("_WAIT_")) > 0)) { @@ -579,6 +664,8 @@ namespace WTelegram _session.Save(); } throw new RpcException(rpcError.error_code, rpcError.error_message); + case ReactorError: + goto retry; default: throw new ApplicationException($"{request.GetType().Name} call got a result of type {result.GetType().Name} instead of {typeof(X).Name}"); } @@ -608,9 +695,9 @@ namespace WTelegram writer.Write(0); var typeName = func(writer); if ((seqno & 1) != 0) - Helpers.Log(1, $"Sending → {typeName,-50} #{(short)msgId.GetHashCode():X4}"); + Helpers.Log(1, $" Sending → {typeName,-40} #{(short)msgId.GetHashCode():X4}"); else - Helpers.Log(1, $"Sending → {typeName,-50} {_session.MsgIdToStamp(msgId):u} (svc)"); + Helpers.Log(1, $" Sending → {typeName,-40} {MsgIdToStamp(msgId):u} (svc)"); writer.BaseStream.Position = patchPos; writer.Write((int)(writer.BaseStream.Length - patchPos - 4)); // patch bytes field writer.Seek(0, SeekOrigin.End); @@ -632,8 +719,8 @@ namespace WTelegram await HandleMessageAsync(msgCopy.orig_message.body); break; case BadServerSalt badServerSalt: - DCSession.Salt = badServerSalt.new_server_salt; - if (badServerSalt.bad_msg_id == DCSession.LastSentMsgId) + _dcSession.Salt = badServerSalt.new_server_salt; + if (badServerSalt.bad_msg_id == _dcSession.LastSentMsgId) { var newMsgId = await SendAsync(_lastSentMsg, true); lock (_pendingRequests) @@ -728,12 +815,13 @@ namespace WTelegram Helpers.Log(4, $"Error deserializing User! ({ex.Message}) Proceeding to login..."); } await this.Auth_LogOut(); + _dcSession.UserId = 0; } var authorization = await this.Auth_ImportBotAuthorization(0, _apiId, _apiHash, botToken); if (authorization is not Auth_Authorization { user: User user }) throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name); _session.User = user; - DCSession.UserId = user.id; + _dcSession.UserId = user.id; _session.Save(); return user; } @@ -773,10 +861,10 @@ namespace WTelegram Helpers.Log(4, $"Error deserializing User! ({ex.Message}) Proceeding to login..."); } await this.Auth_LogOut(); - _session.User = null; + _dcSession.UserId = 0; } phone_number ??= Config("phone_number"); - var sentCode = await this.Auth_SendCode(phone_number, _apiId, _apiHash, settings ?? new()); + var sentCode = await this.Auth_SendCode(phone_number, _apiId, _apiHash, settings ??= new()); Helpers.Log(3, $"A verification code has been sent via {sentCode.type.GetType().Name[17..]}"); var verification_code = Config("verification_code"); Auth_AuthorizationBase authorization; @@ -806,7 +894,7 @@ namespace WTelegram throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name); //TODO: find better serialization for User not subject to TL changes? _session.User = user; - DCSession.UserId = user.id; + _dcSession.UserId = user.id; _session.Save(); return user; } @@ -944,24 +1032,28 @@ namespace WTelegram const int ChunkSize = 128 * 1024; int fileSize = 0; Upload_File fileData; - try + var client = fileDC == 0 ? this : await GetClientForDC(fileDC); + do { - if (fileDC != 0) await MigrateDCAsync(fileDC); - do + Upload_FileBase fileBase; + try { - var fileBase = await this.Upload_GetFile(fileLocation, fileSize, ChunkSize); - fileData = fileBase as Upload_File; - if (fileData == null) - throw new ApplicationException("Upload_GetFile returned unsupported " + fileBase.GetType().Name); - await outputStream.WriteAsync(fileData.bytes, 0, fileData.bytes.Length); - fileSize += fileData.bytes.Length; - - } while (fileData.bytes.Length == ChunkSize); - } - finally - { - await MigrateDCAsync(); // migrate back to main DC - } + // TODO: speed-up download with multiple parallel getFile (share 10-parallel semaphore with upload) + fileBase = await client.Upload_GetFile(fileLocation, fileSize, ChunkSize); + } + catch (RpcException ex) when (ex.Code == 303 && ex.Message.StartsWith("FILE_MIGRATE_")) + { + var dcId = int.Parse(ex.Message[13..]); + client = await GetClientForDC(dcId); + fileBase = await client.Upload_GetFile(fileLocation, fileSize, ChunkSize); + } + fileData = fileBase as Upload_File; + if (fileData == null) + throw new ApplicationException("Upload_GetFile returned unsupported " + fileBase.GetType().Name); + await outputStream.WriteAsync(fileData.bytes, 0, fileData.bytes.Length); + fileSize += fileData.bytes.Length; + } while (fileData.bytes.Length == ChunkSize); + await outputStream.FlushAsync(); return fileData.type; } #endregion diff --git a/src/Encryption.cs b/src/Encryption.cs index c0859ec..c567f4d 100644 --- a/src/Encryption.cs +++ b/src/Encryption.cs @@ -196,7 +196,7 @@ namespace WTelegram private static void ValidityChecks(BigInteger p, int g) { - Helpers.Log(2, "Verifying encryption key safety... (this should happen only once)"); + Helpers.Log(2, "Verifying encryption key safety... (this should happen only once per DC)"); // check that 2^2047 <= p < 2^2048 if (p.GetBitLength() != 2048) throw new ApplicationException("p is not 2048-bit number"); // check that g generates a cyclic subgroup of prime order (p - 1) / 2, i.e. is a quadratic residue mod p. diff --git a/src/Session.cs b/src/Session.cs index 2165835..ff112ee 100644 --- a/src/Session.cs +++ b/src/Session.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Net; using System.Security.Cryptography; using System.Text.Json; +using System.Threading; namespace WTelegram { @@ -13,10 +14,10 @@ namespace WTelegram public TL.User User; public int MainDC; public Dictionary DCSessions = new(); - public long Id; public class DCSession { + public long Id; public long AuthKeyID; public byte[] AuthKey; // 2048-bit = 256 bytes public long UserId; @@ -26,11 +27,13 @@ namespace WTelegram public long LastSentMsgId; public TL.DcOption DataCenter; + internal Client Client; + internal int DcID => DataCenter?.id ?? 0; internal IPEndPoint EndPoint => DataCenter == null ? null : new(IPAddress.Parse(DataCenter.ip_address), DataCenter.port); } public DateTime SessionStart => _sessionStart; - internal DCSession CurrentDCSession; + public readonly SemaphoreSlim _sem = new(1); private readonly DateTime _sessionStart = DateTime.UtcNow; private string _pathname; private byte[] _apiHash; // used as AES key for encryption of session file @@ -60,7 +63,7 @@ namespace WTelegram throw new ApplicationException($"Exception while reading session file: {ex.Message}\nDelete the file to start a new session", ex); } } - return new Session { _pathname = pathname, _apiHash = apiHash, Id = Helpers.RandomLong() }; + return new Session { _pathname = pathname, _apiHash = apiHash }; } internal static Session Load(string pathname, byte[] apiHash) @@ -95,28 +98,5 @@ namespace WTelegram File.Replace(tempPathname, _pathname, null); } } - - internal (long msgId, int seqno) NewMsg(bool isContent) - { - int seqno; - long msgId = DateTime.UtcNow.Ticks + CurrentDCSession.ServerTicksOffset - 621355968000000000L; - msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4 - lock (this) - { - if (msgId <= CurrentDCSession.LastSentMsgId) msgId = CurrentDCSession.LastSentMsgId += 4; else CurrentDCSession.LastSentMsgId = msgId; - seqno = isContent ? CurrentDCSession.Seqno++ * 2 + 1 : CurrentDCSession.Seqno * 2; - Save(); - } - return (msgId, seqno); - } - - internal DateTime MsgIdToStamp(long serverMsgId) - => new((serverMsgId >> 32) * 10000000 - CurrentDCSession.ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc); - - internal void ChangeDC(int dc) - { - if (dc == 0) dc = MainDC; - CurrentDCSession = dc != 0 ? DCSessions[dc] : new(); - } } } \ No newline at end of file