diff --git a/src/Client.Helpers.cs b/src/Client.Helpers.cs new file mode 100644 index 0000000..b8877d6 --- /dev/null +++ b/src/Client.Helpers.cs @@ -0,0 +1,637 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Reflection; +using System.Security.Cryptography; +using System.Threading; +using System.Threading.Tasks; +using TL; + +// necessary for .NET Standard 2.0 compilation: +#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync' + +namespace WTelegram +{ + partial class Client + { + #region Collect Access Hash system + /// Enable the collection of id/access_hash pairs (experimental) + public bool CollectAccessHash { get; set; } + readonly Dictionary> _accessHashes = new(); + public IEnumerable> AllAccessHashesFor() where T : IObject + => _accessHashes.GetValueOrDefault(typeof(T)); + /// Retrieve the access_hash associated with this id (for a TL class) if it was collected + /// This requires to be set to first. + ///
See Examples/Program_CollectAccessHash.cs for how to use this
+ /// a TL object class. For example User, Channel or Photo + public long GetAccessHashFor(long id) where T : IObject + { + lock (_accessHashes) + return _accessHashes.GetOrCreate(typeof(T)).TryGetValue(id, out var access_hash) ? access_hash : 0; + } + public void SetAccessHashFor(long id, long access_hash) where T : IObject + { + lock (_accessHashes) + _accessHashes.GetOrCreate(typeof(T))[id] = access_hash; + } + static readonly FieldInfo userFlagsField = typeof(User).GetField("flags"); + static readonly FieldInfo channelFlagsField = typeof(Channel).GetField("flags"); + internal void CollectField(FieldInfo fieldInfo, object obj, object access_hash) + { + if (fieldInfo.Name != "access_hash") return; + if (access_hash is not long accessHash) return; + var type = fieldInfo.ReflectedType; + if ((type == typeof(User) && ((User.Flags)userFlagsField.GetValue(obj)).HasFlag(User.Flags.min)) || + (type == typeof(Channel) && ((Channel.Flags)channelFlagsField.GetValue(obj)).HasFlag(Channel.Flags.min))) + return; // access_hash from Min constructors are mostly useless. see https://core.telegram.org/api/min + if (type.GetField("id") is not FieldInfo idField) return; + if (idField.GetValue(obj) is not long id) + if (idField.GetValue(obj) is not int idInt) return; + else id = idInt; + lock (_accessHashes) + _accessHashes.GetOrCreate(type)[id] = accessHash; + } + #endregion + + #region Client TL Helpers + /// Helper function to upload a file to Telegram + /// Path to the file to upload + /// (optional) Callback for tracking the progression of the transfer + /// an or than can be used in various requests + public Task UploadFileAsync(string pathname, ProgressCallback progress = null) + => UploadFileAsync(File.OpenRead(pathname), Path.GetFileName(pathname), progress); + + /// Helper function to upload a file to Telegram + /// Content of the file to upload. This method close/dispose the stream + /// Name of the file + /// (optional) Callback for tracking the progression of the transfer + /// an or than can be used in various requests + public async Task UploadFileAsync(Stream stream, string filename, ProgressCallback progress = null) + { + using var md5 = MD5.Create(); + using (stream) + { + long transmitted = 0, length = stream.Length; + var isBig = length >= 10 * 1024 * 1024; + int file_total_parts = (int)((length - 1) / FilePartSize) + 1; + long file_id = Helpers.RandomLong(); + int file_part = 0, read; + var tasks = new Dictionary(); + bool abort = false; + for (long bytesLeft = length; !abort && bytesLeft != 0; file_part++) + { + var bytes = new byte[Math.Min(FilePartSize, bytesLeft)]; + read = await stream.FullReadAsync(bytes, bytes.Length, default); + await _parallelTransfers.WaitAsync(); + bytesLeft -= read; + var task = SavePart(file_part, bytes); + lock (tasks) tasks[file_part] = task; + if (!isBig) + md5.TransformBlock(bytes, 0, read, null, 0); + if (read < FilePartSize && bytesLeft != 0) throw new ApplicationException($"Failed to fully read stream ({read},{bytesLeft})"); + + async Task SavePart(int file_part, byte[] bytes) + { + try + { + if (isBig) + await this.Upload_SaveBigFilePart(file_id, file_part, file_total_parts, bytes); + else + await this.Upload_SaveFilePart(file_id, file_part, bytes); + lock (tasks) { transmitted += bytes.Length; tasks.Remove(file_part); } + progress?.Invoke(transmitted, length); + } + catch (Exception) + { + abort = true; + throw; + } + finally + { + _parallelTransfers.Release(); + } + } + } + Task[] remainingTasks; + lock (tasks) remainingTasks = tasks.Values.ToArray(); + await Task.WhenAll(remainingTasks); // wait completion and eventually propagate any task exception + if (!isBig) md5.TransformFinalBlock(Array.Empty(), 0, 0); + return isBig ? new InputFileBig { id = file_id, parts = file_total_parts, name = filename } + : new InputFile { id = file_id, parts = file_total_parts, name = filename, md5_checksum = md5.Hash }; + } + } + + /// Search messages with filter and text See + /// See for a list of possible filter types + /// User or chat, histories with which are searched, or constructor for global search + /// Text search request + /// Only return messages starting from the specified message ID + /// Number of results to return + public Task Messages_Search(InputPeer peer, string text = null, int offset_id = 0, int limit = int.MaxValue) where T : MessagesFilter, new() + => this.Messages_Search(peer, text, new T(), offset_id: offset_id, limit: limit); + + /// Helper function to send a media message more easily + /// Destination of message (chat group, channel, user chat, etc..) + /// Caption for the media (in plain text) or + /// Media file already uploaded to TG (see UploadFileAsync) + /// for automatic detection, "photo" for an inline photo, or a MIME type to send as a document + /// Your message is a reply to an existing message with this ID, in the same chat + /// Text formatting entities for the caption. You can use MarkdownToEntities to create these + /// UTC timestamp when the message should be sent + /// The transmitted message confirmed by Telegram + public Task SendMediaAsync(InputPeer peer, string caption, InputFileBase mediaFile, string mimeType = null, int reply_to_msg_id = 0, MessageEntity[] entities = null, DateTime schedule_date = default) + { + mimeType ??= Path.GetExtension(mediaFile.Name)?.ToLowerInvariant() switch + { + ".jpg" or ".jpeg" or ".png" or ".bmp" => "photo", + ".gif" => "image/gif", + ".webp" => "image/webp", + ".mp4" => "video/mp4", + ".mp3" => "audio/mpeg", + ".wav" => "audio/x-wav", + _ => "", // send as generic document with undefined MIME type + }; + if (mimeType == "photo") + return SendMessageAsync(peer, caption, new InputMediaUploadedPhoto { file = mediaFile }, reply_to_msg_id, entities, schedule_date); + return SendMessageAsync(peer, caption, new InputMediaUploadedDocument(mediaFile, mimeType), reply_to_msg_id, entities, schedule_date); + } + + /// Helper function to send a text or media message easily + /// Destination of message (chat group, channel, user chat, etc..) + /// The plain text of the message (or media caption) + /// An instance of InputMedia-derived class, or if there is no associated media + /// Your message is a reply to an existing message with this ID, in the same chat + /// Text formatting entities. You can use MarkdownToEntities to create these + /// UTC timestamp when the message should be sent + /// Should website/media preview be shown or not, for URLs in your message + /// The transmitted message as confirmed by Telegram + public async Task SendMessageAsync(InputPeer peer, string text, InputMedia media = null, int reply_to_msg_id = 0, MessageEntity[] entities = null, DateTime schedule_date = default, bool disable_preview = false) + { + UpdatesBase updates; + long random_id = Helpers.RandomLong(); + if (media == null) + updates = await this.Messages_SendMessage(peer, text, random_id, no_webpage: disable_preview, entities: entities, + reply_to_msg_id: reply_to_msg_id == 0 ? null : reply_to_msg_id, schedule_date: schedule_date == default ? null : schedule_date); + else + updates = await this.Messages_SendMedia(peer, media, text, random_id, entities: entities, + reply_to_msg_id: reply_to_msg_id == 0 ? null : reply_to_msg_id, schedule_date: schedule_date == default ? null : schedule_date); + OnUpdate(updates); + int msgId = -1; + foreach (var update in updates.UpdateList) + { + switch (update) + { + case UpdateMessageID updMsgId when updMsgId.random_id == random_id: msgId = updMsgId.id; break; + case UpdateNewMessage { message: Message message } when message.id == msgId: return message; + case UpdateNewScheduledMessage { message: Message schedMsg } when schedMsg.id == msgId: return schedMsg; + } + } + if (updates is UpdateShortSentMessage sent) + { + return new Message + { + flags = (Message.Flags)sent.flags | (reply_to_msg_id == 0 ? 0 : Message.Flags.has_reply_to) | (peer is InputPeerSelf ? 0 : Message.Flags.has_from_id), + id = sent.id, date = sent.date, message = text, entities = sent.entities, media = sent.media, ttl_period = sent.ttl_period, + reply_to = reply_to_msg_id == 0 ? null : new MessageReplyHeader { reply_to_msg_id = reply_to_msg_id }, + from_id = peer is InputPeerSelf ? null : new PeerUser { user_id = _session.UserId }, + peer_id = InputToPeer(peer) + }; + } + return null; + } + + /// Helper function to send an album (media group) of photos or documents more easily + /// Destination of message (chat group, channel, user chat, etc..) + /// An array of InputMedia-derived class + /// Caption for the media (in plain text) or + /// Your message is a reply to an existing message with this ID, in the same chat + /// Text formatting entities for the caption. You can use MarkdownToEntities to create these + /// UTC timestamp when the message should be sent + /// The last of the media group messages, confirmed by Telegram + /// + /// * The caption/entities are set on the last media
+ /// * and are supported by downloading the file from the web via HttpClient and sending it to Telegram. + /// WTelegramClient proxy settings don't apply to HttpClient
+ /// * You may run into errors if you mix, in the same album, photos and file documents having no thumbnails/video attributes + ///
+ public async Task SendAlbumAsync(InputPeer peer, InputMedia[] medias, string caption = null, int reply_to_msg_id = 0, MessageEntity[] entities = null, DateTime schedule_date = default) + { + System.Net.Http.HttpClient httpClient = null; + var multiMedia = new InputSingleMedia[medias.Length]; + for (int i = 0; i < medias.Length; i++) + { + var ism = multiMedia[i] = new InputSingleMedia { random_id = Helpers.RandomLong(), media = medias[i] }; + retry: + switch (ism.media) + { + case InputMediaUploadedPhoto imup: + var mmp = (MessageMediaPhoto)await this.Messages_UploadMedia(peer, imup); + ism.media = mmp.photo; + break; + case InputMediaUploadedDocument imud: + var mmd = (MessageMediaDocument)await this.Messages_UploadMedia(peer, imud); + ism.media = mmd.document; + break; + case InputMediaDocumentExternal imde: + string mimeType = null; + var inputFile = await UploadFromUrl(imde.url); + ism.media = new InputMediaUploadedDocument(inputFile, mimeType); + goto retry; + case InputMediaPhotoExternal impe: + inputFile = await UploadFromUrl(impe.url); + ism.media = new InputMediaUploadedPhoto { file = inputFile }; + goto retry; + + async Task UploadFromUrl(string url) + { + var filename = Path.GetFileName(new Uri(url).LocalPath); + httpClient ??= new(); + var response = await httpClient.GetAsync(url); + using var stream = await response.Content.ReadAsStreamAsync(); + mimeType = response.Content.Headers.ContentType?.MediaType; + if (response.Content.Headers.ContentLength is long length) + return await UploadFileAsync(new Helpers.IndirectStream(stream) { ContentLength = length }, filename); + else + { + using var ms = new MemoryStream(); + await stream.CopyToAsync(ms); + ms.Position = 0; + return await UploadFileAsync(ms, filename); + } + } + } + } + var lastMedia = multiMedia[^1]; + lastMedia.message = caption; + lastMedia.entities = entities; + if (entities != null) lastMedia.flags = InputSingleMedia.Flags.has_entities; + + var updates = await this.Messages_SendMultiMedia(peer, multiMedia, reply_to_msg_id: reply_to_msg_id, schedule_date: schedule_date); + OnUpdate(updates); + int msgId = -1; + foreach (var update in updates.UpdateList) + { + switch (update) + { + case UpdateMessageID updMsgId when updMsgId.random_id == lastMedia.random_id: msgId = updMsgId.id; break; + case UpdateNewMessage { message: Message message } when message.id == msgId: return message; + case UpdateNewScheduledMessage { message: Message schedMsg } when schedMsg.id == msgId: return schedMsg; + } + } + return null; + } + + private Peer InputToPeer(InputPeer peer) => peer switch + { + InputPeerSelf => new PeerUser { user_id = _session.UserId }, + InputPeerUser ipu => new PeerUser { user_id = ipu.user_id }, + InputPeerChat ipc => new PeerChat { chat_id = ipc.chat_id }, + InputPeerChannel ipch => new PeerChannel { channel_id = ipch.channel_id }, + InputPeerUserFromMessage ipufm => new PeerUser { user_id = ipufm.user_id }, + InputPeerChannelFromMessage ipcfm => new PeerChannel { channel_id = ipcfm.channel_id }, + _ => null, + }; + + /// Download a photo from Telegram into the outputStream + /// The photo to download + /// Stream to write the file content to. This method does not close/dispose the stream + /// A specific size/version of the photo, or to download the largest version of the photo + /// (optional) Callback for tracking the progression of the transfer + /// The file type of the photo + public async Task DownloadFileAsync(Photo photo, Stream outputStream, PhotoSizeBase photoSize = null, ProgressCallback progress = null) + { + if (photoSize is PhotoStrippedSize psp) + return InflateStrippedThumb(outputStream, psp.bytes) ? Storage_FileType.jpeg : 0; + photoSize ??= photo.LargestPhotoSize; + var fileLocation = photo.ToFileLocation(photoSize); + return await DownloadFileAsync(fileLocation, outputStream, photo.dc_id, photoSize.FileSize, progress); + } + + /// Download a document from Telegram into the outputStream + /// The document to download + /// Stream to write the file content to. This method does not close/dispose the stream + /// A specific size/version of the document thumbnail to download, or to download the document itself + /// (optional) Callback for tracking the progression of the transfer + /// MIME type of the document/thumbnail + public async Task DownloadFileAsync(Document document, Stream outputStream, PhotoSizeBase thumbSize = null, ProgressCallback progress = null) + { + if (thumbSize is PhotoStrippedSize psp) + return InflateStrippedThumb(outputStream, psp.bytes) ? "image/jpeg" : null; + var fileLocation = document.ToFileLocation(thumbSize); + var fileType = await DownloadFileAsync(fileLocation, outputStream, document.dc_id, thumbSize?.FileSize ?? document.size, progress); + return thumbSize == null ? document.mime_type : "image/" + fileType; + } + + /// Download a file from Telegram into the outputStream + /// Telegram file identifier, typically obtained with a .ToFileLocation() call + /// Stream to write file content to. This method does not close/dispose the stream + /// (optional) DC on which the file is stored + /// (optional) Expected file size + /// (optional) Callback for tracking the progression of the transfer + /// The file type + public async Task DownloadFileAsync(InputFileLocationBase fileLocation, Stream outputStream, int dc_id = 0, int fileSize = 0, ProgressCallback progress = null) + { + Storage_FileType fileType = Storage_FileType.unknown; + var client = dc_id == 0 ? this : await GetClientForDC(dc_id, true); + using var writeSem = new SemaphoreSlim(1); + long streamStartPos = outputStream.Position; + int fileOffset = 0, maxOffsetSeen = 0; + long transmitted = 0; + var tasks = new Dictionary(); + progress?.Invoke(0, fileSize); + bool abort = false; + while (!abort) + { + await _parallelTransfers.WaitAsync(); + var task = LoadPart(fileOffset); + lock (tasks) tasks[fileOffset] = task; + if (dc_id == 0) { await task; dc_id = client._dcSession.DcID; } + fileOffset += FilePartSize; + if (fileSize != 0 && fileOffset >= fileSize) + { + if (await task != ((fileSize - 1) % FilePartSize) + 1) + throw new ApplicationException("Downloaded file size does not match expected file size"); + break; + } + + async Task LoadPart(int offset) + { + Upload_FileBase fileBase; + try + { + fileBase = await client.Upload_GetFile(fileLocation, offset, FilePartSize); + } + catch (RpcException ex) when (ex.Code == 303 && ex.Message.StartsWith("FILE_MIGRATE_")) + { + var dcId = int.Parse(ex.Message[13..]); + client = await GetClientForDC(dcId, true); + fileBase = await client.Upload_GetFile(fileLocation, offset, FilePartSize); + } + catch (RpcException ex) when (ex.Code == 400 && ex.Message == "OFFSET_INVALID") + { + abort = true; + return 0; + } + catch (Exception) + { + abort = true; + throw; + } + finally + { + _parallelTransfers.Release(); + } + if (fileBase is not Upload_File fileData) + throw new ApplicationException("Upload_GetFile returned unsupported " + fileBase?.GetType().Name); + if (fileData.bytes.Length != FilePartSize) abort = true; + if (fileData.bytes.Length != 0) + { + fileType = fileData.type; + await writeSem.WaitAsync(); + try + { + if (streamStartPos + offset != outputStream.Position) // if we're about to write out of order + { + await outputStream.FlushAsync(); // async flush, otherwise Seek would do a sync flush + outputStream.Seek(streamStartPos + offset, SeekOrigin.Begin); + } + await outputStream.WriteAsync(fileData.bytes, 0, fileData.bytes.Length); + maxOffsetSeen = Math.Max(maxOffsetSeen, offset + fileData.bytes.Length); + transmitted += fileData.bytes.Length; + } + catch (Exception) + { + abort = true; + throw; + } + finally + { + writeSem.Release(); + progress?.Invoke(transmitted, fileSize); + } + } + lock (tasks) tasks.Remove(offset); + return fileData.bytes.Length; + } + } + Task[] remainingTasks; + lock (tasks) remainingTasks = tasks.Values.ToArray(); + await Task.WhenAll(remainingTasks); // wait completion and eventually propagate any task exception + await outputStream.FlushAsync(); + outputStream.Seek(streamStartPos + maxOffsetSeen, SeekOrigin.Begin); + return fileType; + } + + /// Download the profile photo for a given peer into the outputStream + /// User, Chat or Channel + /// Stream to write the file content to. This method does not close/dispose the stream + /// Whether to download the high-quality version of the picture + /// Whether to extract the embedded very low-res thumbnail (synchronous, no actual download needed) + /// The file type of the photo, or 0 if no photo available + public async Task DownloadProfilePhotoAsync(IPeerInfo peer, Stream outputStream, bool big = false, bool miniThumb = false) + { + int dc_id; + long photo_id; + byte[] stripped_thumb; + switch (peer) + { + case User user: + if (user.photo == null) return 0; + dc_id = user.photo.dc_id; + photo_id = user.photo.photo_id; + stripped_thumb = user.photo.stripped_thumb; + break; + case ChatBase { Photo: var photo }: + if (photo == null) return 0; + dc_id = photo.dc_id; + photo_id = photo.photo_id; + stripped_thumb = photo.stripped_thumb; + break; + default: + return 0; + } + if (miniThumb && !big) + return InflateStrippedThumb(outputStream, stripped_thumb) ? Storage_FileType.jpeg : 0; + var fileLocation = new InputPeerPhotoFileLocation { peer = peer.ToInputPeer(), photo_id = photo_id }; + if (big) fileLocation.flags |= InputPeerPhotoFileLocation.Flags.big; + return await DownloadFileAsync(fileLocation, outputStream, dc_id); + } + + private static bool InflateStrippedThumb(Stream outputStream, byte[] stripped_thumb) + { + if (stripped_thumb == null || stripped_thumb.Length <= 3 || stripped_thumb[0] != 1) + return false; + var header = Helpers.StrippedThumbJPG; + outputStream.Write(header, 0, 164); + outputStream.WriteByte(stripped_thumb[1]); + outputStream.WriteByte(0); + outputStream.WriteByte(stripped_thumb[2]); + outputStream.Write(header, 167, header.Length - 167); + outputStream.Write(stripped_thumb, 3, stripped_thumb.Length - 3); + outputStream.WriteByte(0xff); + outputStream.WriteByte(0xd9); + return true; + } + + /// Returns the current user dialog list. Possible codes: 400 (details) + /// Peer folder ID, for more info click here + /// See + public async Task Messages_GetAllDialogs(int? folder_id = null) + { + var dialogs = await this.Messages_GetDialogs(folder_id: folder_id); + switch (dialogs) + { + case Messages_DialogsSlice mds: + var dialogList = new List(); + var messageList = new List(); + while (dialogs.Dialogs.Length != 0) + { + dialogList.AddRange(dialogs.Dialogs); + messageList.AddRange(dialogs.Messages); + var lastDialog = dialogs.Dialogs[^1]; + var lastMsg = dialogs.Messages.LastOrDefault(m => m.Peer.ID == lastDialog.Peer.ID && m.ID == lastDialog.TopMessage); + var offsetPeer = dialogs.UserOrChat(lastDialog).ToInputPeer(); + dialogs = await this.Messages_GetDialogs(lastMsg?.Date ?? default, lastDialog.TopMessage, offsetPeer, folder_id: folder_id); + if (dialogs is not Messages_Dialogs md) break; + foreach (var (key, value) in md.chats) mds.chats[key] = value; + foreach (var (key, value) in md.users) mds.users[key] = value; + } + mds.dialogs = dialogList.ToArray(); + mds.messages = messageList.ToArray(); + return mds; + case Messages_Dialogs md: return md; + default: throw new ApplicationException("Messages_GetDialogs returned unexpected " + dialogs?.GetType().Name); + } + } + + /// Helper method that tries to fetch all participants from a Channel (beyond Telegram server-side limitations) + /// The channel to query + /// Also fetch the kicked/banned members? + /// first letters used to search for in participants names
(default values crafted with ♥ to find most latin and cyrillic names) + /// second (and further) letters used to search for in participants names + /// Can be used to abort the work of this method + /// Field count indicates the total count of members. Field participants contains those that were successfully fetched + /// ⚠ This method can take a few minutes to complete on big broadcast channels. It likely won't be able to obtain the full total count of members + public async Task Channels_GetAllParticipants(InputChannelBase channel, bool includeKickBan = false, string alphabet1 = "АБCДЕЄЖФГHИІJКЛМНОПQРСТУВWХЦЧШЩЫЮЯЗ", string alphabet2 = "АCЕHИJЛМНОРСТУВWЫ", CancellationToken cancellationToken = default) + { + alphabet2 ??= alphabet1; + var result = new Channels_ChannelParticipants { chats = new(), users = new() }; + var user_ids = new HashSet(); + var participants = new List(); + + var mcf = await this.Channels_GetFullChannel(channel); + result.count = mcf.full_chat.ParticipantsCount; + if (result.count > 2000 && ((Channel)mcf.chats[channel.ChannelId]).IsChannel) + Helpers.Log(2, "Fetching all participants on a big channel can take several minutes..."); + await GetWithFilter(new ChannelParticipantsAdmins()); + await GetWithFilter(new ChannelParticipantsBots()); + await GetWithFilter(new ChannelParticipantsSearch { q = "" }, (f, c) => new ChannelParticipantsSearch { q = f.q + c }, alphabet1); + if (includeKickBan) + { + await GetWithFilter(new ChannelParticipantsKicked { q = "" }, (f, c) => new ChannelParticipantsKicked { q = f.q + c }, alphabet1); + await GetWithFilter(new ChannelParticipantsBanned { q = "" }, (f, c) => new ChannelParticipantsBanned { q = f.q + c }, alphabet1); + } + result.participants = participants.ToArray(); + return result; + + async Task GetWithFilter(T filter, Func recurse = null, string alphabet = null) where T : ChannelParticipantsFilter + { + Channels_ChannelParticipants ccp; + int maxCount = 0; + for (int offset = 0; ;) + { + cancellationToken.ThrowIfCancellationRequested(); + ccp = await this.Channels_GetParticipants(channel, filter, offset, 1024, 0); + if (ccp.count > maxCount) maxCount = ccp.count; + foreach (var kvp in ccp.chats) result.chats[kvp.Key] = kvp.Value; + foreach (var kvp in ccp.users) result.users[kvp.Key] = kvp.Value; + lock (participants) + foreach (var participant in ccp.participants) + if (user_ids.Add(participant.UserID)) + participants.Add(participant); + offset += ccp.participants.Length; + if (offset >= ccp.count || ccp.participants.Length == 0) break; + } + Helpers.Log(0, $"GetParticipants({(filter as ChannelParticipantsSearch)?.q}) returned {ccp.count}/{maxCount}.\tAccumulated count: {participants.Count}"); + if (recurse != null && (ccp.count < maxCount - 100 || ccp.count == 200 || ccp.count == 1000)) + foreach (var c in alphabet) + await GetWithFilter(recurse(filter, c), recurse, c == 'А' ? alphabet : alphabet2); + } + } + + public Task AddChatUser(InputPeer peer, InputUserBase user, int fwd_limit = int.MaxValue) => peer switch + { + InputPeerChat chat => this.Messages_AddChatUser(chat.chat_id, user, fwd_limit), + InputPeerChannel channel => this.Channels_InviteToChannel(channel, new[] { user }), + _ => throw new ArgumentException("This method works on Chat & Channel only"), + }; + + public Task DeleteChatUser(InputPeer peer, InputUser user, bool revoke_history = true) => peer switch + { + InputPeerChat chat => this.Messages_DeleteChatUser(chat.chat_id, user, revoke_history), + InputPeerChannel channel => this.Channels_EditBanned(channel, user, new ChatBannedRights { flags = ChatBannedRights.Flags.view_messages }), + _ => throw new ArgumentException("This method works on Chat & Channel only"), + }; + + public Task LeaveChat(InputPeer peer, bool revoke_history = true) => peer switch + { + InputPeerChat chat => this.Messages_DeleteChatUser(chat.chat_id, InputUser.Self, revoke_history), + InputPeerChannel channel => this.Channels_LeaveChannel(channel), + _ => throw new ArgumentException("This method works on Chat & Channel only"), + }; + + public async Task EditChatAdmin(InputPeer peer, InputUserBase user, bool is_admin) + { + switch (peer) + { + case InputPeerChat chat: + await this.Messages_EditChatAdmin(chat.chat_id, user, is_admin); + return new Updates { date = DateTime.UtcNow, users = new(), updates = Array.Empty(), + chats = (await this.Messages_GetChats(new[] { chat.chat_id })).chats }; + case InputPeerChannel channel: + return await this.Channels_EditAdmin(channel, user, + new ChatAdminRights { flags = is_admin ? (ChatAdminRights.Flags)0x8BF : 0 }, null); + default: + throw new ArgumentException("This method works on Chat & Channel only"); + } + } + + public Task EditChatPhoto(InputPeer peer, InputChatPhotoBase photo) => peer switch + { + InputPeerChat chat => this.Messages_EditChatPhoto(chat.chat_id, photo), + InputPeerChannel channel => this.Channels_EditPhoto(channel, photo), + _ => throw new ArgumentException("This method works on Chat & Channel only"), + }; + + public Task EditChatTitle(InputPeer peer, string title) => peer switch + { + InputPeerChat chat => this.Messages_EditChatTitle(chat.chat_id, title), + InputPeerChannel channel => this.Channels_EditTitle(channel, title), + _ => throw new ArgumentException("This method works on Chat & Channel only"), + }; + + public Task GetFullChat(InputPeer peer) => peer switch + { + InputPeerChat chat => this.Messages_GetFullChat(chat.chat_id), + InputPeerChannel channel => this.Channels_GetFullChannel(channel), + _ => throw new ArgumentException("This method works on Chat & Channel only"), + }; + + public async Task DeleteChat(InputPeer peer) + { + switch (peer) + { + case InputPeerChat chat: + await this.Messages_DeleteChat(chat.chat_id); + return new Updates { date = DateTime.UtcNow, users = new(), updates = Array.Empty(), + chats = (await this.Messages_GetChats(new[] { chat.chat_id })).chats }; + case InputPeerChannel channel: + return await this.Channels_DeleteChannel(channel); + default: + throw new ArgumentException("This method works on Chat & Channel only"); + } + } + #endregion + } +} diff --git a/src/Client.cs b/src/Client.cs index f963a78..0d690ca 100644 --- a/src/Client.cs +++ b/src/Client.cs @@ -22,7 +22,7 @@ using static WTelegram.Encryption; namespace WTelegram { - public class Client : IDisposable + public partial class Client : IDisposable { /// This event will be called when an unsollicited update/message is sent by Telegram servers /// See Examples/Program_ListenUpdate.cs for how to use this @@ -203,14 +203,462 @@ namespace WTelegram _session.UserId = 0; } - /// Establish connection to Telegram servers - /// Config callback is queried for: server_address - /// Most methods of this class are async (Task), so please use - public async Task ConnectAsync() + private Session.DCSession GetOrCreateDCSession(int dcId, DcOption.Flags flags) { - lock (this) - _connecting ??= DoConnectAsync(); - await _connecting; + if (_session.DCSessions.TryGetValue(dcId, out var dcSession)) + if (dcSession.Client != null || dcSession.DataCenter.flags == flags) + return dcSession; // if we have already a session with this DC and we are connected or it is a perfect match, use it + // try to find the most appropriate DcOption for this DC + if ((dcSession?.AuthKeyID ?? 0) == 0) // we will need to negociate an AuthKey => can't use media_only DC + flags &= ~DcOption.Flags.media_only; + var dcOptions = _session.DcOptions.Where(dc => dc.id == dcId).OrderBy(dc => dc.flags ^ flags); + var dcOption = dcOptions.FirstOrDefault() ?? throw new ApplicationException($"Could not find adequate dc_option for DC {dcId}"); + dcSession ??= new Session.DCSession { Id = Helpers.RandomLong() }; // create new session only if not already existing + dcSession.DataCenter = dcOption; + return _session.DCSessions[dcId] = dcSession; + } + + /// Obtain/create a Client for a secondary session on a specific Data Center + /// ID of the Data Center + /// Session will be used only for transferring media + /// Connect immediately + /// Client connected to the selected DC + public async Task GetClientForDC(int dcId, bool media_only = true, bool connect = true) + { + if (_dcSession.DataCenter?.id == dcId) return this; + Session.DCSession altSession; + lock (_session) + { + altSession = GetOrCreateDCSession(dcId, _dcSession.DataCenter.flags | (media_only ? DcOption.Flags.media_only : 0)); + if (altSession.Client?.Disconnected ?? false) { altSession.Client.Dispose(); altSession.Client = null; } + altSession.Client ??= new Client(this, altSession); + } + Helpers.Log(2, $"Requested connection to DC {dcId}..."); + if (connect) + { + await _semaphore.WaitAsync(); + try + { + Auth_ExportedAuthorization exported = null; + if (_session.UserId != 0 && IsMainDC && altSession.UserId != _session.UserId) + exported = await this.Auth_ExportAuthorization(dcId); + await altSession.Client.ConnectAsync(); + if (exported != null) + { + var authorization = await altSession.Client.Auth_ImportAuthorization(exported.id, exported.bytes); + if (authorization is not Auth_Authorization { user: User user }) + throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name); + altSession.UserId = user.id; + } + } + finally + { + _semaphore.Release(); + } + } + return altSession.Client; + } + + private async Task Reactor(Stream stream, CancellationTokenSource cts) + { + const int MinBufferSize = 1024; + var data = new byte[MinBufferSize]; + while (!cts.IsCancellationRequested) + { + IObject obj = null; + try + { + if (await stream.FullReadAsync(data, 4, cts.Token) != 4) + throw new ApplicationException(ConnectionShutDown); +#if OBFUSCATION + _recvCtr.EncryptDecrypt(data, 4); +#endif + int payloadLen = BinaryPrimitives.ReadInt32LittleEndian(data); + if (payloadLen <= 0) + throw new ApplicationException("Could not read frame data : Invalid payload length"); + else if (payloadLen > data.Length) + data = new byte[payloadLen]; + else if (Math.Max(payloadLen, MinBufferSize) < data.Length / 4) + data = new byte[Math.Max(payloadLen, MinBufferSize)]; + if (await stream.FullReadAsync(data, payloadLen, cts.Token) != payloadLen) + throw new ApplicationException("Could not read frame data : Connection shut down"); +#if OBFUSCATION + _recvCtr.EncryptDecrypt(data, payloadLen); +#endif + obj = ReadFrame(data, payloadLen); + } + catch (Exception ex) // an exception in RecvAsync is always fatal + { + if (cts.IsCancellationRequested) return; + Helpers.Log(5, $"{_dcSession.DcID}>An exception occured in the reactor: {ex}"); + var oldSemaphore = _sendSemaphore; + await oldSemaphore.WaitAsync(cts.Token); // prevent any sending while we reconnect + var reactorError = new ReactorError { Exception = ex }; + try + { + lock (_msgsToAck) _msgsToAck.Clear(); + Reset(false, false); + _reactorReconnects = (_reactorReconnects + 1) % MaxAutoReconnects; + if (!IsMainDC && _pendingRpcs.Count <= 1 && ex is ApplicationException { Message: ConnectionShutDown } or IOException { InnerException: SocketException }) + if (_pendingRpcs.Values.FirstOrDefault() is not Rpc rpc || rpc.type == typeof(Pong)) + _reactorReconnects = 0; + if (_reactorReconnects != 0) + { + await Task.Delay(5000); + if (_networkStream == null) return; // Dispose has been called in-between + await ConnectAsync(); // start a new reactor after 5 secs + lock (_pendingRpcs) // retry all pending requests + { + foreach (var rpc in _pendingRpcs.Values) + rpc.tcs.SetResult(reactorError); + _pendingRpcs.Clear(); + _bareRpc = null; + } + // TODO: implement an Updates gaps handling system? https://core.telegram.org/api/updates + if (IsMainDC) + { + var udpatesState = await this.Updates_GetState(); // this call reenables incoming Updates + OnUpdate(udpatesState); + } + } + else + throw; + } + catch + { + lock (_pendingRpcs) // abort all pending requests + { + foreach (var rpc in _pendingRpcs.Values) + rpc.tcs.SetException(ex); + _pendingRpcs.Clear(); + _bareRpc = null; + } + OnUpdate(reactorError); + } + finally + { + oldSemaphore.Release(); + } + } + if (obj != null) + await HandleMessageAsync(obj); + } + } + + internal DateTime MsgIdToStamp(long serverMsgId) + => new((serverMsgId >> 32) * 10000000 - _dcSession.ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc); + + internal IObject ReadFrame(byte[] data, int dataLen) + { + if (dataLen == 4 && data[3] == 0xFF) + { + int error_code = -BinaryPrimitives.ReadInt32LittleEndian(data); + throw new RpcException(error_code, TransportError(error_code)); + } + if (dataLen < 24) // authKeyId+msgId+length+ctorNb | authKeyId+msgKey + throw new ApplicationException($"Packet payload too small: {dataLen}"); + + long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data); + if (authKeyId != _dcSession.AuthKeyID) + throw new ApplicationException($"Received a packet encrypted with unexpected key {authKeyId:X}"); + if (authKeyId == 0) // Unencrypted message + { + using var reader = new TL.BinaryReader(new MemoryStream(data, 8, dataLen - 8), this); + long msgId = _lastRecvMsgId = reader.ReadInt64(); + if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}"); + int length = reader.ReadInt32(); + dataLen -= 20; + if (length > dataLen || length < dataLen - (_paddedMode ? 15 : 0)) + throw new ApplicationException($"Unexpected unencrypted length {length} != {dataLen}"); + + var obj = reader.ReadTLObject(); + Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {MsgIdToStamp(msgId):u} clear{((msgId & 2) == 0 ? "" : " NAR")}"); + return obj; + } + else + { + byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, (dataLen - 24) & ~0xF), false, _dcSession.AuthKey, data, 8, _sha256Recv); + if (decrypted_data.Length < 36) // header below+ctorNb + throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}"); + using var reader = new TL.BinaryReader(new MemoryStream(decrypted_data), this); + var serverSalt = reader.ReadInt64(); // int64 salt + var sessionId = reader.ReadInt64(); // int64 session_id + var msgId = reader.ReadInt64(); // int64 message_id + var seqno = reader.ReadInt32(); // int32 msg_seqno + var length = reader.ReadInt32(); // int32 message_data_length + if (_lastRecvMsgId == 0) // resync ServerTicksOffset on first message + _dcSession.ServerTicksOffset = (msgId >> 32) * 10000000 - DateTime.UtcNow.Ticks + 621355968000000000L; + var msgStamp = MsgIdToStamp(_lastRecvMsgId = msgId); + + if (serverSalt != _dcSession.Salt) // salt change happens every 30 min + { + Helpers.Log(2, $"{_dcSession.DcID}>Server salt has changed: {_dcSession.Salt:X} -> {serverSalt:X}"); + _dcSession.Salt = serverSalt; + _saltChangeCounter += 20; // counter is decreased by KeepAlive every minute (we have margin of 10) + if (_saltChangeCounter >= 30) + throw new ApplicationException($"Server salt changed too often! Security issue?"); + } + if (sessionId != _dcSession.Id) throw new ApplicationException($"Unexpected session ID {sessionId} != {_dcSession.Id}"); + if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}"); + if ((seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msgId); + if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}"); + _sha256Recv.TransformBlock(_dcSession.AuthKey, 96, 32, null, 0); + _sha256Recv.TransformFinalBlock(decrypted_data, 0, decrypted_data.Length); + if (!data.AsSpan(8, 16).SequenceEqual(_sha256Recv.Hash.AsSpan(8, 16))) + throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA256"); + _sha256Recv.Initialize(); + + var ctorNb = reader.ReadUInt32(); + if (ctorNb != Layer.BadMsgCtor && (msgStamp - DateTime.UtcNow).Ticks / TimeSpan.TicksPerSecond is > 30 or < -300) + { // msg_id values that belong over 30 seconds in the future or over 300 seconds in the past are to be ignored. + Helpers.Log(1, $"{_dcSession.DcID}>Ignoring 0x{ctorNb:X8} because of wrong timestamp {msgStamp:u} (svc)"); + return null; + } + if (ctorNb == Layer.MsgContainerCtor) + { + Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"MsgContainer",-40} {msgStamp:u} (svc)"); + return ReadMsgContainer(reader); + } + else if (ctorNb == Layer.RpcResultCtor) + { + Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"RpcResult",-40} {msgStamp:u}"); + return ReadRpcResult(reader); + } + else + { + var obj = reader.ReadTLObject(ctorNb); + Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {msgStamp:u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}"); + return obj; + } + } + + static string TransportError(int error_code) => error_code switch + { + 404 => "Auth key not found", + 429 => "Transport flood", + _ => Enum.GetName(typeof(HttpStatusCode), error_code) ?? "Transport error" + }; + } + + internal MsgContainer ReadMsgContainer(TL.BinaryReader reader) + { + int count = reader.ReadInt32(); + var array = new _Message[count]; + for (int i = 0; i < count; i++) + { + var msg = array[i] = new _Message(reader.ReadInt64(), reader.ReadInt32(), null) { bytes = reader.ReadInt32() }; + if ((msg.seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msg.msg_id); + var pos = reader.BaseStream.Position; + try + { + var ctorNb = reader.ReadUInt32(); + if (ctorNb == Layer.RpcResultCtor) + { + Helpers.Log(1, $" → {"RpcResult",-38} {MsgIdToStamp(msg.msg_id):u}"); + msg.body = ReadRpcResult(reader); + } + else + { + var obj = msg.body = reader.ReadTLObject(ctorNb); + Helpers.Log(1, $" → {obj.GetType().Name,-38} {MsgIdToStamp(msg.msg_id):u} {((msg.seqno & 1) != 0 ? "" : "(svc)")} {((msg.msg_id & 2) == 0 ? "" : "NAR")}"); + } + } + catch (Exception ex) + { + Helpers.Log(4, "While deserializing vector<%Message>: " + ex.ToString()); + } + reader.BaseStream.Position = pos + array[i].bytes; + } + return new MsgContainer { messages = array }; + } + + private RpcResult ReadRpcResult(TL.BinaryReader reader) + { + long msgId = reader.ReadInt64(); + var rpc = PullPendingRequest(msgId); + object result; + if (rpc != null) + { + try + { + if (!rpc.type.IsArray) + result = reader.ReadTLValue(rpc.type); + else + { + var peek = reader.ReadUInt32(); + if (peek == Layer.RpcErrorCtor) + result = reader.ReadTLObject(Layer.RpcErrorCtor); + else if (peek == Layer.GZipedCtor) + using (var gzipReader = new TL.BinaryReader(new GZipStream(new MemoryStream(reader.ReadTLBytes()), CompressionMode.Decompress), reader.Client)) + result = gzipReader.ReadTLValue(rpc.type); + else + { + reader.BaseStream.Position -= 4; + result = reader.ReadTLValue(rpc.type); + } + } + if (rpc.type.IsEnum) result = Enum.ToObject(rpc.type, result); + if (result is RpcError rpcError) + Helpers.Log(4, $" → RpcError {rpcError.error_code,3} {rpcError.error_message,-24} #{(short)msgId.GetHashCode():X4}"); + else + Helpers.Log(1, $" → {result?.GetType().Name,-37} #{(short)msgId.GetHashCode():X4}"); + rpc.tcs.SetResult(result); + } + catch (Exception ex) + { + rpc.tcs.SetException(ex); + throw; + } + } + else + { + var ctorNb = reader.ReadUInt32(); + if (ctorNb == Layer.VectorCtor) + { + reader.BaseStream.Position -= 4; + result = reader.ReadTLVector(typeof(IObject[])); + } + else if (ctorNb == (uint)Bool.False) result = false; + else if (ctorNb == (uint)Bool.True) result = true; + else result = reader.ReadTLObject(ctorNb); + var typeName = result?.GetType().Name; + if (MsgIdToStamp(msgId) >= _session.SessionStart) + Helpers.Log(4, $" → {typeName,-37} for unknown msgId #{(short)msgId.GetHashCode():X4}"); + else + Helpers.Log(1, $" → {typeName,-37} for past msgId #{(short)msgId.GetHashCode():X4}"); + } + return new RpcResult { req_msg_id = msgId, result = result }; + } + + class Rpc + { + public Type type; + public TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + public long msgId; + public Task Task => tcs.Task; + } + + private Rpc PullPendingRequest(long msgId) + { + Rpc request; + lock (_pendingRpcs) + if (_pendingRpcs.TryGetValue(msgId, out request)) + _pendingRpcs.Remove(msgId); + return request; + } + + private async Task HandleMessageAsync(IObject obj) + { + switch (obj) + { + case MsgContainer container: + foreach (var msg in container.messages) + if (msg.body != null) + await HandleMessageAsync(msg.body); + break; + case MsgCopy msgCopy: + if (msgCopy?.orig_message?.body != null) + await HandleMessageAsync(msgCopy.orig_message.body); + break; + case TL.Methods.Ping ping: + _ = SendAsync(new Pong { msg_id = _lastRecvMsgId, ping_id = ping.ping_id }, false); + break; + case Pong pong: + SetResult(pong.msg_id, pong); + break; + case FutureSalts futureSalts: + SetResult(futureSalts.req_msg_id, futureSalts); + break; + case RpcResult rpcResult: + break; // SetResult was already done in ReadRpcResult + case MsgsAck msgsAck: + break; // we don't do anything with these, for now + case BadMsgNotification badMsgNotification: + await _sendSemaphore.WaitAsync(); + bool retryLast = badMsgNotification.bad_msg_id == _dcSession.LastSentMsgId; + var lastSentMsg = _lastSentMsg; + _sendSemaphore.Release(); + var logLevel = badMsgNotification.error_code == 48 ? 2 : 4; + Helpers.Log(logLevel, $"BadMsgNotification {badMsgNotification.error_code} for msg #{(short)badMsgNotification.bad_msg_id.GetHashCode():X4}"); + switch (badMsgNotification.error_code) + { + case 16: // msg_id too low (most likely, client time is wrong; synchronize it using msg_id notifications and re-send the original message) + case 17: // msg_id too high (similar to the previous case, the client time has to be synchronized, and the message re-sent with the correct msg_id) + _dcSession.LastSentMsgId = 0; + var localTime = DateTime.UtcNow; + _dcSession.ServerTicksOffset = (_lastRecvMsgId >> 32) * 10000000 - localTime.Ticks + 621355968000000000L; + Helpers.Log(1, $"Time offset: {_dcSession.ServerTicksOffset} | Server: {MsgIdToStamp(_lastRecvMsgId).AddTicks(_dcSession.ServerTicksOffset).TimeOfDay} UTC | Local: {localTime.TimeOfDay} UTC"); + break; + case 32: // msg_seqno too low (the server has already received a message with a lower msg_id but with either a higher or an equal and odd seqno) + case 33: // msg_seqno too high (similarly, there is a message with a higher msg_id but with either a lower or an equal and odd seqno) + if (_dcSession.Seqno <= 1) + retryLast = false; + else + { + Reset(false, false); + _dcSession.Renew(); + await ConnectAsync(); + } + break; + case 48: // incorrect server salt (in this case, the bad_server_salt response is received with the correct salt, and the message is to be re-sent with it) + _dcSession.Salt = ((BadServerSalt)badMsgNotification).new_server_salt; + break; + default: + retryLast = false; + break; + } + if (retryLast) + { + Rpc prevRequest; + lock (_pendingRpcs) + _pendingRpcs.TryGetValue(badMsgNotification.bad_msg_id, out prevRequest); + await SendAsync(lastSentMsg, true, prevRequest); + lock (_pendingRpcs) + _pendingRpcs.Remove(badMsgNotification.bad_msg_id); + } + else if (PullPendingRequest(badMsgNotification.bad_msg_id) is Rpc rpc) + { + if (_bareRpc.msgId == badMsgNotification.bad_msg_id) _bareRpc = null; + rpc.tcs.SetException(new ApplicationException($"BadMsgNotification {badMsgNotification.error_code}")); + } + else + OnUpdate(obj); + break; + default: + if (_bareRpc != null) + { + var rpc = PullPendingRequest(_bareRpc.msgId); + if (rpc?.type.IsAssignableFrom(obj.GetType()) == true) + { + _bareRpc = null; + rpc.tcs.SetResult(obj); + break; + } + } + OnUpdate(obj); + break; + } + + void SetResult(long msgId, object result) + { + var rpc = PullPendingRequest(msgId); + if (rpc != null) + rpc.tcs.SetResult(result); + else + OnUpdate(obj); + } + } + + private void OnUpdate(IObject obj) + { + try + { + Update?.Invoke(obj); + } + catch (Exception ex) + { + Helpers.Log(4, $"Update callback on {obj.GetType().Name} raised {ex}"); + } } static async Task DefaultTcpHandler(string host, int port) @@ -228,6 +676,16 @@ namespace WTelegram return tcpClient; } + /// Establish connection to Telegram servers + /// Config callback is queried for: server_address + /// Most methods of this class are async (Task), so please use + public async Task ConnectAsync() + { + lock (this) + _connecting ??= DoConnectAsync(); + await _connecting; + } + private async Task DoConnectAsync() { _cts = new(); @@ -370,79 +828,6 @@ namespace WTelegram Helpers.Log(2, $"Connected to {(TLConfig.test_mode ? "Test DC" : "DC")} {TLConfig.this_dc}... {TLConfig.flags & (Config.Flags)~0xE00U}"); } - /// Obtain/create a Client for a secondary session on a specific Data Center - /// ID of the Data Center - /// Session will be used only for transferring media - /// Connect immediately - /// - public async Task GetClientForDC(int dcId, bool media_only = true, bool connect = true) - { - if (_dcSession.DataCenter?.id == dcId) return this; - Session.DCSession altSession; - lock (_session) - { - altSession = GetOrCreateDCSession(dcId, _dcSession.DataCenter.flags | (media_only ? DcOption.Flags.media_only : 0)); - if (altSession.Client?.Disconnected ?? false) { altSession.Client.Dispose(); altSession.Client = null; } - altSession.Client ??= new Client(this, altSession); - } - Helpers.Log(2, $"Requested connection to DC {dcId}..."); - if (connect) - { - await _semaphore.WaitAsync(); - try - { - Auth_ExportedAuthorization exported = null; - if (_session.UserId != 0 && IsMainDC && altSession.UserId != _session.UserId) - exported = await this.Auth_ExportAuthorization(dcId); - await altSession.Client.ConnectAsync(); - if (exported != null) - { - var authorization = await altSession.Client.Auth_ImportAuthorization(exported.id, exported.bytes); - if (authorization is not Auth_Authorization { user: User user }) - throw new ApplicationException("Failed to get Authorization: " + authorization.GetType().Name); - altSession.UserId = user.id; - } - } - finally - { - _semaphore.Release(); - } - } - return altSession.Client; - } - - private Session.DCSession GetOrCreateDCSession(int dcId, DcOption.Flags flags) - { - if (_session.DCSessions.TryGetValue(dcId, out var dcSession)) - if (dcSession.Client != null || dcSession.DataCenter.flags == flags) - return dcSession; // if we have already a session with this DC and we are connected or it is a perfect match, use it - // try to find the most appropriate DcOption for this DC - if ((dcSession?.AuthKeyID ?? 0) == 0) // we will need to negociate an AuthKey => can't use media_only DC - flags &= ~DcOption.Flags.media_only; - var dcOptions = _session.DcOptions.Where(dc => dc.id == dcId).OrderBy(dc => dc.flags ^ flags); - var dcOption = dcOptions.FirstOrDefault() ?? throw new ApplicationException($"Could not find adequate dc_option for DC {dcId}"); - dcSession ??= new Session.DCSession { Id = Helpers.RandomLong() }; // create new session only if not already existing - dcSession.DataCenter = dcOption; - return _session.DCSessions[dcId] = dcSession; - } - - internal DateTime MsgIdToStamp(long serverMsgId) - => new((serverMsgId >> 32) * 10000000 - _dcSession.ServerTicksOffset + 621355968000000000L, DateTimeKind.Utc); - - internal (long msgId, int seqno) NewMsgId(bool isContent) - { - int seqno; - long msgId = DateTime.UtcNow.Ticks + _dcSession.ServerTicksOffset - 621355968000000000L; - msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4 - lock (_session) - { - if (msgId <= _dcSession.LastSentMsgId) msgId = _dcSession.LastSentMsgId += 4; else _dcSession.LastSentMsgId = msgId; - seqno = isContent ? _dcSession.Seqno++ * 2 + 1 : _dcSession.Seqno * 2; - _session.Save(); - } - return (msgId, seqno); - } - private async Task KeepAlive(CancellationToken ct) { int ping_id = _random.Next(); @@ -461,566 +846,6 @@ namespace WTelegram } } - private async Task Reactor(Stream stream, CancellationTokenSource cts) - { - const int MinBufferSize = 1024; - var data = new byte[MinBufferSize]; - while (!cts.IsCancellationRequested) - { - IObject obj = null; - try - { - if (await stream.FullReadAsync(data, 4, cts.Token) != 4) - throw new ApplicationException(ConnectionShutDown); -#if OBFUSCATION - _recvCtr.EncryptDecrypt(data, 4); -#endif - int payloadLen = BinaryPrimitives.ReadInt32LittleEndian(data); - if (payloadLen <= 0) - throw new ApplicationException("Could not read frame data : Invalid payload length"); - else if (payloadLen > data.Length) - data = new byte[payloadLen]; - else if (Math.Max(payloadLen, MinBufferSize) < data.Length / 4) - data = new byte[Math.Max(payloadLen, MinBufferSize)]; - if (await stream.FullReadAsync(data, payloadLen, cts.Token) != payloadLen) - throw new ApplicationException("Could not read frame data : Connection shut down"); -#if OBFUSCATION - _recvCtr.EncryptDecrypt(data, payloadLen); -#endif - obj = ReadFrame(data, payloadLen); - } - catch (Exception ex) // an exception in RecvAsync is always fatal - { - if (cts.IsCancellationRequested) return; - Helpers.Log(5, $"{_dcSession.DcID}>An exception occured in the reactor: {ex}"); - var oldSemaphore = _sendSemaphore; - await oldSemaphore.WaitAsync(cts.Token); // prevent any sending while we reconnect - var reactorError = new ReactorError { Exception = ex }; - try - { - lock (_msgsToAck) _msgsToAck.Clear(); - Reset(false, false); - _reactorReconnects = (_reactorReconnects + 1) % MaxAutoReconnects; - if (!IsMainDC && _pendingRpcs.Count <= 1 && ex is ApplicationException { Message: ConnectionShutDown } or IOException { InnerException: SocketException }) - if (_pendingRpcs.Values.FirstOrDefault() is not Rpc rpc || rpc.type == typeof(Pong)) - _reactorReconnects = 0; - if (_reactorReconnects != 0) - { - await Task.Delay(5000); - if (_networkStream == null) return; // Dispose has been called in-between - await ConnectAsync(); // start a new reactor after 5 secs - lock (_pendingRpcs) // retry all pending requests - { - foreach (var rpc in _pendingRpcs.Values) - rpc.tcs.SetResult(reactorError); - _pendingRpcs.Clear(); - _bareRpc = null; - } - // TODO: implement an Updates gaps handling system? https://core.telegram.org/api/updates - if (IsMainDC) - { - var udpatesState = await this.Updates_GetState(); // this call reenables incoming Updates - OnUpdate(udpatesState); - } - } - else - throw; - } - catch - { - lock (_pendingRpcs) // abort all pending requests - { - foreach (var rpc in _pendingRpcs.Values) - rpc.tcs.SetException(ex); - _pendingRpcs.Clear(); - _bareRpc = null; - } - OnUpdate(reactorError); - } - finally - { - oldSemaphore.Release(); - } - } - if (obj != null) - await HandleMessageAsync(obj); - } - } - - internal IObject ReadFrame(byte[] data, int dataLen) - { - if (dataLen == 4 && data[3] == 0xFF) - { - int error_code = -BinaryPrimitives.ReadInt32LittleEndian(data); - throw new RpcException(error_code, TransportError(error_code)); - } - if (dataLen < 24) // authKeyId+msgId+length+ctorNb | authKeyId+msgKey - throw new ApplicationException($"Packet payload too small: {dataLen}"); - - long authKeyId = BinaryPrimitives.ReadInt64LittleEndian(data); - if (authKeyId != _dcSession.AuthKeyID) - throw new ApplicationException($"Received a packet encrypted with unexpected key {authKeyId:X}"); - if (authKeyId == 0) // Unencrypted message - { - using var reader = new TL.BinaryReader(new MemoryStream(data, 8, dataLen - 8), this); - long msgId = _lastRecvMsgId = reader.ReadInt64(); - if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}"); - int length = reader.ReadInt32(); - dataLen -= 20; - if (length > dataLen || length < dataLen - (_paddedMode ? 15 : 0)) - throw new ApplicationException($"Unexpected unencrypted length {length} != {dataLen}"); - - var obj = reader.ReadTLObject(); - Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {MsgIdToStamp(msgId):u} clear{((msgId & 2) == 0 ? "" : " NAR")}"); - return obj; - } - else - { - byte[] decrypted_data = EncryptDecryptMessage(data.AsSpan(24, (dataLen - 24) & ~0xF), false, _dcSession.AuthKey, data, 8, _sha256Recv); - if (decrypted_data.Length < 36) // header below+ctorNb - throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}"); - using var reader = new TL.BinaryReader(new MemoryStream(decrypted_data), this); - var serverSalt = reader.ReadInt64(); // int64 salt - var sessionId = reader.ReadInt64(); // int64 session_id - var msgId = reader.ReadInt64(); // int64 message_id - var seqno = reader.ReadInt32(); // int32 msg_seqno - var length = reader.ReadInt32(); // int32 message_data_length - if (_lastRecvMsgId == 0) // resync ServerTicksOffset on first message - _dcSession.ServerTicksOffset = (msgId >> 32) * 10000000 - DateTime.UtcNow.Ticks + 621355968000000000L; - var msgStamp = MsgIdToStamp(_lastRecvMsgId = msgId); - - if (serverSalt != _dcSession.Salt) // salt change happens every 30 min - { - Helpers.Log(2, $"{_dcSession.DcID}>Server salt has changed: {_dcSession.Salt:X} -> {serverSalt:X}"); - _dcSession.Salt = serverSalt; - _saltChangeCounter += 20; // counter is decreased by KeepAlive every minute (we have margin of 10) - if (_saltChangeCounter >= 30) - throw new ApplicationException($"Server salt changed too often! Security issue?"); - } - if (sessionId != _dcSession.Id) throw new ApplicationException($"Unexpected session ID {sessionId} != {_dcSession.Id}"); - if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}"); - if ((seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msgId); - if (decrypted_data.Length - 32 - length is < 12 or > 1024) throw new ApplicationException($"Unexpected decrypted message_data_length {length} / {decrypted_data.Length - 32}"); - _sha256Recv.TransformBlock(_dcSession.AuthKey, 96, 32, null, 0); - _sha256Recv.TransformFinalBlock(decrypted_data, 0, decrypted_data.Length); - if (!data.AsSpan(8, 16).SequenceEqual(_sha256Recv.Hash.AsSpan(8, 16))) - throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA256"); - _sha256Recv.Initialize(); - - var ctorNb = reader.ReadUInt32(); - if (ctorNb != Layer.BadMsgCtor && (msgStamp - DateTime.UtcNow).Ticks / TimeSpan.TicksPerSecond is > 30 or < -300) - { // msg_id values that belong over 30 seconds in the future or over 300 seconds in the past are to be ignored. - Helpers.Log(1, $"{_dcSession.DcID}>Ignoring 0x{ctorNb:X8} because of wrong timestamp {msgStamp:u} (svc)"); - return null; - } - if (ctorNb == Layer.MsgContainerCtor) - { - Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"MsgContainer",-40} {msgStamp:u} (svc)"); - return ReadMsgContainer(reader); - } - else if (ctorNb == Layer.RpcResultCtor) - { - Helpers.Log(1, $"{_dcSession.DcID}>Receiving {"RpcResult",-40} {msgStamp:u}"); - return ReadRpcResult(reader); - } - else - { - var obj = reader.ReadTLObject(ctorNb); - Helpers.Log(1, $"{_dcSession.DcID}>Receiving {obj.GetType().Name,-40} {msgStamp:u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}"); - return obj; - } - } - - static string TransportError(int error_code) => error_code switch - { - 404 => "Auth key not found", - 429 => "Transport flood", - _ => Enum.GetName(typeof(HttpStatusCode), error_code) ?? "Transport error" - }; - } - - private async Task SendAsync(IObject msg, bool isContent, Rpc rpc = null) - { - isContent &= _dcSession.AuthKeyID != 0; - (long msgId, int seqno) = NewMsgId(isContent); - if (rpc != null) - lock (_pendingRpcs) - _pendingRpcs[rpc.msgId = msgId] = rpc; - if (isContent && CheckMsgsToAck() is MsgsAck msgsAck) - { - var (ackId, ackSeqno) = NewMsgId(false); - var container = new MsgContainer { messages = new _Message[] { new(msgId, seqno, msg), new(ackId, ackSeqno, msgsAck) } }; - await SendAsync(container, false); - return; - } - await _sendSemaphore.WaitAsync(); - try - { - using var memStream = new MemoryStream(1024); - using var writer = new BinaryWriter(memStream, Encoding.UTF8); - writer.Write(0); // int32 payload_len (to be patched with payload length) - - if (_dcSession.AuthKeyID == 0) // send unencrypted message - { - writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted) - writer.Write(msgId); // int64 message_id - writer.Write(0); // int32 message_data_length (to be patched) - Helpers.Log(1, $"{_dcSession.DcID}>Sending {msg.GetType().Name.TrimEnd('_')}..."); - writer.WriteTLObject(msg); // bytes message_data - BinaryPrimitives.WriteInt32LittleEndian(memStream.GetBuffer().AsSpan(20), (int)memStream.Length - 24); // patch message_data_length - } - else - { - using var clearStream = new MemoryStream(1024); - using var clearWriter = new BinaryWriter(clearStream, Encoding.UTF8); - clearWriter.Write(_dcSession.AuthKey, 88, 32); - clearWriter.Write(_dcSession.Salt); // int64 salt - clearWriter.Write(_dcSession.Id); // int64 session_id - clearWriter.Write(msgId); // int64 message_id - clearWriter.Write(seqno); // int32 msg_seqno - clearWriter.Write(0); // int32 message_data_length (to be patched) - if ((seqno & 1) != 0) - Helpers.Log(1, $"{_dcSession.DcID}>Sending {msg.GetType().Name.TrimEnd('_'),-40} #{(short)msgId.GetHashCode():X4}"); - else - Helpers.Log(1, $"{_dcSession.DcID}>Sending {msg.GetType().Name.TrimEnd('_'),-40} {MsgIdToStamp(msgId):u} (svc)"); - clearWriter.WriteTLObject(msg); // bytes message_data - int clearLength = (int)clearStream.Length - 32; // length before padding (= 32 + message_data_length) - int padding = (0x7FFFFFF0 - clearLength) % 16; - padding += _random.Next(1, 64) * 16; // MTProto 2.0 padding must be between 12..1024 with total length divisible by 16 - clearStream.SetLength(32 + clearLength + padding); - byte[] clearBuffer = clearStream.GetBuffer(); - BinaryPrimitives.WriteInt32LittleEndian(clearBuffer.AsSpan(60), clearLength - 32); // patch message_data_length - RNG.GetBytes(clearBuffer, 32 + clearLength, padding); - var msgKeyLarge = _sha256.ComputeHash(clearBuffer, 0, 32 + clearLength + padding); - const int msgKeyOffset = 8; // msg_key = middle 128-bits of SHA256(authkey_part+plaintext+padding) - byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(32, clearLength + padding), true, _dcSession.AuthKey, msgKeyLarge, msgKeyOffset, _sha256); - - writer.Write(_dcSession.AuthKeyID); // int64 auth_key_id - writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key - writer.Write(encrypted_data); // bytes encrypted_data - } - if (_paddedMode) // Padded intermediate mode => append random padding - { - var padding = new byte[_random.Next(16)]; - RNG.GetBytes(padding); - writer.Write(padding); - } - var buffer = memStream.GetBuffer(); - int frameLength = (int)memStream.Length; - BinaryPrimitives.WriteInt32LittleEndian(buffer, frameLength - 4); // patch payload_len with correct value -#if OBFUSCATION - _sendCtr.EncryptDecrypt(buffer, frameLength); -#endif - await _networkStream.WriteAsync(buffer, 0, frameLength); - _lastSentMsg = msg; - } - finally - { - _sendSemaphore.Release(); - } - } - - internal MsgContainer ReadMsgContainer(TL.BinaryReader reader) - { - int count = reader.ReadInt32(); - var array = new _Message[count]; - for (int i = 0; i < count; i++) - { - var msg = array[i] = new _Message(reader.ReadInt64(), reader.ReadInt32(), null) { bytes = reader.ReadInt32() }; - if ((msg.seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msg.msg_id); - var pos = reader.BaseStream.Position; - try - { - var ctorNb = reader.ReadUInt32(); - if (ctorNb == Layer.RpcResultCtor) - { - Helpers.Log(1, $" → {"RpcResult",-38} {MsgIdToStamp(msg.msg_id):u}"); - msg.body = ReadRpcResult(reader); - } - else - { - var obj = msg.body = reader.ReadTLObject(ctorNb); - Helpers.Log(1, $" → {obj.GetType().Name,-38} {MsgIdToStamp(msg.msg_id):u} {((msg.seqno & 1) != 0 ? "" : "(svc)")} {((msg.msg_id & 2) == 0 ? "" : "NAR")}"); - } - } - catch (Exception ex) - { - Helpers.Log(4, "While deserializing vector<%Message>: " + ex.ToString()); - } - reader.BaseStream.Position = pos + array[i].bytes; - } - return new MsgContainer { messages = array }; - } - - private RpcResult ReadRpcResult(TL.BinaryReader reader) - { - long msgId = reader.ReadInt64(); - var rpc = PullPendingRequest(msgId); - object result; - if (rpc != null) - { - try - { - if (!rpc.type.IsArray) - result = reader.ReadTLValue(rpc.type); - else - { - var peek = reader.ReadUInt32(); - if (peek == Layer.RpcErrorCtor) - result = reader.ReadTLObject(Layer.RpcErrorCtor); - else if (peek == Layer.GZipedCtor) - using (var gzipReader = new TL.BinaryReader(new GZipStream(new MemoryStream(reader.ReadTLBytes()), CompressionMode.Decompress), reader.Client)) - result = gzipReader.ReadTLValue(rpc.type); - else - { - reader.BaseStream.Position -= 4; - result = reader.ReadTLValue(rpc.type); - } - } - if (rpc.type.IsEnum) result = Enum.ToObject(rpc.type, result); - if (result is RpcError rpcError) - Helpers.Log(4, $" → RpcError {rpcError.error_code,3} {rpcError.error_message,-24} #{(short)msgId.GetHashCode():X4}"); - else - Helpers.Log(1, $" → {result?.GetType().Name,-37} #{(short)msgId.GetHashCode():X4}"); - rpc.tcs.SetResult(result); - } - catch (Exception ex) - { - rpc.tcs.SetException(ex); - throw; - } - } - else - { - var ctorNb = reader.ReadUInt32(); - if (ctorNb == Layer.VectorCtor) - { - reader.BaseStream.Position -= 4; - result = reader.ReadTLVector(typeof(IObject[])); - } - else if (ctorNb == (uint)Bool.False) result = false; - else if (ctorNb == (uint)Bool.True) result = true; - else result = reader.ReadTLObject(ctorNb); - var typeName = result?.GetType().Name; - if (MsgIdToStamp(msgId) >= _session.SessionStart) - Helpers.Log(4, $" → {typeName,-37} for unknown msgId #{(short)msgId.GetHashCode():X4}"); - else - Helpers.Log(1, $" → {typeName,-37} for past msgId #{(short)msgId.GetHashCode():X4}"); - } - return new RpcResult { req_msg_id = msgId, result = result }; - } - - class Rpc - { - public Type type; - public TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - public long msgId; - public Task Task => tcs.Task; - } - - private Rpc PullPendingRequest(long msgId) - { - Rpc request; - lock (_pendingRpcs) - if (_pendingRpcs.TryGetValue(msgId, out request)) - _pendingRpcs.Remove(msgId); - return request; - } - - internal async Task InvokeBare(IMethod request) - { - if (_bareRpc != null) throw new ApplicationException("A bare request is already undergoing"); - _bareRpc = new Rpc { type = typeof(X) }; - await SendAsync(request, false, _bareRpc); - return (X)await _bareRpc.Task; - } - - /// Call the given TL method (You shouldn't need to use this method directly) - /// Expected type of the returned object - /// TL method structure - /// Wait for the reply and return the resulting object, or throws an RpcException if an error was replied - public async Task Invoke(IMethod query) - { - retry: - var rpc = new Rpc { type = typeof(X) }; - await SendAsync(query, true, rpc); - bool got503 = false; - var result = await rpc.Task; - switch (result) - { - case X resultX: return resultX; - case RpcError rpcError: - int number; - if (rpcError.error_code == 303 && ((number = rpcError.error_message.IndexOf("_MIGRATE_")) > 0)) - { - if (!rpcError.error_message.StartsWith("FILE_")) - { - number = int.Parse(rpcError.error_message[(number + 9)..]); - // this is a hack to migrate _dcSession in-place (staying in same Client): - Session.DCSession dcSession; - lock (_session) - dcSession = GetOrCreateDCSession(number, _dcSession.DataCenter.flags); - Reset(false, false); - _session.MainDC = number; - _dcSession.Client = null; - _dcSession = dcSession; - _dcSession.Client = this; - await ConnectAsync(); - goto retry; - } - } - else if (rpcError.error_code == 420 && ((number = rpcError.error_message.IndexOf("_WAIT_")) > 0)) - { - number = int.Parse(rpcError.error_message[(number + 6)..]); - if (number <= FloodRetryThreshold) - { - await Task.Delay(number * 1000); - goto retry; - } - } - else if (rpcError.error_code == -503 && !got503) - { - got503 = true; - goto retry; - } - else if (rpcError.error_code == 500 && rpcError.error_message == "AUTH_RESTART") - { - _session.UserId = 0; // force a full login authorization flow, next time - lock (_session) _session.Save(); - } - throw new RpcException(rpcError.error_code, rpcError.error_message); - case ReactorError: - goto retry; - default: - throw new ApplicationException($"{query.GetType().Name} call got a result of type {result.GetType().Name} instead of {typeof(X).Name}"); - } - } - - private MsgsAck CheckMsgsToAck() - { - lock (_msgsToAck) - { - if (_msgsToAck.Count == 0) return null; - var msgsAck = new MsgsAck { msg_ids = _msgsToAck.ToArray() }; - _msgsToAck.Clear(); - return msgsAck; - } - } - - private async Task HandleMessageAsync(IObject obj) - { - switch (obj) - { - case MsgContainer container: - foreach (var msg in container.messages) - if (msg.body != null) - await HandleMessageAsync(msg.body); - break; - case MsgCopy msgCopy: - if (msgCopy?.orig_message?.body != null) - await HandleMessageAsync(msgCopy.orig_message.body); - break; - case TL.Methods.Ping ping: - _ = SendAsync(new Pong { msg_id = _lastRecvMsgId, ping_id = ping.ping_id }, false); - break; - case Pong pong: - SetResult(pong.msg_id, pong); - break; - case FutureSalts futureSalts: - SetResult(futureSalts.req_msg_id, futureSalts); - break; - case RpcResult rpcResult: - break; // SetResult was already done in ReadRpcResult - case MsgsAck msgsAck: - break; // we don't do anything with these, for now - case BadMsgNotification badMsgNotification: - await _sendSemaphore.WaitAsync(); - bool retryLast = badMsgNotification.bad_msg_id == _dcSession.LastSentMsgId; - var lastSentMsg = _lastSentMsg; - _sendSemaphore.Release(); - var logLevel = badMsgNotification.error_code == 48 ? 2 : 4; - Helpers.Log(logLevel, $"BadMsgNotification {badMsgNotification.error_code} for msg #{(short)badMsgNotification.bad_msg_id.GetHashCode():X4}"); - switch (badMsgNotification.error_code) - { - case 16: // msg_id too low (most likely, client time is wrong; synchronize it using msg_id notifications and re-send the original message) - case 17: // msg_id too high (similar to the previous case, the client time has to be synchronized, and the message re-sent with the correct msg_id) - _dcSession.LastSentMsgId = 0; - var localTime = DateTime.UtcNow; - _dcSession.ServerTicksOffset = (_lastRecvMsgId >> 32) * 10000000 - localTime.Ticks + 621355968000000000L; - Helpers.Log(1, $"Time offset: {_dcSession.ServerTicksOffset} | Server: {MsgIdToStamp(_lastRecvMsgId).AddTicks(_dcSession.ServerTicksOffset).TimeOfDay} UTC | Local: {localTime.TimeOfDay} UTC"); - break; - case 32: // msg_seqno too low (the server has already received a message with a lower msg_id but with either a higher or an equal and odd seqno) - case 33: // msg_seqno too high (similarly, there is a message with a higher msg_id but with either a lower or an equal and odd seqno) - if (_dcSession.Seqno <= 1) - retryLast = false; - else - { - Reset(false, false); - _dcSession.Renew(); - await ConnectAsync(); - } - break; - case 48: // incorrect server salt (in this case, the bad_server_salt response is received with the correct salt, and the message is to be re-sent with it) - _dcSession.Salt = ((BadServerSalt)badMsgNotification).new_server_salt; - break; - default: - retryLast = false; - break; - } - if (retryLast) - { - Rpc prevRequest; - lock (_pendingRpcs) - _pendingRpcs.TryGetValue(badMsgNotification.bad_msg_id, out prevRequest); - await SendAsync(lastSentMsg, true, prevRequest); - lock (_pendingRpcs) - _pendingRpcs.Remove(badMsgNotification.bad_msg_id); - } - else if (PullPendingRequest(badMsgNotification.bad_msg_id) is Rpc rpc) - { - if (_bareRpc.msgId == badMsgNotification.bad_msg_id) _bareRpc = null; - rpc.tcs.SetException(new ApplicationException($"BadMsgNotification {badMsgNotification.error_code}")); - } - else - OnUpdate(obj); - break; - default: - if (_bareRpc != null) - { - var rpc = PullPendingRequest(_bareRpc.msgId); - if (rpc?.type.IsAssignableFrom(obj.GetType()) == true) - { - _bareRpc = null; - rpc.tcs.SetResult(obj); - break; - } - } - OnUpdate(obj); - break; - } - - void SetResult(long msgId, object result) - { - var rpc = PullPendingRequest(msgId); - if (rpc != null) - rpc.tcs.SetResult(result); - else - OnUpdate(obj); - } - } - - private void OnUpdate(IObject obj) - { - try - { - Update?.Invoke(obj); - } - catch (Exception ex) - { - Helpers.Log(4, $"Update callback on {obj.GetType().Name} raised {ex}"); - } - } - /// Login as a bot (if not already logged-in). /// Config callback is queried for: bot_token ///
Bots can only call API methods marked with [bots: ✓] in their documentation.
@@ -1157,621 +982,179 @@ namespace WTelegram return user; } - /// Enable the collection of id/access_hash pairs (experimental) - public bool CollectAccessHash { get; set; } - readonly Dictionary> _accessHashes = new(); - public IEnumerable> AllAccessHashesFor() where T : IObject - => _accessHashes.GetValueOrDefault(typeof(T)); - /// Retrieve the access_hash associated with this id (for a TL class) if it was collected - /// This requires to be set to first. - ///
See Examples/Program_CollectAccessHash.cs for how to use this
- /// a TL object class. For example User, Channel or Photo - public long GetAccessHashFor(long id) where T : IObject + private MsgsAck CheckMsgsToAck() { - lock (_accessHashes) - return _accessHashes.GetOrCreate(typeof(T)).TryGetValue(id, out var access_hash) ? access_hash : 0; - } - public void SetAccessHashFor(long id, long access_hash) where T : IObject - { - lock (_accessHashes) - _accessHashes.GetOrCreate(typeof(T))[id] = access_hash; - } - static readonly FieldInfo userFlagsField = typeof(User).GetField("flags"); - static readonly FieldInfo channelFlagsField = typeof(Channel).GetField("flags"); - internal void CollectField(FieldInfo fieldInfo, object obj, object access_hash) - { - if (fieldInfo.Name != "access_hash") return; - if (access_hash is not long accessHash) return; - var type = fieldInfo.ReflectedType; - if ((type == typeof(User) && ((User.Flags)userFlagsField.GetValue(obj)).HasFlag(User.Flags.min)) || - (type == typeof(Channel) && ((Channel.Flags)channelFlagsField.GetValue(obj)).HasFlag(Channel.Flags.min))) - return; // access_hash from Min constructors are mostly useless. see https://core.telegram.org/api/min - if (type.GetField("id") is not FieldInfo idField) return; - if (idField.GetValue(obj) is not long id) - if (idField.GetValue(obj) is not int idInt) return; - else id = idInt; - lock (_accessHashes) - _accessHashes.GetOrCreate(type)[id] = accessHash; - } - - #region TL-Helpers - /// Helper function to upload a file to Telegram - /// Path to the file to upload - /// (optional) Callback for tracking the progression of the transfer - /// an or than can be used in various requests - public Task UploadFileAsync(string pathname, ProgressCallback progress = null) - => UploadFileAsync(File.OpenRead(pathname), Path.GetFileName(pathname), progress); - - /// Helper function to upload a file to Telegram - /// Content of the file to upload. This method close/dispose the stream - /// Name of the file - /// (optional) Callback for tracking the progression of the transfer - /// an or than can be used in various requests - public async Task UploadFileAsync(Stream stream, string filename, ProgressCallback progress = null) - { - using var md5 = MD5.Create(); - using (stream) + lock (_msgsToAck) { - long transmitted = 0, length = stream.Length; - var isBig = length >= 10 * 1024 * 1024; - int file_total_parts = (int)((length - 1) / FilePartSize) + 1; - long file_id = Helpers.RandomLong(); - int file_part = 0, read; - var tasks = new Dictionary(); - bool abort = false; - for (long bytesLeft = length; !abort && bytesLeft != 0; file_part++) - { - var bytes = new byte[Math.Min(FilePartSize, bytesLeft)]; - read = await stream.FullReadAsync(bytes, bytes.Length, default); - await _parallelTransfers.WaitAsync(); - bytesLeft -= read; - var task = SavePart(file_part, bytes); - lock (tasks) tasks[file_part] = task; - if (!isBig) - md5.TransformBlock(bytes, 0, read, null, 0); - if (read < FilePartSize && bytesLeft != 0) throw new ApplicationException($"Failed to fully read stream ({read},{bytesLeft})"); + if (_msgsToAck.Count == 0) return null; + var msgsAck = new MsgsAck { msg_ids = _msgsToAck.ToArray() }; + _msgsToAck.Clear(); + return msgsAck; + } + } - async Task SavePart(int file_part, byte[] bytes) + internal (long msgId, int seqno) NewMsgId(bool isContent) + { + int seqno; + long msgId = DateTime.UtcNow.Ticks + _dcSession.ServerTicksOffset - 621355968000000000L; + msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4 + lock (_session) + { + if (msgId <= _dcSession.LastSentMsgId) msgId = _dcSession.LastSentMsgId += 4; else _dcSession.LastSentMsgId = msgId; + seqno = isContent ? _dcSession.Seqno++ * 2 + 1 : _dcSession.Seqno * 2; + _session.Save(); + } + return (msgId, seqno); + } + + private async Task SendAsync(IObject msg, bool isContent, Rpc rpc = null) + { + isContent &= _dcSession.AuthKeyID != 0; + (long msgId, int seqno) = NewMsgId(isContent); + if (rpc != null) + lock (_pendingRpcs) + _pendingRpcs[rpc.msgId = msgId] = rpc; + if (isContent && CheckMsgsToAck() is MsgsAck msgsAck) + { + var (ackId, ackSeqno) = NewMsgId(false); + var container = new MsgContainer { messages = new _Message[] { new(msgId, seqno, msg), new(ackId, ackSeqno, msgsAck) } }; + await SendAsync(container, false); + return; + } + await _sendSemaphore.WaitAsync(); + try + { + using var memStream = new MemoryStream(1024); + using var writer = new BinaryWriter(memStream, Encoding.UTF8); + writer.Write(0); // int32 payload_len (to be patched with payload length) + + if (_dcSession.AuthKeyID == 0) // send unencrypted message + { + writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted) + writer.Write(msgId); // int64 message_id + writer.Write(0); // int32 message_data_length (to be patched) + Helpers.Log(1, $"{_dcSession.DcID}>Sending {msg.GetType().Name.TrimEnd('_')}..."); + writer.WriteTLObject(msg); // bytes message_data + BinaryPrimitives.WriteInt32LittleEndian(memStream.GetBuffer().AsSpan(20), (int)memStream.Length - 24); // patch message_data_length + } + else + { + using var clearStream = new MemoryStream(1024); + using var clearWriter = new BinaryWriter(clearStream, Encoding.UTF8); + clearWriter.Write(_dcSession.AuthKey, 88, 32); + clearWriter.Write(_dcSession.Salt); // int64 salt + clearWriter.Write(_dcSession.Id); // int64 session_id + clearWriter.Write(msgId); // int64 message_id + clearWriter.Write(seqno); // int32 msg_seqno + clearWriter.Write(0); // int32 message_data_length (to be patched) + if ((seqno & 1) != 0) + Helpers.Log(1, $"{_dcSession.DcID}>Sending {msg.GetType().Name.TrimEnd('_'),-40} #{(short)msgId.GetHashCode():X4}"); + else + Helpers.Log(1, $"{_dcSession.DcID}>Sending {msg.GetType().Name.TrimEnd('_'),-40} {MsgIdToStamp(msgId):u} (svc)"); + clearWriter.WriteTLObject(msg); // bytes message_data + int clearLength = (int)clearStream.Length - 32; // length before padding (= 32 + message_data_length) + int padding = (0x7FFFFFF0 - clearLength) % 16; + padding += _random.Next(1, 64) * 16; // MTProto 2.0 padding must be between 12..1024 with total length divisible by 16 + clearStream.SetLength(32 + clearLength + padding); + byte[] clearBuffer = clearStream.GetBuffer(); + BinaryPrimitives.WriteInt32LittleEndian(clearBuffer.AsSpan(60), clearLength - 32); // patch message_data_length + RNG.GetBytes(clearBuffer, 32 + clearLength, padding); + var msgKeyLarge = _sha256.ComputeHash(clearBuffer, 0, 32 + clearLength + padding); + const int msgKeyOffset = 8; // msg_key = middle 128-bits of SHA256(authkey_part+plaintext+padding) + byte[] encrypted_data = EncryptDecryptMessage(clearBuffer.AsSpan(32, clearLength + padding), true, _dcSession.AuthKey, msgKeyLarge, msgKeyOffset, _sha256); + + writer.Write(_dcSession.AuthKeyID); // int64 auth_key_id + writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key + writer.Write(encrypted_data); // bytes encrypted_data + } + if (_paddedMode) // Padded intermediate mode => append random padding + { + var padding = new byte[_random.Next(16)]; + RNG.GetBytes(padding); + writer.Write(padding); + } + var buffer = memStream.GetBuffer(); + int frameLength = (int)memStream.Length; + BinaryPrimitives.WriteInt32LittleEndian(buffer, frameLength - 4); // patch payload_len with correct value +#if OBFUSCATION + _sendCtr.EncryptDecrypt(buffer, frameLength); +#endif + await _networkStream.WriteAsync(buffer, 0, frameLength); + _lastSentMsg = msg; + } + finally + { + _sendSemaphore.Release(); + } + } + + internal async Task InvokeBare(IMethod request) + { + if (_bareRpc != null) throw new ApplicationException("A bare request is already undergoing"); + _bareRpc = new Rpc { type = typeof(X) }; + await SendAsync(request, false, _bareRpc); + return (X)await _bareRpc.Task; + } + + /// Call the given TL method (You shouldn't need to use this method directly) + /// Expected type of the returned object + /// TL method structure + /// Wait for the reply and return the resulting object, or throws an RpcException if an error was replied + public async Task Invoke(IMethod query) + { + retry: + var rpc = new Rpc { type = typeof(X) }; + await SendAsync(query, true, rpc); + bool got503 = false; + var result = await rpc.Task; + switch (result) + { + case X resultX: return resultX; + case RpcError rpcError: + int number; + if (rpcError.error_code == 303 && ((number = rpcError.error_message.IndexOf("_MIGRATE_")) > 0)) { - try + if (!rpcError.error_message.StartsWith("FILE_")) { - if (isBig) - await this.Upload_SaveBigFilePart(file_id, file_part, file_total_parts, bytes); - else - await this.Upload_SaveFilePart(file_id, file_part, bytes); - lock (tasks) { transmitted += bytes.Length; tasks.Remove(file_part); } - progress?.Invoke(transmitted, length); - } - catch (Exception) - { - abort = true; - throw; - } - finally - { - _parallelTransfers.Release(); + number = int.Parse(rpcError.error_message[(number + 9)..]); + // this is a hack to migrate _dcSession in-place (staying in same Client): + Session.DCSession dcSession; + lock (_session) + dcSession = GetOrCreateDCSession(number, _dcSession.DataCenter.flags); + Reset(false, false); + _session.MainDC = number; + _dcSession.Client = null; + _dcSession = dcSession; + _dcSession.Client = this; + await ConnectAsync(); + goto retry; } } - } - Task[] remainingTasks; - lock (tasks) remainingTasks = tasks.Values.ToArray(); - await Task.WhenAll(remainingTasks); // wait completion and eventually propagate any task exception - if (!isBig) md5.TransformFinalBlock(Array.Empty(), 0, 0); - return isBig ? new InputFileBig { id = file_id, parts = file_total_parts, name = filename } - : new InputFile { id = file_id, parts = file_total_parts, name = filename, md5_checksum = md5.Hash }; - } - } - - /// Search messages with filter and text See - /// See for a list of possible filter types - /// User or chat, histories with which are searched, or constructor for global search - /// Text search request - /// Only return messages starting from the specified message ID - /// Number of results to return - public Task Messages_Search(InputPeer peer, string text = null, int offset_id = 0, int limit = int.MaxValue) where T : MessagesFilter, new() - => this.Messages_Search(peer, text, new T(), offset_id: offset_id, limit: limit); - - /// Helper function to send a media message more easily - /// Destination of message (chat group, channel, user chat, etc..) - /// Caption for the media (in plain text) or - /// Media file already uploaded to TG (see UploadFileAsync) - /// for automatic detection, "photo" for an inline photo, or a MIME type to send as a document - /// Your message is a reply to an existing message with this ID, in the same chat - /// Text formatting entities for the caption. You can use MarkdownToEntities to create these - /// UTC timestamp when the message should be sent - /// The transmitted message confirmed by Telegram - public Task SendMediaAsync(InputPeer peer, string caption, InputFileBase mediaFile, string mimeType = null, int reply_to_msg_id = 0, MessageEntity[] entities = null, DateTime schedule_date = default) - { - mimeType ??= Path.GetExtension(mediaFile.Name)?.ToLowerInvariant() switch - { - ".jpg" or ".jpeg" or ".png" or ".bmp" => "photo", - ".gif" => "image/gif", - ".webp" => "image/webp", - ".mp4" => "video/mp4", - ".mp3" => "audio/mpeg", - ".wav" => "audio/x-wav", - _ => "", // send as generic document with undefined MIME type - }; - if (mimeType == "photo") - return SendMessageAsync(peer, caption, new InputMediaUploadedPhoto { file = mediaFile }, reply_to_msg_id, entities, schedule_date); - return SendMessageAsync(peer, caption, new InputMediaUploadedDocument(mediaFile, mimeType), reply_to_msg_id, entities, schedule_date); - } - - /// Helper function to send a text or media message easily - /// Destination of message (chat group, channel, user chat, etc..) - /// The plain text of the message (or media caption) - /// An instance of InputMedia-derived class, or if there is no associated media - /// Your message is a reply to an existing message with this ID, in the same chat - /// Text formatting entities. You can use MarkdownToEntities to create these - /// UTC timestamp when the message should be sent - /// Should website/media preview be shown or not, for URLs in your message - /// The transmitted message as confirmed by Telegram - public async Task SendMessageAsync(InputPeer peer, string text, InputMedia media = null, int reply_to_msg_id = 0, MessageEntity[] entities = null, DateTime schedule_date = default, bool disable_preview = false) - { - UpdatesBase updates; - long random_id = Helpers.RandomLong(); - if (media == null) - updates = await this.Messages_SendMessage(peer, text, random_id, no_webpage: disable_preview, entities: entities, - reply_to_msg_id: reply_to_msg_id == 0 ? null : reply_to_msg_id, schedule_date: schedule_date == default ? null : schedule_date); - else - updates = await this.Messages_SendMedia(peer, media, text, random_id, entities: entities, - reply_to_msg_id: reply_to_msg_id == 0 ? null : reply_to_msg_id, schedule_date: schedule_date == default ? null : schedule_date); - OnUpdate(updates); - int msgId = -1; - foreach (var update in updates.UpdateList) - { - switch (update) - { - case UpdateMessageID updMsgId when updMsgId.random_id == random_id: msgId = updMsgId.id; break; - case UpdateNewMessage { message: Message message } when message.id == msgId: return message; - case UpdateNewScheduledMessage { message: Message schedMsg } when schedMsg.id == msgId: return schedMsg; - } - } - if (updates is UpdateShortSentMessage sent) - { - return new Message - { - flags = (Message.Flags)sent.flags | (reply_to_msg_id == 0 ? 0 : Message.Flags.has_reply_to) | (peer is InputPeerSelf ? 0 : Message.Flags.has_from_id), - id = sent.id, date = sent.date, message = text, entities = sent.entities, media = sent.media, ttl_period = sent.ttl_period, - reply_to = reply_to_msg_id == 0 ? null : new MessageReplyHeader { reply_to_msg_id = reply_to_msg_id }, - from_id = peer is InputPeerSelf ? null : new PeerUser { user_id = _session.UserId }, - peer_id = InputToPeer(peer) - }; - } - return null; - } - - /// Helper function to send an album (media group) of photos or documents more easily - /// Destination of message (chat group, channel, user chat, etc..) - /// An array of InputMedia-derived class - /// Caption for the media (in plain text) or - /// Your message is a reply to an existing message with this ID, in the same chat - /// Text formatting entities for the caption. You can use MarkdownToEntities to create these - /// UTC timestamp when the message should be sent - /// The last of the media group messages, confirmed by Telegram - /// - /// * The caption/entities are set on the last media
- /// * and are supported by downloading the file from the web via HttpClient and sending it to Telegram. - /// WTelegramClient proxy settings don't apply to HttpClient
- /// * You may run into errors if you mix, in the same album, photos and file documents having no thumbnails/video attributes - ///
- public async Task SendAlbumAsync(InputPeer peer, InputMedia[] medias, string caption = null, int reply_to_msg_id = 0, MessageEntity[] entities = null, DateTime schedule_date = default) - { - System.Net.Http.HttpClient httpClient = null; - var multiMedia = new InputSingleMedia[medias.Length]; - for (int i = 0; i < medias.Length; i++) - { - var ism = multiMedia[i] = new InputSingleMedia { random_id = Helpers.RandomLong(), media = medias[i] }; - retry: - switch (ism.media) - { - case InputMediaUploadedPhoto imup: - var mmp = (MessageMediaPhoto)await this.Messages_UploadMedia(peer, imup); - ism.media = mmp.photo; - break; - case InputMediaUploadedDocument imud: - var mmd = (MessageMediaDocument)await this.Messages_UploadMedia(peer, imud); - ism.media = mmd.document; - break; - case InputMediaDocumentExternal imde: - string mimeType = null; - var inputFile = await UploadFromUrl(imde.url); - ism.media = new InputMediaUploadedDocument(inputFile, mimeType); + else if (rpcError.error_code == 420 && ((number = rpcError.error_message.IndexOf("_WAIT_")) > 0)) + { + number = int.Parse(rpcError.error_message[(number + 6)..]); + if (number <= FloodRetryThreshold) + { + await Task.Delay(number * 1000); + goto retry; + } + } + else if (rpcError.error_code == -503 && !got503) + { + got503 = true; goto retry; - case InputMediaPhotoExternal impe: - inputFile = await UploadFromUrl(impe.url); - ism.media = new InputMediaUploadedPhoto { file = inputFile }; - goto retry; - - async Task UploadFromUrl(string url) - { - var filename = Path.GetFileName(new Uri(url).LocalPath); - httpClient ??= new(); - var response = await httpClient.GetAsync(url); - using var stream = await response.Content.ReadAsStreamAsync(); - mimeType = response.Content.Headers.ContentType?.MediaType; - if (response.Content.Headers.ContentLength is long length) - return await UploadFileAsync(new Helpers.IndirectStream(stream) { ContentLength = length }, filename); - else - { - using var ms = new MemoryStream(); - await stream.CopyToAsync(ms); - ms.Position = 0; - return await UploadFileAsync(ms, filename); - } - } - } - } - var lastMedia = multiMedia[^1]; - lastMedia.message = caption; - lastMedia.entities = entities; - if (entities != null) lastMedia.flags = InputSingleMedia.Flags.has_entities; - - var updates = await this.Messages_SendMultiMedia(peer, multiMedia, reply_to_msg_id: reply_to_msg_id, schedule_date: schedule_date); - OnUpdate(updates); - int msgId = -1; - foreach (var update in updates.UpdateList) - { - switch (update) - { - case UpdateMessageID updMsgId when updMsgId.random_id == lastMedia.random_id: msgId = updMsgId.id; break; - case UpdateNewMessage { message: Message message } when message.id == msgId: return message; - case UpdateNewScheduledMessage { message: Message schedMsg } when schedMsg.id == msgId: return schedMsg; - } - } - return null; - } - - private Peer InputToPeer(InputPeer peer) => peer switch - { - InputPeerSelf => new PeerUser { user_id = _session.UserId }, - InputPeerUser ipu => new PeerUser { user_id = ipu.user_id }, - InputPeerChat ipc => new PeerChat { chat_id = ipc.chat_id }, - InputPeerChannel ipch => new PeerChannel { channel_id = ipch.channel_id }, - InputPeerUserFromMessage ipufm => new PeerUser { user_id = ipufm.user_id }, - InputPeerChannelFromMessage ipcfm => new PeerChannel { channel_id = ipcfm.channel_id }, - _ => null, - }; - - /// Download a photo from Telegram into the outputStream - /// The photo to download - /// Stream to write the file content to. This method does not close/dispose the stream - /// A specific size/version of the photo, or to download the largest version of the photo - /// (optional) Callback for tracking the progression of the transfer - /// The file type of the photo - public async Task DownloadFileAsync(Photo photo, Stream outputStream, PhotoSizeBase photoSize = null, ProgressCallback progress = null) - { - if (photoSize is PhotoStrippedSize psp) - return InflateStrippedThumb(outputStream, psp.bytes) ? Storage_FileType.jpeg : 0; - photoSize ??= photo.LargestPhotoSize; - var fileLocation = photo.ToFileLocation(photoSize); - return await DownloadFileAsync(fileLocation, outputStream, photo.dc_id, photoSize.FileSize, progress); - } - - /// Download a document from Telegram into the outputStream - /// The document to download - /// Stream to write the file content to. This method does not close/dispose the stream - /// A specific size/version of the document thumbnail to download, or to download the document itself - /// (optional) Callback for tracking the progression of the transfer - /// MIME type of the document/thumbnail - public async Task DownloadFileAsync(Document document, Stream outputStream, PhotoSizeBase thumbSize = null, ProgressCallback progress = null) - { - if (thumbSize is PhotoStrippedSize psp) - return InflateStrippedThumb(outputStream, psp.bytes) ? "image/jpeg" : null; - var fileLocation = document.ToFileLocation(thumbSize); - var fileType = await DownloadFileAsync(fileLocation, outputStream, document.dc_id, thumbSize?.FileSize ?? document.size, progress); - return thumbSize == null ? document.mime_type : "image/" + fileType; - } - - /// Download a file from Telegram into the outputStream - /// Telegram file identifier, typically obtained with a .ToFileLocation() call - /// Stream to write file content to. This method does not close/dispose the stream - /// (optional) DC on which the file is stored - /// (optional) Expected file size - /// (optional) Callback for tracking the progression of the transfer - /// The file type - public async Task DownloadFileAsync(InputFileLocationBase fileLocation, Stream outputStream, int dc_id = 0, int fileSize = 0, ProgressCallback progress = null) - { - Storage_FileType fileType = Storage_FileType.unknown; - var client = dc_id == 0 ? this : await GetClientForDC(dc_id, true); - using var writeSem = new SemaphoreSlim(1); - long streamStartPos = outputStream.Position; - int fileOffset = 0, maxOffsetSeen = 0; - long transmitted = 0; - var tasks = new Dictionary(); - progress?.Invoke(0, fileSize); - bool abort = false; - while (!abort) - { - await _parallelTransfers.WaitAsync(); - var task = LoadPart(fileOffset); - lock (tasks) tasks[fileOffset] = task; - if (dc_id == 0) { await task; dc_id = client._dcSession.DcID; } - fileOffset += FilePartSize; - if (fileSize != 0 && fileOffset >= fileSize) - { - if (await task != ((fileSize - 1) % FilePartSize) + 1) - throw new ApplicationException("Downloaded file size does not match expected file size"); - break; - } - - async Task LoadPart(int offset) - { - Upload_FileBase fileBase; - try - { - fileBase = await client.Upload_GetFile(fileLocation, offset, FilePartSize); } - catch (RpcException ex) when (ex.Code == 303 && ex.Message.StartsWith("FILE_MIGRATE_")) + else if (rpcError.error_code == 500 && rpcError.error_message == "AUTH_RESTART") { - var dcId = int.Parse(ex.Message[13..]); - client = await GetClientForDC(dcId, true); - fileBase = await client.Upload_GetFile(fileLocation, offset, FilePartSize); + _session.UserId = 0; // force a full login authorization flow, next time + lock (_session) _session.Save(); } - catch (RpcException ex) when (ex.Code == 400 && ex.Message == "OFFSET_INVALID") - { - abort = true; - return 0; - } - catch (Exception) - { - abort = true; - throw; - } - finally - { - _parallelTransfers.Release(); - } - if (fileBase is not Upload_File fileData) - throw new ApplicationException("Upload_GetFile returned unsupported " + fileBase?.GetType().Name); - if (fileData.bytes.Length != FilePartSize) abort = true; - if (fileData.bytes.Length != 0) - { - fileType = fileData.type; - await writeSem.WaitAsync(); - try - { - if (streamStartPos + offset != outputStream.Position) // if we're about to write out of order - { - await outputStream.FlushAsync(); // async flush, otherwise Seek would do a sync flush - outputStream.Seek(streamStartPos + offset, SeekOrigin.Begin); - } - await outputStream.WriteAsync(fileData.bytes, 0, fileData.bytes.Length); - maxOffsetSeen = Math.Max(maxOffsetSeen, offset + fileData.bytes.Length); - transmitted += fileData.bytes.Length; - } - catch (Exception) - { - abort = true; - throw; - } - finally - { - writeSem.Release(); - progress?.Invoke(transmitted, fileSize); - } - } - lock (tasks) tasks.Remove(offset); - return fileData.bytes.Length; - } - } - Task[] remainingTasks; - lock (tasks) remainingTasks = tasks.Values.ToArray(); - await Task.WhenAll(remainingTasks); // wait completion and eventually propagate any task exception - await outputStream.FlushAsync(); - outputStream.Seek(streamStartPos + maxOffsetSeen, SeekOrigin.Begin); - return fileType; - } - - /// Download the profile photo for a given peer into the outputStream - /// User, Chat or Channel - /// Stream to write the file content to. This method does not close/dispose the stream - /// Whether to download the high-quality version of the picture - /// Whether to extract the embedded very low-res thumbnail (synchronous, no actual download needed) - /// The file type of the photo, or 0 if no photo available - public async Task DownloadProfilePhotoAsync(IPeerInfo peer, Stream outputStream, bool big = false, bool miniThumb = false) - { - int dc_id; - long photo_id; - byte[] stripped_thumb; - switch (peer) - { - case User user: - if (user.photo == null) return 0; - dc_id = user.photo.dc_id; - photo_id = user.photo.photo_id; - stripped_thumb = user.photo.stripped_thumb; - break; - case ChatBase { Photo: var photo }: - if (photo == null) return 0; - dc_id = photo.dc_id; - photo_id = photo.photo_id; - stripped_thumb = photo.stripped_thumb; - break; + throw new RpcException(rpcError.error_code, rpcError.error_message); + case ReactorError: + goto retry; default: - return 0; - } - if (miniThumb && !big) - return InflateStrippedThumb(outputStream, stripped_thumb) ? Storage_FileType.jpeg : 0; - var fileLocation = new InputPeerPhotoFileLocation { peer = peer.ToInputPeer(), photo_id = photo_id }; - if (big) fileLocation.flags |= InputPeerPhotoFileLocation.Flags.big; - return await DownloadFileAsync(fileLocation, outputStream, dc_id); - } - - private static bool InflateStrippedThumb(Stream outputStream, byte[] stripped_thumb) - { - if (stripped_thumb == null || stripped_thumb.Length <= 3 || stripped_thumb[0] != 1) - return false; - var header = Helpers.StrippedThumbJPG; - outputStream.Write(header, 0, 164); - outputStream.WriteByte(stripped_thumb[1]); - outputStream.WriteByte(0); - outputStream.WriteByte(stripped_thumb[2]); - outputStream.Write(header, 167, header.Length - 167); - outputStream.Write(stripped_thumb, 3, stripped_thumb.Length - 3); - outputStream.WriteByte(0xff); - outputStream.WriteByte(0xd9); - return true; - } - - /// Returns the current user dialog list. Possible codes: 400 (details) - /// Peer folder ID, for more info click here - /// See - public async Task Messages_GetAllDialogs(int? folder_id = null) - { - var dialogs = await this.Messages_GetDialogs(folder_id: folder_id); - switch (dialogs) - { - case Messages_DialogsSlice mds: - var dialogList = new List(); - var messageList = new List(); - while (dialogs.Dialogs.Length != 0) - { - dialogList.AddRange(dialogs.Dialogs); - messageList.AddRange(dialogs.Messages); - var lastDialog = dialogs.Dialogs[^1]; - var lastMsg = dialogs.Messages.LastOrDefault(m => m.Peer.ID == lastDialog.Peer.ID && m.ID == lastDialog.TopMessage); - var offsetPeer = dialogs.UserOrChat(lastDialog).ToInputPeer(); - dialogs = await this.Messages_GetDialogs(lastMsg?.Date ?? default, lastDialog.TopMessage, offsetPeer, folder_id: folder_id); - if (dialogs is not Messages_Dialogs md) break; - foreach (var (key, value) in md.chats) mds.chats[key] = value; - foreach (var (key, value) in md.users) mds.users[key] = value; - } - mds.dialogs = dialogList.ToArray(); - mds.messages = messageList.ToArray(); - return mds; - case Messages_Dialogs md: return md; - default: throw new ApplicationException("Messages_GetDialogs returned unexpected " + dialogs?.GetType().Name); + throw new ApplicationException($"{query.GetType().Name} call got a result of type {result.GetType().Name} instead of {typeof(X).Name}"); } } - - /// Helper method that tries to fetch all participants from a Channel (beyond Telegram server-side limitations) - /// The channel to query - /// Also fetch the kicked/banned members? - /// first letters used to search for in participants names
(default values crafted with ♥ to find most latin and cyrillic names) - /// second (and further) letters used to search for in participants names - /// Can be used to abort the work of this method - /// Field count indicates the total count of members. Field participants contains those that were successfully fetched - /// ⚠ This method can take a few minutes to complete on big broadcast channels. It likely won't be able to obtain the full total count of members - public async Task Channels_GetAllParticipants(InputChannelBase channel, bool includeKickBan = false, string alphabet1 = "АБCДЕЄЖФГHИІJКЛМНОПQРСТУВWХЦЧШЩЫЮЯЗ", string alphabet2 = "АCЕHИJЛМНОРСТУВWЫ", CancellationToken cancellationToken = default) - { - alphabet2 ??= alphabet1; - var result = new Channels_ChannelParticipants { chats = new(), users = new() }; - var user_ids = new HashSet(); - var participants = new List(); - - var mcf = await this.Channels_GetFullChannel(channel); - result.count = mcf.full_chat.ParticipantsCount; - if (result.count > 2000 && ((Channel)mcf.chats[channel.ChannelId]).IsChannel) - Helpers.Log(2, "Fetching all participants on a big channel can take several minutes..."); - await GetWithFilter(new ChannelParticipantsAdmins()); - await GetWithFilter(new ChannelParticipantsBots()); - await GetWithFilter(new ChannelParticipantsSearch { q = "" }, (f, c) => new ChannelParticipantsSearch { q = f.q + c }, alphabet1); - if (includeKickBan) - { - await GetWithFilter(new ChannelParticipantsKicked { q = "" }, (f, c) => new ChannelParticipantsKicked { q = f.q + c }, alphabet1); - await GetWithFilter(new ChannelParticipantsBanned { q = "" }, (f, c) => new ChannelParticipantsBanned { q = f.q + c }, alphabet1); - } - result.participants = participants.ToArray(); - return result; - - async Task GetWithFilter(T filter, Func recurse = null, string alphabet = null) where T : ChannelParticipantsFilter - { - Channels_ChannelParticipants ccp; - int maxCount = 0; - for (int offset = 0; ;) - { - cancellationToken.ThrowIfCancellationRequested(); - ccp = await this.Channels_GetParticipants(channel, filter, offset, 1024, 0); - if (ccp.count > maxCount) maxCount = ccp.count; - foreach (var kvp in ccp.chats) result.chats[kvp.Key] = kvp.Value; - foreach (var kvp in ccp.users) result.users[kvp.Key] = kvp.Value; - lock (participants) - foreach (var participant in ccp.participants) - if (user_ids.Add(participant.UserID)) - participants.Add(participant); - offset += ccp.participants.Length; - if (offset >= ccp.count || ccp.participants.Length == 0) break; - } - Helpers.Log(0, $"GetParticipants({(filter as ChannelParticipantsSearch)?.q}) returned {ccp.count}/{maxCount}.\tAccumulated count: {participants.Count}"); - if (recurse != null && (ccp.count < maxCount - 100 || ccp.count == 200 || ccp.count == 1000)) - foreach (var c in alphabet) - await GetWithFilter(recurse(filter, c), recurse, c == 'А' ? alphabet : alphabet2); - } - } - - public Task AddChatUser(InputPeer peer, InputUserBase user, int fwd_limit = int.MaxValue) => peer switch - { - InputPeerChat chat => this.Messages_AddChatUser(chat.chat_id, user, fwd_limit), - InputPeerChannel channel => this.Channels_InviteToChannel(channel, new[] { user }), - _ => throw new ArgumentException("This method works on Chat & Channel only"), - }; - - public Task DeleteChatUser(InputPeer peer, InputUser user, bool revoke_history = true) => peer switch - { - InputPeerChat chat => this.Messages_DeleteChatUser(chat.chat_id, user, revoke_history), - InputPeerChannel channel => this.Channels_EditBanned(channel, user, new ChatBannedRights { flags = ChatBannedRights.Flags.view_messages }), - _ => throw new ArgumentException("This method works on Chat & Channel only"), - }; - - public Task LeaveChat(InputPeer peer, bool revoke_history = true) => peer switch - { - InputPeerChat chat => this.Messages_DeleteChatUser(chat.chat_id, InputUser.Self, revoke_history), - InputPeerChannel channel => this.Channels_LeaveChannel(channel), - _ => throw new ArgumentException("This method works on Chat & Channel only"), - }; - - public async Task EditChatAdmin(InputPeer peer, InputUserBase user, bool is_admin) - { - switch (peer) - { - case InputPeerChat chat: - await this.Messages_EditChatAdmin(chat.chat_id, user, is_admin); - return new Updates { date = DateTime.UtcNow, users = new(), updates = Array.Empty(), - chats = (await this.Messages_GetChats(new[] { chat.chat_id })).chats }; - case InputPeerChannel channel: - return await this.Channels_EditAdmin(channel, user, - new ChatAdminRights { flags = is_admin ? (ChatAdminRights.Flags)0x8BF : 0 }, null); - default: - throw new ArgumentException("This method works on Chat & Channel only"); - } - } - - public Task EditChatPhoto(InputPeer peer, InputChatPhotoBase photo) => peer switch - { - InputPeerChat chat => this.Messages_EditChatPhoto(chat.chat_id, photo), - InputPeerChannel channel => this.Channels_EditPhoto(channel, photo), - _ => throw new ArgumentException("This method works on Chat & Channel only"), - }; - - public Task EditChatTitle(InputPeer peer, string title) => peer switch - { - InputPeerChat chat => this.Messages_EditChatTitle(chat.chat_id, title), - InputPeerChannel channel => this.Channels_EditTitle(channel, title), - _ => throw new ArgumentException("This method works on Chat & Channel only"), - }; - - public Task GetFullChat(InputPeer peer) => peer switch - { - InputPeerChat chat => this.Messages_GetFullChat(chat.chat_id), - InputPeerChannel channel => this.Channels_GetFullChannel(channel), - _ => throw new ArgumentException("This method works on Chat & Channel only"), - }; - - public async Task DeleteChat(InputPeer peer) - { - switch (peer) - { - case InputPeerChat chat: - await this.Messages_DeleteChat(chat.chat_id); - return new Updates { date = DateTime.UtcNow, users = new(), updates = Array.Empty(), - chats = (await this.Messages_GetChats(new[] { chat.chat_id })).chats }; - case InputPeerChannel channel: - return await this.Channels_DeleteChannel(channel); - default: - throw new ArgumentException("This method works on Chat & Channel only"); - } - } - #endregion } }