using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TL;
using static WTelegram.Encryption;
// necessary for .NET Standard 2.0 compilation:
#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
namespace WTelegram
{
public sealed class Client : IDisposable
{
/// This event will be called when an unsollicited update/message is sent by Telegram servers
/// See Examples/Program_ListenUpdate.cs for how to use this
public event Action Update;
public Config TLConfig { get; private set; }
public int MaxAutoReconnects { get; set; } = 5; // number of automatic reconnections on connection/reactor failure
public bool IsMainDC => (_dcSession?.DataCenter?.id ?? 0) == _session.MainDC;
public bool Disconnected => _tcpClient != null && !(_tcpClient.Client?.Connected ?? false);
private readonly Func _config;
private readonly int _apiId;
private readonly string _apiHash;
private readonly Session _session;
private Session.DCSession _dcSession;
private static readonly byte[] IntermediateHeader = new byte[4] { 0xee, 0xee, 0xee, 0xee };
private TcpClient _tcpClient;
private NetworkStream _networkStream;
private ITLFunction _lastSentMsg;
private long _lastRecvMsgId;
private readonly List _msgsToAck = new();
private readonly Random _random = new();
private int _saltChangeCounter;
private Task _reactorTask;
private long _bareRequest;
private readonly Dictionary tcs)> _pendingRequests = new();
private SemaphoreSlim _sendSemaphore = new(0);
private readonly SemaphoreSlim _semaphore = new(1);
private Task _connecting;
private CancellationTokenSource _cts;
private int _reactorReconnects = 0;
private const int FilePartSize = 512 * 1024;
private readonly SemaphoreSlim _parallelTransfers = new(10); // max parallel part uploads/downloads
#if MTPROTO1
private readonly SHA1 _sha1 = SHA1.Create();
private readonly SHA1 _sha1Recv = SHA1.Create();
#else
private readonly SHA256 _sha256 = SHA256.Create();
private readonly SHA256 _sha256Recv = SHA256.Create();
#endif
/// Welcome to WTelegramClient! 😀
/// Config callback, is queried for: api_id, api_hash, session_pathname
public Client(Func configProvider = null)
{
_config = configProvider ?? DefaultConfigOrAsk;
_apiId = int.Parse(Config("api_id"));
_apiHash = Config("api_hash");
_session = Session.LoadOrCreate(Config("session_pathname"), Convert.FromHexString(_apiHash));
if (_session.MainDC != 0) _session.DCSessions.TryGetValue(_session.MainDC, out _dcSession);
_dcSession ??= new() { Id = Helpers.RandomLong() };
_dcSession.Client = this;
}
private Client(Client cloneOf, Session.DCSession dcSession)
{
_config = cloneOf._config;
_apiId = cloneOf._apiId;
_apiHash = cloneOf._apiHash;
_session = cloneOf._session;
_dcSession = dcSession;
}
public string Config(string config)
=> _config(config) ?? DefaultConfig(config) ?? throw new ApplicationException("You must provide a config value for " + config);
public static string DefaultConfig(string config) => config switch
{
"session_pathname" => Path.Combine(
Path.GetDirectoryName(Path.GetDirectoryName(AppDomain.CurrentDomain.BaseDirectory.TrimEnd(Path.DirectorySeparatorChar))),
"WTelegram.session"),
#if DEBUG
"server_address" => "149.154.167.40:443",
#else
"server_address" => "149.154.167.50:443",
#endif
"device_model" => Environment.Is64BitOperatingSystem ? "PC 64bit" : "PC 32bit",
"system_version" => System.Runtime.InteropServices.RuntimeInformation.OSDescription,
"app_version" => Assembly.GetEntryAssembly().GetName().Version.ToString(),
"system_lang_code" => CultureInfo.InstalledUICulture.TwoLetterISOLanguageName,
"lang_pack" => "",
"lang_code" => CultureInfo.CurrentUICulture.TwoLetterISOLanguageName,
"user_id" => "-1",
"verification_code" => AskConfig(config),
_ => null // api_id api_hash phone_number... it's up to you to reply to these correctly
};
public static string DefaultConfigOrAsk(string config) => DefaultConfig(config) ?? AskConfig(config);
private static string AskConfig(string config)
{
if (config == "api_id") Console.WriteLine("Welcome! You can obtain your api_id/api_hash at https://my.telegram.org/apps");
Console.Write($"Enter {config.Replace('_', ' ')}: ");
return Console.ReadLine();
}
[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1822")]
public void LoadPublicKey(string pem) => Encryption.LoadPublicKey(pem);
public void Dispose()
{
Helpers.Log(2, $"{_dcSession.DcID}>Disposing the client");
Reset(false, IsMainDC);
}
// disconnect and eventually forget user and disconnect other sessions
public void Reset(bool resetUser = true, bool resetSessions = true)
{
try
{
if (CheckMsgsToAck() is MsgsAck msgsAck)
SendAsync(MakeFunction(msgsAck), false).Wait(1000);
}
catch (Exception)
{
}
_cts?.Cancel();
_sendSemaphore = new(0);
_reactorTask = null;
_tcpClient?.Dispose();
_connecting = null;
if (resetSessions)
{
foreach (var altSession in _session.DCSessions.Values)
if (altSession.Client != null && altSession.Client != this)
{
altSession.Client.Dispose();
altSession.Client = null;
}
}
if (resetUser)
_session.User = null;
}
/// Establish connection to Telegram servers. Config callback is queried for: server_address
/// Most methods of this class are async Task, so please use
public async Task ConnectAsync()
{
lock (this)
_connecting ??= DoConnectAsync();
await _connecting;
}
private async Task DoConnectAsync()
{
var endpoint = _dcSession?.EndPoint ?? Compat.IPEndPoint_Parse(Config("server_address"));
Helpers.Log(2, $"Connecting to {endpoint}...");
var tcpClient = new TcpClient(AddressFamily.InterNetworkV6) { Client = { DualMode = true } }; // this allows both IPv4 & IPv6
try
{
try
{
await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
}
catch (SocketException ex) // cannot connect to target endpoint, try to find an alternate
{
Helpers.Log(4, $"SocketException {ex.SocketErrorCode} ({ex.ErrorCode}): {ex.Message}");
if (_dcSession?.DataCenter == null) throw;
var triedEndpoints = new HashSet { endpoint };
if (_session.DcOptions != null)
{
var altOptions = _session.DcOptions.Where(dco => dco.id == _dcSession.DataCenter.id && dco.flags != _dcSession.DataCenter.flags
&& (dco.flags & (DcOption.Flags.cdn | DcOption.Flags.tcpo_only | DcOption.Flags.media_only)) == 0)
.OrderBy(dco => dco.flags);
// try alternate addresses for this DC
foreach (var dcOption in altOptions)
{
endpoint = new(IPAddress.Parse(dcOption.ip_address), dcOption.port);
if (!triedEndpoints.Add(endpoint)) continue;
Helpers.Log(2, $"Connecting to {endpoint}...");
try
{
await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
_dcSession.DataCenter = dcOption;
break;
}
catch (SocketException) { }
}
}
if (!tcpClient.Connected)
{
endpoint = Compat.IPEndPoint_Parse(Config("server_address")); // re-ask callback for an address
if (!triedEndpoints.Add(endpoint)) throw;
_dcSession.Client = null;
// is it address for a known DCSession?
_dcSession = _session.DCSessions.Values.FirstOrDefault(dcs => dcs.EndPoint.Equals(endpoint));
_dcSession ??= new() { Id = Helpers.RandomLong() };
_dcSession.Client = this;
Helpers.Log(2, $"Connecting to {endpoint}...");
await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
}
}
}
catch (Exception)
{
tcpClient.Dispose();
throw;
}
_tcpClient = tcpClient;
_networkStream = tcpClient.GetStream();
await _networkStream.WriteAsync(IntermediateHeader, 0, 4);
_cts = new();
_saltChangeCounter = 0;
_reactorTask = Reactor(_networkStream, _cts);
_sendSemaphore.Release();
try
{
if (_dcSession.AuthKeyID == 0)
await CreateAuthorizationKey(this, _dcSession);
var keepAliveTask = KeepAlive(_cts.Token);
TLConfig = await this.InvokeWithLayer(Layer.Version,
Schema.InitConnection(_apiId,
Config("device_model"),
Config("system_version"),
Config("app_version"),
Config("system_lang_code"),
Config("lang_pack"),
Config("lang_code"),
Schema.Help_GetConfig));
_session.DcOptions = TLConfig.dc_options;
_saltChangeCounter = 0;
if (_dcSession.DataCenter == null)
{
_dcSession.DataCenter = _session.DcOptions.Where(dc => dc.id == TLConfig.this_dc)
.OrderByDescending(dc => dc.ip_address == endpoint.Address.ToString())
.ThenByDescending(dc => dc.port == endpoint.Port).First();
_session.DCSessions[TLConfig.this_dc] = _dcSession;
}
if (_session.MainDC == 0) _session.MainDC = TLConfig.this_dc;
}
finally
{
_session.Save();
}
Helpers.Log(2, $"Connected to {(TLConfig.test_mode ? "Test DC" : "DC")} {TLConfig.this_dc}... {TLConfig.flags & (Config.Flags)~0xE00}");
}
public async Task GetClientForDC(int dcId, bool media_only = true, bool connect = true)
{
if (_dcSession.DataCenter?.id == dcId) return this;
Session.DCSession altSession;
lock (_session)
{
altSession = GetOrCreateDCSession(dcId, _dcSession.DataCenter.flags | (media_only ? DcOption.Flags.media_only : 0));
if (altSession.Client?.Disconnected ?? false) { altSession.Client.Dispose(); altSession.Client = null; }
altSession.Client ??= new Client(this, altSession);
}
Helpers.Log(2, $"Requested connection to DC {dcId}...");
if (connect)
{
await _semaphore.WaitAsync();
try
{
Auth_ExportedAuthorization exported = null;
if (_session.User != null && IsMainDC && altSession.UserId != _session.User.id)
exported = await this.Auth_ExportAuthorization(dcId);
await altSession.Client.ConnectAsync();
if (exported != null)
{
var authorization = await altSession.Client.Auth_ImportAuthorization(exported.id, exported.bytes);
if (authorization is not Auth_Authorization { user: User user })
throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name);
_session.User = user;
altSession.UserId = user.id;
}
}
finally
{
_semaphore.Release();
}
}
return altSession.Client;
}
private Session.DCSession GetOrCreateDCSession(int dcId, DcOption.Flags flags)
{
if (_session.DCSessions.TryGetValue(dcId, out var dcSession))
if (dcSession.Client != null || dcSession.DataCenter.flags == flags)
return dcSession; // if we have already a session with this DC and we are connected or it is a perfect match, use it
// try to find the most appropriate DcOption for this DC
var dcOptions = _session.DcOptions.Where(dc => dc.id == dcId).OrderBy(dc => dc.flags ^ flags);
var dcOption = dcOptions.FirstOrDefault() ?? throw new ApplicationException($"Could not find adequate dc_option for DC {dcId}");
dcSession ??= new Session.DCSession { Id = Helpers.RandomLong() }; // create new session only if not already existing
dcSession.DataCenter = dcOption;
return _session.DCSessions[dcId] = dcSession;
}
internal DateTime MsgIdToStamp(long serverMsgId)
=> new((serverMsgId >> 32) * 10000000 - _dcSession.ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc);
internal (long msgId, int seqno) NewMsgId(bool isContent)
{
int seqno;
long msgId = DateTime.UtcNow.Ticks + _dcSession.ServerTicksOffset - 621355968000000000L;
msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4
lock (_session)
{
if (msgId <= _dcSession.LastSentMsgId) msgId = _dcSession.LastSentMsgId += 4; else _dcSession.LastSentMsgId = msgId;
seqno = isContent ? _dcSession.Seqno++ * 2 + 1 : _dcSession.Seqno * 2;
_session.Save();
}
return (msgId, seqno);
}
private async Task KeepAlive(CancellationToken ct)
{
int ping_id = _random.Next();
while (!ct.IsCancellationRequested)
{
await Task.Delay(60000, ct);
if (_saltChangeCounter > 0) --_saltChangeCounter;
#if DEBUG
await this.PingDelayDisconnect(ping_id++, 300);
#else
await this.PingDelayDisconnect(ping_id++, 75);
#endif
}
}
private async Task Reactor(NetworkStream stream, CancellationTokenSource cts)
{
const int MinBufferSize = 1024;
var data = new byte[MinBufferSize];
while (!cts.IsCancellationRequested)
{
ITLObject obj = null;
try
{
if (await FullReadAsync(stream, data, 4, cts.Token) != 4)
throw new ApplicationException("Could not read payload length : Connection shut down");
int payloadLen = BinaryPrimitives.ReadInt32LittleEndian(data);
if (payloadLen > data.Length)
data = new byte[payloadLen];
else if (Math.Max(payloadLen, MinBufferSize) < data.Length / 4)
data = new byte[Math.Max(payloadLen, MinBufferSize)];
if (await FullReadAsync(stream, data, payloadLen, cts.Token) != payloadLen)
throw new ApplicationException("Could not read frame data : Connection shut down");
obj = ReadFrame(data, payloadLen);
}
catch (Exception ex) // an exception in RecvAsync is always fatal
{
if (cts.IsCancellationRequested) return;
Helpers.Log(5, $"An exception occured in the reactor: {ex}");
var oldSemaphore = _sendSemaphore;
await oldSemaphore.WaitAsync(cts.Token); // prevent any sending while we reconnect
var reactorError = new ReactorError { Exception = ex };
try
{
_reactorReconnects = (_reactorReconnects + 1) % MaxAutoReconnects;
if (_reactorReconnects != 0)
{
lock (_msgsToAck) _msgsToAck.Clear();
Reset(false, false);
await Task.Delay(5000);
await ConnectAsync(); // start a new reactor after 5 secs
lock (_pendingRequests) // retry all pending requests
{
foreach (var (_, tcs) in _pendingRequests.Values)
tcs.SetResult(reactorError);
_pendingRequests.Clear();
_bareRequest = 0;
}
}
else
throw;
}
catch
{
lock (_pendingRequests) // abort all pending requests
{
foreach (var (_, tcs) in _pendingRequests.Values)
tcs.SetException(ex);
_pendingRequests.Clear();
_bareRequest = 0;
}
OnUpdate(reactorError);
}
finally
{
oldSemaphore.Release();
}
cts.Cancel(); // always stop the reactor
}
if (obj != null)
await HandleMessageAsync(obj);
}
}
internal ITLObject ReadFrame(byte[] data, int dataLen)
{
if (dataLen == 4 && data[3] == 0xFF)
{
int error_code = -BinaryPrimitives.ReadInt32LittleEndian(data);
throw new RpcException(error_code, TransportError(error_code));
}
if (dataLen < 24) // authKeyId+msgId+length+ctorNb | authKeyId+msgKey
throw new ApplicationException($"Packet payload too small: {dataLen}");
long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data);
if (authKeyId != _dcSession.AuthKeyID)
throw new ApplicationException($"Received a packet encrypted with unexpected key {authKeyId:X}");
if (authKeyId == 0) // Unencrypted message
{
using var reader = new TL.BinaryReader(new MemoryStream(data, 8, dataLen - 8), this);
long msgId = _lastRecvMsgId = reader.ReadInt64();
if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}");
int length = reader.ReadInt32();
if (length != dataLen - 20) throw new ApplicationException($"Unexpected unencrypted length {length} != {dataLen - 20}");
var obj = reader.ReadTLObject();
Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {MsgIdToStamp(msgId):u} clear{((msgId & 2) == 0 ? "" : " NAR")}");
return obj;
}
else
{
#if MTPROTO1
byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, dataLen - 24), false, _dcSession.AuthKey, data, 8, _sha1Recv);
#else
byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, dataLen - 24), false, _dcSession.AuthKey, data, 8, _sha256Recv);
#endif
if (decrypted_data.Length < 36) // header below+ctorNb
throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}");
using var reader = new TL.BinaryReader(new MemoryStream(decrypted_data), this);
var serverSalt = reader.ReadInt64(); // int64 salt
var sessionId = reader.ReadInt64(); // int64 session_id
var msgId = _lastRecvMsgId = reader.ReadInt64();// int64 message_id
var seqno = reader.ReadInt32(); // int32 msg_seqno
var length = reader.ReadInt32(); // int32 message_data_length
var msgStamp = MsgIdToStamp(msgId);
if (serverSalt != _dcSession.Salt) // salt change happens every 30 min
{
Helpers.Log(2, $"{_dcSession.DcID}>Server salt has changed: {_dcSession.Salt:X} -> {serverSalt:X}");
_dcSession.Salt = serverSalt;
_saltChangeCounter += 20; // counter is decreased by KeepAlive every minute (we have margin of 10)
if (_saltChangeCounter >= 30)
throw new ApplicationException($"Server salt changed too often! Security issue?");
}
if (sessionId != _dcSession.Id) throw new ApplicationException($"Unexpected session ID {sessionId} != {_dcSession.Id}");
if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}");
if ((seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msgId);
if ((msgStamp - DateTime.UtcNow).Ticks / TimeSpan.TicksPerSecond is > 30 or < -300)
return null;
#if MTPROTO1
if (decrypted_data.Length - 32 - length is < 0 or > 15) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}");
if (!data.AsSpan(8, 16).SequenceEqual(_sha1Recv.ComputeHash(decrypted_data, 0, 32 + length).AsSpan(4)))
throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1");
#else
if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}");
_sha256Recv.TransformBlock(_dcSession.AuthKey, 96, 32, null, 0);
_sha256Recv.TransformFinalBlock(decrypted_data, 0, decrypted_data.Length);
if (!data.AsSpan(8, 16).SequenceEqual(_sha256Recv.Hash.AsSpan(8, 16)))
throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1");
_sha256Recv.Initialize();
#endif
var ctorNb = reader.ReadUInt32();
if (ctorNb == Layer.MsgContainerCtor)
{
Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"MsgContainer",-40} {msgStamp:u} (svc)");
return ReadMsgContainer(reader);
}
else if (ctorNb == Layer.RpcResultCtor)
{
Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"RpcResult",-40} {msgStamp:u}");
return ReadRpcResult(reader);
}
else
{
var obj = reader.ReadTLObject(ctorNb);
Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {msgStamp:u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}");
return obj;
}
}
static string TransportError(int error_code) => error_code switch
{
404 => "Auth key not found",
429 => "Transport flood",
_ => ((HttpStatusCode)error_code).ToString(),
};
}
private static async Task FullReadAsync(Stream stream, byte[] buffer, int length, CancellationToken ct = default)
{
for (int offset = 0; offset < length;)
{
var read = await stream.ReadAsync(buffer, offset, length - offset, ct);
if (read == 0) return offset;
offset += read;
}
return length;
}
private async Task SendAsync(ITLFunction func, bool isContent)
{
if (_dcSession.AuthKeyID != 0 && isContent && CheckMsgsToAck() is MsgsAck msgsAck)
{
var ackMsg = NewMsgId(false);
var mainMsg = NewMsgId(true);
await SendAsync(MakeContainer((MakeFunction(msgsAck), ackMsg), (func, mainMsg)), false);
return mainMsg.msgId;
}
(long msgId, int seqno) = NewMsgId(isContent && _dcSession.AuthKeyID != 0);
await _sendSemaphore.WaitAsync();
try
{
using var memStream = new MemoryStream(1024);
using var writer = new BinaryWriter(memStream, Encoding.UTF8);
writer.Write(0); // int32 payload_len (to be patched with payload length)
if (_dcSession.AuthKeyID == 0) // send unencrypted message
{
writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted)
writer.Write(msgId); // int64 message_id
writer.Write(0); // int32 message_data_length (to be patched)
var typeName = func(writer); // bytes message_data
Helpers.Log(1, $"{_dcSession.DcID}>Sending {typeName}...");
BinaryPrimitives.WriteInt32LittleEndian(memStream.GetBuffer().AsSpan(20), (int)memStream.Length - 24); // patch message_data_length
}
else
{
using var clearStream = new MemoryStream(1024);
using var clearWriter = new BinaryWriter(clearStream, Encoding.UTF8);
#if MTPROTO1
const int prepend = 0;
#else
const int prepend = 32;
clearWriter.Write(_dcSession.AuthKey, 88, prepend);
#endif
clearWriter.Write(_dcSession.Salt); // int64 salt
clearWriter.Write(_dcSession.Id); // int64 session_id
clearWriter.Write(msgId); // int64 message_id
clearWriter.Write(seqno); // int32 msg_seqno
clearWriter.Write(0); // int32 message_data_length (to be patched)
var typeName = func(clearWriter); // bytes message_data
if ((seqno & 1) != 0)
Helpers.Log(1, $"{_dcSession.DcID}>Sending {typeName,-40} #{(short)msgId.GetHashCode():X4}");
else
Helpers.Log(1, $"{_dcSession.DcID}>Sending {typeName,-40} {MsgIdToStamp(msgId):u} (svc)");
int clearLength = (int)clearStream.Length - prepend; // length before padding (= 32 + message_data_length)
int padding = (0x7FFFFFF0 - clearLength) % 16;
#if !MTPROTO1
padding += _random.Next(1, 64) * 16; // MTProto 2.0 padding must be between 12..1024 with total length divisible by 16
#endif
clearStream.SetLength(prepend + clearLength + padding);
byte[] clearBuffer = clearStream.GetBuffer();
BinaryPrimitives.WriteInt32LittleEndian(clearBuffer.AsSpan(prepend + 28), clearLength - 32); // patch message_data_length
RNG.GetBytes(clearBuffer, prepend + clearLength, padding);
#if MTPROTO1
var msgKeyLarge = _sha1.ComputeHash(clearBuffer, 0, clearLength); // padding excluded from computation!
const int msgKeyOffset = 4; // msg_key = low 128-bits of SHA1(plaintext)
byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(prepend, clearLength + padding), true, _dcSession.AuthKey, msgKeyLarge, msgKeyOffset, _sha1);
#else
var msgKeyLarge = _sha256.ComputeHash(clearBuffer, 0, prepend + clearLength + padding);
const int msgKeyOffset = 8; // msg_key = middle 128-bits of SHA256(authkey_part+plaintext+padding)
byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(prepend, clearLength + padding), true, _dcSession.AuthKey, msgKeyLarge, msgKeyOffset, _sha256);
#endif
writer.Write(_dcSession.AuthKeyID); // int64 auth_key_id
writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key
writer.Write(encrypted_data); // bytes encrypted_data
}
var buffer = memStream.GetBuffer();
int frameLength = (int)memStream.Length;
BinaryPrimitives.WriteInt32LittleEndian(buffer, frameLength - 4); // patch payload_len with correct value
//TODO: support Transport obfuscation?
await _networkStream.WriteAsync(memStream.GetBuffer(), 0, frameLength);
_lastSentMsg = func;
}
finally
{
_sendSemaphore.Release();
}
return msgId;
}
private static ITLFunction MakeFunction(ITLObject msg)
=> writer =>
{
writer.WriteTLObject(msg);
return msg.GetType().Name;
};
internal MsgContainer ReadMsgContainer(TL.BinaryReader reader)
{
int count = reader.ReadInt32();
var array = new _Message[count];
for (int i = 0; i < count; i++)
{
var msg = array[i] = new _Message
{
msg_id = reader.ReadInt64(),
seqno = reader.ReadInt32(),
bytes = reader.ReadInt32(),
};
if ((msg.seqno & 1) != 0) lock(_msgsToAck) _msgsToAck.Add(msg.msg_id);
var pos = reader.BaseStream.Position;
try
{
var ctorNb = reader.ReadUInt32();
if (ctorNb == Layer.RpcResultCtor)
{
Helpers.Log(1, $" → {"RpcResult",-38} {MsgIdToStamp(msg.msg_id):u}");
msg.body = ReadRpcResult(reader);
}
else
{
var obj = msg.body = reader.ReadTLObject(ctorNb);
Helpers.Log(1, $" → {obj.GetType().Name,-38} {MsgIdToStamp(msg.msg_id):u} {((msg.seqno & 1) != 0 ? "" : "(svc)")} {((msg.msg_id & 2) == 0 ? "" : "NAR")}");
}
}
catch (Exception ex)
{
Helpers.Log(4, "While deserializing vector<%Message>: " + ex.ToString());
}
reader.BaseStream.Position = pos + array[i].bytes;
}
return new MsgContainer { messages = array };
}
private RpcResult ReadRpcResult(TL.BinaryReader reader)
{
long msgId = reader.ReadInt64();
var (type, tcs) = PullPendingRequest(msgId);
object result;
if (tcs != null)
{
if (!type.IsArray)
result = reader.ReadTLValue(type);
else if (reader.ReadUInt32() == Layer.RpcErrorCtor)
result = reader.ReadTLObject(Layer.RpcErrorCtor);
else
{
reader.BaseStream.Position -= 4;
result = reader.ReadTLValue(type);
}
if (type.IsEnum) result = Enum.ToObject(type, result);
Log(1, "");
tcs.SetResult(result);
}
else
{
result = reader.ReadTLObject();
if (MsgIdToStamp(msgId) >= _session.SessionStart)
Log(4, "for unknown msgId ");
else
Log(1, "for past msgId ");
}
return new RpcResult { req_msg_id = msgId, result = result };
void Log(int level, string msgIdprefix)
{
if (result is RpcError rpcError)
Helpers.Log(4, $" → RpcError {rpcError.error_code,3} {rpcError.error_message,-24} {msgIdprefix}#{(short)msgId.GetHashCode():X4}");
else
Helpers.Log(level, $" → {result?.GetType().Name,-37} {msgIdprefix}#{(short)msgId.GetHashCode():X4}");
}
}
private (Type type, TaskCompletionSource