From 6d238dc528e28e490f877ccedb722774ca91d6ea Mon Sep 17 00:00:00 2001 From: Wizou <11647984+wiz0u@users.noreply.github.com> Date: Sun, 6 Apr 2025 19:48:43 +0200 Subject: [PATCH] Store less stuff in session data and reduce save frequency for better performance. --- Examples/Program_Heroku.cs | 22 +++------- src/Client.cs | 88 ++++++++++++++++++++------------------ src/Encryption.cs | 8 ++-- src/Session.cs | 19 ++++---- 4 files changed, 67 insertions(+), 70 deletions(-) diff --git a/Examples/Program_Heroku.cs b/Examples/Program_Heroku.cs index 5ae5fe7..4ad3740 100644 --- a/Examples/Program_Heroku.cs +++ b/Examples/Program_Heroku.cs @@ -62,10 +62,8 @@ namespace WTelegramClientTest { private readonly NpgsqlConnection _sql; private readonly string _sessionName; - private byte[] _data; - private int _dataLen; - private DateTime _lastWrite; - private Task _delayedWrite; + private readonly byte[] _data; + private readonly int _dataLen; /// Heroku DB URL of the form "postgres://user:password@host:port/database" /// Entry name for the session data in the WTelegram_sessions table (default: "Heroku") @@ -85,7 +83,6 @@ namespace WTelegramClientTest protected override void Dispose(bool disposing) { - _delayedWrite?.Wait(); _sql.Dispose(); } @@ -97,18 +94,9 @@ namespace WTelegramClientTest public override void Write(byte[] buffer, int offset, int count) // Write call and buffer modifications are done within a lock() { - _data = buffer; _dataLen = count; - if (_delayedWrite != null) return; - var left = 1000 - (int)(DateTime.UtcNow - _lastWrite).TotalMilliseconds; - if (left < 0) - { - using var cmd = new NpgsqlCommand($"INSERT INTO WTelegram_sessions (name, data) VALUES ('{_sessionName}', @data) ON CONFLICT (name) DO UPDATE SET data = EXCLUDED.data", _sql); - cmd.Parameters.AddWithValue("data", count == buffer.Length ? buffer : buffer[offset..(offset + count)]); - cmd.ExecuteNonQuery(); - _lastWrite = DateTime.UtcNow; - } - else // delay writings for a full second - _delayedWrite = Task.Delay(left).ContinueWith(t => { lock (this) { _delayedWrite = null; Write(_data, 0, _dataLen); } }); + using var cmd = new NpgsqlCommand($"INSERT INTO WTelegram_sessions (name, data) VALUES ('{_sessionName}', @data) ON CONFLICT (name) DO UPDATE SET data = EXCLUDED.data", _sql); + cmd.Parameters.AddWithValue("data", count == buffer.Length ? buffer : buffer[offset..(offset + count)]); + cmd.ExecuteNonQuery(); } public override long Length => _dataLen; diff --git a/src/Client.cs b/src/Client.cs index 29d50b7..74e0b7a 100644 --- a/src/Client.cs +++ b/src/Client.cs @@ -115,7 +115,7 @@ namespace WTelegram _session = Session.LoadOrCreate(sessionStore, Convert.FromHexString(session_key)); if (_session.ApiId == 0) _session.ApiId = int.Parse(Config("api_id")); if (_session.MainDC != 0) _session.DCSessions.TryGetValue(_session.MainDC, out _dcSession); - _dcSession ??= new() { Id = Helpers.RandomLong() }; + _dcSession ??= new(); _dcSession.Client = this; var version = Assembly.GetExecutingAssembly().GetCustomAttribute().InformationalVersion; Helpers.Log(1, $"WTelegramClient {version} running under {System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription}"); @@ -272,8 +272,8 @@ namespace WTelegram // we have already negociated an AuthKey with this DC if (dcSession.DataCenter.flags == flags && _session.DCSessions.Remove(-dcId)) return _session.DCSessions[dcId] = dcSession; // we found a misclassed DC, change its sign - dcSession = new Session.DCSession { Id = Helpers.RandomLong(), // clone AuthKey for a session on the matching media_only DC - AuthKeyID = dcSession.AuthKeyID, AuthKey = dcSession.AuthKey, UserId = dcSession.UserId }; + dcSession = new Session.DCSession { // clone AuthKey for a session on the matching media_only DC + authKeyID = dcSession.authKeyID, AuthKey = dcSession.AuthKey, UserId = dcSession.UserId }; } // try to find the most appropriate DcOption for this DC if (dcSession?.AuthKey == null) // we'll need to negociate an AuthKey => can't use media_only DC @@ -283,7 +283,7 @@ namespace WTelegram } var dcOptions = GetDcOptions(Math.Abs(dcId), flags); var dcOption = dcOptions.FirstOrDefault() ?? throw new WTException($"Could not find adequate dc_option for DC {dcId}"); - dcSession ??= new Session.DCSession { Id = Helpers.RandomLong() }; // create new session only if not already existing + dcSession ??= new(); // create new session only if not already existing dcSession.DataCenter = dcOption; return _session.DCSessions[dcId] = dcSession; } @@ -302,6 +302,7 @@ namespace WTelegram var flags = _dcSession.DataCenter.flags; if (dcId < 0) flags = (flags & DcOption.Flags.ipv6) | DcOption.Flags.media_only; altSession = GetOrCreateDCSession(dcId, flags); + _session.Save(); if (altSession.Client?.Disconnected ?? false) { altSession.Client.Dispose(); altSession.Client = null; } altSession.Client ??= new Client(this, altSession); } @@ -321,6 +322,7 @@ namespace WTelegram if (authorization is not Auth_Authorization { user: User user }) throw new WTException("Failed to get Authorization: " + authorization.GetType().Name); altSession.UserId = user.id; + lock (_session) _session.Save(); } } finally @@ -419,7 +421,7 @@ namespace WTelegram } internal DateTime MsgIdToStamp(long serverMsgId) - => new((serverMsgId >> 32) * 10000000 - _dcSession.ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc); + => new((serverMsgId >> 32) * 10000000 - _dcSession.serverTicksOffset + 621355968000000000L, DateTimeKind.Utc); internal IObject ReadFrame(byte[] data, int dataLen) { @@ -432,7 +434,7 @@ namespace WTelegram throw new WTException($"Packet payload too small: {dataLen}"); long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data); - if (authKeyId != _dcSession.AuthKeyID) + if (authKeyId != _dcSession.authKeyID) throw new WTException($"Received a packet encrypted with unexpected key {authKeyId:X}"); if (authKeyId == 0) // Unencrypted message { @@ -468,7 +470,7 @@ namespace WTelegram if (length < 0 || length % 4 != 0) throw new WTException($"Invalid message_data_length: {length}"); if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new WTException($"Invalid message padding length: {decrypted_data.Length - 32}-{length}"); - if (sessionId != _dcSession.Id) throw new WTException($"Unexpected session ID: {sessionId} != {_dcSession.Id}"); + if (sessionId != _dcSession.id) throw new WTException($"Unexpected session ID: {sessionId} != {_dcSession.id}"); if ((msgId & 1) == 0) throw new WTException($"msg_id is not odd: {msgId}"); if (!_dcSession.CheckNewMsgId(msgId)) { @@ -477,19 +479,20 @@ namespace WTelegram } var utcNow = DateTime.UtcNow; if (_lastRecvMsgId == 0) // resync ServerTicksOffset on first message - _dcSession.ServerTicksOffset = (msgId >> 32) * 10000000 - utcNow.Ticks + 621355968000000000L; + _dcSession.serverTicksOffset = (msgId >> 32) * 10000000 - utcNow.Ticks + 621355968000000000L; var msgStamp = MsgIdToStamp(_lastRecvMsgId = msgId); long deltaTicks = (msgStamp - utcNow).Ticks; if (deltaTicks is > 0) if (deltaTicks < Ticks5Secs) // resync if next message is less than 5 seconds in the future - _dcSession.ServerTicksOffset += deltaTicks; - else if (_dcSession.ServerTicksOffset < -Ticks5Secs && deltaTicks + _dcSession.ServerTicksOffset < 0) - _dcSession.ServerTicksOffset += deltaTicks; + _dcSession.serverTicksOffset += deltaTicks; + else if (_dcSession.serverTicksOffset < -Ticks5Secs && deltaTicks + _dcSession.serverTicksOffset < 0) + _dcSession.serverTicksOffset += deltaTicks; if (serverSalt != _dcSession.Salt && serverSalt != _dcSession.OldSalt && serverSalt != _dcSession.Salts?.Values.ElementAtOrDefault(1)) { Helpers.Log(3, $"{_dcSession.DcID}>Server salt has changed: {_dcSession.Salt:X} -> {serverSalt:X}"); _dcSession.OldSalt = _dcSession.Salt; _dcSession.Salt = serverSalt; + lock (_session) _session.Save(); if (++_saltChangeCounter >= 10) throw new WTException("Server salt changed too often! Security issue?"); CheckSalt(); @@ -499,7 +502,7 @@ namespace WTelegram var ctorNb = reader.ReadUInt32(); if (ctorNb != Layer.BadMsgCtor && deltaTicks / TimeSpan.TicksPerSecond is > 30 or < -300) { // msg_id values that belong over 30 seconds in the future or over 300 seconds in the past are to be ignored. - Helpers.Log(1, $"{_dcSession.DcID}>Ignoring 0x{ctorNb:X8} because of wrong timestamp {msgStamp:u} - {utcNow:u} Δ={new TimeSpan(_dcSession.ServerTicksOffset):c}"); + Helpers.Log(1, $"{_dcSession.DcID}>Ignoring 0x{ctorNb:X8} because of wrong timestamp {msgStamp:u} - {utcNow:u} Δ={new TimeSpan(_dcSession.serverTicksOffset):c}"); return null; } try @@ -546,9 +549,11 @@ namespace WTelegram { var keys = _dcSession.Salts.Keys; if (keys[^1] == DateTime.MaxValue) return; // GetFutureSalts ongoing - var now = DateTime.UtcNow.AddTicks(_dcSession.ServerTicksOffset); - for (; keys.Count > 1 && keys[1] < now; _dcSession.OldSalt = _dcSession.Salt, _dcSession.Salt = _dcSession.Salts.Values[0]) + var now = DateTime.UtcNow.AddTicks(_dcSession.serverTicksOffset); + bool removed = false; + for (; keys.Count > 1 && keys[1] < now; _dcSession.OldSalt = _dcSession.Salt, _dcSession.Salt = _dcSession.Salts.Values[0], removed = true) _dcSession.Salts.RemoveAt(0); + if (removed) _session.Save(); if (_dcSession.Salts.Count > 48) return; } _dcSession.Salts[DateTime.MaxValue] = 0; @@ -694,7 +699,7 @@ namespace WTelegram rpc.tcs.SetResult(obj); return; } - else if (_dcSession.AuthKeyID == 0) + else if (_dcSession.authKeyID == 0) throw new WTException($"Received a {obj.GetType()} incompatible with expected bare {rpc?.type}"); lock (_pendingRpcs) _pendingRpcs[_bareRpc.msgId] = _bareRpc; @@ -726,7 +731,7 @@ namespace WTelegram break; // we don't do anything with these, for now case BadMsgNotification badMsgNotification: await _sendSemaphore.WaitAsync(); - bool retryLast = badMsgNotification.bad_msg_id == _dcSession.LastSentMsgId; + bool retryLast = badMsgNotification.bad_msg_id == _dcSession.lastSentMsgId; var lastSentMsg = _lastSentMsg; _sendSemaphore.Release(); var logLevel = badMsgNotification.error_code == 48 ? 2 : 4; @@ -735,14 +740,14 @@ namespace WTelegram { case 16: // msg_id too low (most likely, client time is wrong; synchronize it using msg_id notifications and re-send the original message) case 17: // msg_id too high (similar to the previous case, the client time has to be synchronized, and the message re-sent with the correct msg_id) - _dcSession.LastSentMsgId = 0; + _dcSession.lastSentMsgId = 0; var localTime = DateTime.UtcNow; - _dcSession.ServerTicksOffset = (_lastRecvMsgId >> 32) * 10000000 - localTime.Ticks + 621355968000000000L; - Helpers.Log(1, $"Time offset: {_dcSession.ServerTicksOffset} | Server: {MsgIdToStamp(_lastRecvMsgId).AddTicks(_dcSession.ServerTicksOffset).TimeOfDay} UTC | Local: {localTime.TimeOfDay} UTC"); + _dcSession.serverTicksOffset = (_lastRecvMsgId >> 32) * 10000000 - localTime.Ticks + 621355968000000000L; + Helpers.Log(1, $"Time offset: {_dcSession.serverTicksOffset} | Server: {MsgIdToStamp(_lastRecvMsgId).AddTicks(_dcSession.serverTicksOffset).TimeOfDay} UTC | Local: {localTime.TimeOfDay} UTC"); break; case 32: // msg_seqno too low (the server has already received a message with a lower msg_id but with either a higher or an equal and odd seqno) case 33: // msg_seqno too high (similarly, there is a message with a higher msg_id but with either a lower or an equal and odd seqno) - if (_dcSession.Seqno <= 1) + if (_dcSession.seqno <= 1) retryLast = false; else { @@ -754,6 +759,7 @@ namespace WTelegram case 48: // incorrect server salt (in this case, the bad_server_salt response is received with the correct salt, and the message is to be re-sent with it) _dcSession.OldSalt = _dcSession.Salt; _dcSession.Salt = ((BadServerSalt)badMsgNotification).new_server_salt; + lock (_session) _session.Save(); CheckSalt(); break; default: @@ -942,7 +948,7 @@ namespace WTelegram // is it address for a known DCSession? _dcSession = _session.DCSessions.Values.FirstOrDefault(dcs => dcs.EndPoint.Equals(endpoint)); if (defaultDc != 0) _dcSession ??= _session.DCSessions.GetValueOrDefault(defaultDc); - _dcSession ??= new() { Id = Helpers.RandomLong() }; + _dcSession ??= new(); _dcSession.Client = this; _dcSession.DataCenter = null; Helpers.Log(2, $"Connecting to {endpoint}..."); @@ -976,7 +982,7 @@ namespace WTelegram try { - if (_dcSession.AuthKeyID == 0) + if (_dcSession.authKeyID == 0) await CreateAuthorizationKey(this, _dcSession); if (_networkStream != null) _ = KeepAlive(_cts.Token); @@ -1125,6 +1131,7 @@ namespace WTelegram if (self.id == long.Parse(botToken.Split(':')[0])) { _session.UserId = _dcSession.UserId = self.id; + lock (_session) _session.Save(); RaiseUpdates(self); return User = self; } @@ -1164,6 +1171,7 @@ namespace WTelegram self.phone == string.Concat((phone_number = Config("phone_number")).Where(char.IsDigit))) { _session.UserId = _dcSession.UserId = self.id; + lock (_session) _session.Save(); RaiseUpdates(self); return User = self; } @@ -1420,21 +1428,17 @@ namespace WTelegram internal (long msgId, int seqno) NewMsgId(bool isContent) { int seqno; - long msgId = DateTime.UtcNow.Ticks + _dcSession.ServerTicksOffset - 621355968000000000L; + long msgId = DateTime.UtcNow.Ticks + _dcSession.serverTicksOffset - 621355968000000000L; msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4 - lock (_session) - { - if (msgId <= _dcSession.LastSentMsgId) msgId = _dcSession.LastSentMsgId += 4; else _dcSession.LastSentMsgId = msgId; - seqno = isContent ? _dcSession.Seqno++ * 2 + 1 : _dcSession.Seqno * 2; - _session.Save(); - } + if (msgId <= _dcSession.lastSentMsgId) msgId = _dcSession.lastSentMsgId += 4; else _dcSession.lastSentMsgId = msgId; + seqno = isContent ? _dcSession.seqno++ * 2 + 1 : _dcSession.seqno * 2; return (msgId, seqno); } private async Task SendAsync(IObject msg, bool isContent, Rpc rpc = null) { if (_reactorTask == null) throw new WTException("You must connect to Telegram first"); - isContent &= _dcSession.AuthKeyID != 0; + isContent &= _dcSession.authKeyID != 0; var (msgId, seqno) = NewMsgId(isContent); if (rpc != null) lock (_pendingRpcs) @@ -1462,7 +1466,7 @@ namespace WTelegram using var writer = new BinaryWriter(memStream); writer.Write(0); // int32 payload_len (to be patched with payload length) - if (_dcSession.AuthKeyID == 0) // send unencrypted message + if (_dcSession.authKeyID == 0) // send unencrypted message { if (_bareRpc == null) throw new WTException($"Shouldn't send a {msg.GetType().Name} unencrypted"); writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted) @@ -1479,7 +1483,7 @@ namespace WTelegram using var clearWriter = new BinaryWriter(clearStream); clearWriter.Write(_dcSession.AuthKey, 88, 32); clearWriter.Write(_dcSession.Salt); // int64 salt - clearWriter.Write(_dcSession.Id); // int64 session_id + clearWriter.Write(_dcSession.id); // int64 session_id clearWriter.Write(msgId); // int64 message_id clearWriter.Write(seqno); // int32 msg_seqno clearWriter.Write(0); // int32 message_data_length (to be patched) @@ -1499,13 +1503,13 @@ namespace WTelegram const int msgKeyOffset = 8; // msg_key = middle 128-bits of SHA256(authkey_part+plaintext+padding) byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(32, clearLength + padding), true, 0, _dcSession.AuthKey, msgKeyLarge, msgKeyOffset, _sha256); - writer.Write(_dcSession.AuthKeyID); // int64 auth_key_id + writer.Write(_dcSession.authKeyID); // int64 auth_key_id writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key writer.Write(encrypted_data); // bytes encrypted_data } if (_paddedMode) // Padded intermediate mode => append random padding { - var padding = new byte[_random.Next(_dcSession.AuthKeyID == 0 ? 257 : 16)]; + var padding = new byte[_random.Next(_dcSession.authKeyID == 0 ? 257 : 16)]; RNG.GetBytes(padding); writer.Write(padding); } @@ -1572,7 +1576,7 @@ namespace WTelegram /// Wait for the reply and return the resulting object, or throws an RpcException if an error was replied public async Task Invoke(IMethod query) { - if (_dcSession.WithoutUpdates && query is not IMethod and not IMethod) + if (_dcSession.withoutUpdates && query is not IMethod and not IMethod) query = new TL.Methods.InvokeWithoutUpdates { query = query }; bool got503 = false; retry: @@ -1610,7 +1614,7 @@ namespace WTelegram { if (x <= FloodRetryThreshold) { - if (x == 0) x =1; + if (x == 0) x = 1; await Task.Delay(x * 1000); goto retry; } @@ -1623,14 +1627,16 @@ namespace WTelegram else if (code == 400 && message == "CONNECTION_NOT_INITED") { await InitConnection(); + lock (_session) _session.Save(); goto retry; } else if (code == 500 && message == "AUTH_RESTART") - { - _session.UserId = 0; // force a full login authorization flow, next time - User = null; - lock (_session) _session.Save(); - } + lock (_session) + { + _session.UserId = 0; // force a full login authorization flow, next time + User = null; + _session.Save(); + } throw new RpcException(code, message, x); case ReactorError: goto retry; diff --git a/src/Encryption.cs b/src/Encryption.cs index d943e5d..957c537 100644 --- a/src/Encryption.cs +++ b/src/Encryption.cs @@ -110,9 +110,9 @@ namespace WTelegram var g_a = BigEndianInteger(serverDHinnerData.g_a); var dh_prime = BigEndianInteger(serverDHinnerData.dh_prime); CheckGoodPrime(dh_prime, serverDHinnerData.g); - session.LastSentMsgId = 0; - session.ServerTicksOffset = (serverDHinnerData.server_time - localTime).Ticks; - Helpers.Log(1, $"Time offset: {session.ServerTicksOffset} | Server: {serverDHinnerData.server_time.TimeOfDay} UTC | Local: {localTime.TimeOfDay} UTC"); + session.lastSentMsgId = 0; + session.serverTicksOffset = (serverDHinnerData.server_time - localTime).Ticks; + Helpers.Log(1, $"Time offset: {session.serverTicksOffset} | Server: {serverDHinnerData.server_time.TimeOfDay} UTC | Local: {localTime.TimeOfDay} UTC"); //6) var salt = new byte[256]; RNG.GetBytes(salt); @@ -159,7 +159,7 @@ namespace WTelegram if (!Enumerable.SequenceEqual(dhGenOk.new_nonce_hash1.raw, sha1.ComputeHash(expected_new_nonceN).Skip(4))) throw new WTException("setClientDHparamsAnswer.new_nonce_hashN mismatch"); - session.AuthKeyID = BinaryPrimitives.ReadInt64LittleEndian(authKeyHash.AsSpan(12)); + session.authKeyID = BinaryPrimitives.ReadInt64LittleEndian(authKeyHash.AsSpan(12)); session.AuthKey = authKey; session.Salt = BinaryPrimitives.ReadInt64LittleEndian(pqInnerData.new_nonce.raw) ^ BinaryPrimitives.ReadInt64LittleEndian(resPQ.server_nonce.raw); session.OldSalt = session.Salt; diff --git a/src/Session.cs b/src/Session.cs index 86a0fec..c12d65a 100644 --- a/src/Session.cs +++ b/src/Session.cs @@ -21,25 +21,25 @@ namespace WTelegram public sealed class DCSession { - public long Id; - public long AuthKeyID; public byte[] AuthKey; // 2048-bit = 256 bytes public long UserId; public long OldSalt; // still accepted for a further 1800 seconds public long Salt; public SortedList Salts; - public int Seqno; - public long ServerTicksOffset; - public long LastSentMsgId; public TL.DcOption DataCenter; - public bool WithoutUpdates; public int Layer; + internal long id = Helpers.RandomLong(); + internal long authKeyID; + internal int seqno; + internal long serverTicksOffset; + internal long lastSentMsgId; + internal bool withoutUpdates; internal Client Client; internal int DcID => DataCenter?.id ?? 0; internal IPEndPoint EndPoint => DataCenter == null ? null : new(IPAddress.Parse(DataCenter.ip_address), DataCenter.port); - internal void Renew() { Helpers.Log(3, $"Renewing session on DC {DcID}..."); Id = Helpers.RandomLong(); Seqno = 0; LastSentMsgId = 0; } - public void DisableUpdates(bool disable = true) { if (WithoutUpdates != disable) { WithoutUpdates = disable; Renew(); } } + internal void Renew() { Helpers.Log(3, $"Renewing session on DC {DcID}..."); id = Helpers.RandomLong(); seqno = 0; lastSentMsgId = 0; } + public void DisableUpdates(bool disable = true) { if (withoutUpdates != disable) { withoutUpdates = disable; Renew(); } } const int MsgIdsN = 512; private long[] _msgIds; @@ -117,6 +117,9 @@ namespace WTelegram throw new WTException("Integrity check failed in session loading"); session = JsonSerializer.Deserialize(utf8Json.AsSpan(32), Helpers.JsonOptions); Helpers.Log(2, "Loaded previous session"); + using var sha1 = SHA1.Create(); + foreach (var dcs in session.DCSessions.Values) + dcs.authKeyID = BinaryPrimitives.ReadInt64LittleEndian(sha1.ComputeHash(dcs.AuthKey).AsSpan(12)); } session ??= new Session(); session._store = store;