Introducing the UpdateManager to streamline the handling of continuous updates (see FAQ)

This commit is contained in:
Wizou 2024-03-30 17:09:54 +01:00
parent 3d224afb23
commit 210a3365e5
8 changed files with 620 additions and 47 deletions

View file

@ -95,7 +95,7 @@ foreach (Dialog dialog in dialogs.dialogs)
Notes:
- The lists returned by Messages_GetAllDialogs contains the `access_hash` for those chats and users.
- See also the `Main` method in [Examples/Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L20).
- See also the `Main` method in [Examples/Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L18).
- To retrieve the dialog information about a specific [peer](README.md#terminology), use `client.Messages_GetPeerDialogs(inputPeer)`
<a name="list-chats"></a>
@ -114,7 +114,7 @@ Notes:
- The list returned by Messages_GetAllChats contains the `access_hash` for those chats. Read [FAQ #4](FAQ.md#access-hash) about this.
- If a basic chat group has been migrated to a supergroup, you may find both the old `Chat` and a `Channel` with different IDs in the `chats.chats` result,
but the old `Chat` will be marked with flag [deactivated] and should not be used anymore. See [Terminology in ReadMe](README.md#terminology).
- You can find a longer version of this method call in [Examples/Program_GetAllChats.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_GetAllChats.cs?ts=4#L32)
- You can find a longer version of this method call in [Examples/Program_GetAllChats.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_GetAllChats.cs?ts=4#L31)
<a name="list-members"></a>
## List the members from a chat
@ -187,17 +187,17 @@ Notes:
<a name="updates"></a>
## Monitor all Telegram events happening for the user
This is done through the `client.OnUpdates` callback event.
Your event handler implementation can either return `Task.CompletedTask` or be an `async Task` method.
This is done through the `client.OnUpdates` callback event, or via the [UpdateManager class](FAQ.md#manager) that simplifies the handling of updates.
See [Examples/Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L23).
See [Examples/Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L21).
<a name="monitor-msg"></a>
## Monitor new messages being posted in chats in real-time
You have to handle `client.OnUpdates` events containing an `UpdateNewMessage`.
You have to handle update events containing an `UpdateNewMessage`.
This can be done through the `client.OnUpdates` callback event, or via the [UpdateManager class](FAQ.md#manager) that simplifies the handling of updates.
See the `HandleMessage` method in [Examples/Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L23).
See the `HandleMessage` method in [Examples/Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L21).
You can filter specific chats the message are posted in, by looking at the `Message.peer_id` field.
See also [explanation below](#message-user) to extract user/chat info from messages.
@ -208,7 +208,7 @@ See also [explanation below](#message-user) to extract user/chat info from messa
This is done using the helper method `client.DownloadFileAsync(file, outputStream)`
that simplifies the download of a photo/document/file once you get a reference to its location *(through updates or API calls)*.
See [Examples/Program_DownloadSavedMedia.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_DownloadSavedMedia.cs?ts=4#L31) that download all media files you forward to yourself (Saved Messages)
See [Examples/Program_DownloadSavedMedia.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_DownloadSavedMedia.cs?ts=4#L28) that download all media files you forward to yourself (Saved Messages)
<a name="upload"></a>
## Upload a media file and post it with caption to a chat
@ -498,6 +498,8 @@ A message contains those two fields/properties:
These two fields derive from class `Peer` and can be of type `PeerChat`, `PeerChannel` or `PeerUser` depending on the nature of WHERE & WHO
(private chat with a user? message posted BY a channel IN a chat? ...)
> ✳️ It is recommended that you use the [UpdateManager class](FAQ.md#manager), as it handles automatically all of the details below, and you just need to use `Manager.UserOrChat(peer)` or Manager.Users/Chats dictionaries
The root structure where you obtained the message (typically `UpdatesBase` or `Messages_MessagesBase`) inherits from `IPeerResolver`.
This allows you to call `.UserOrChat(peer)` on the root structure, in order to resolve those fields into a `User` class, or a `ChatBase`-derived class
(typically `Chat` or `Channel`) which will give you details about the peer, instead of just the ID.
@ -506,7 +508,7 @@ However, in some case _(typically when dealing with updates)_, Telegram might ch
because it expects you to already know about it (`UserOrChat` returns `null`).
That's why you should collect users/chats details each time you're dealing with Updates or other API results inheriting from `IPeerResolver`,
and use the collected dictionaries to find details about users/chats
([see previous section](#collect-users-chats) and [Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L23) example)
([see previous section](#collect-users-chats) and [Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L21) example)
And finally, it may happen that you receive updates of type `UpdateShortMessage` or `UpdateShortChatMessage` with totally unknown peers (even in your collected dictionaries).
In this case, [Telegram recommends](https://core.telegram.org/api/updates#recovering-gaps) that you use the [`Updates_GetDifference`](https://corefork.telegram.org/method/updates.getDifference) method to retrieve the full information associated with the short message.

View file

@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TL;
@ -8,9 +7,8 @@ namespace WTelegramClientTest
static class Program_ListenUpdates
{
static WTelegram.Client Client;
static WTelegram.UpdateManager Manager;
static User My;
static readonly Dictionary<long, User> Users = [];
static readonly Dictionary<long, ChatBase> Chats = [];
// go to Project Properties > Debug > Environment variables and add at least these: api_id, api_hash, phone_number
static async Task Main(string[] _)
@ -20,43 +18,37 @@ namespace WTelegramClientTest
Client = new WTelegram.Client(Environment.GetEnvironmentVariable);
using (Client)
{
Client.OnUpdates += Client_OnUpdates;
Manager = Client.WithUpdateManager(Client_OnUpdate/*, "Updates.state"*/);
My = await Client.LoginUserIfNeeded();
Users[My.id] = My;
// Note: on login, Telegram may sends a bunch of updates/messages that happened in the past and were not acknowledged
Console.WriteLine($"We are logged-in as {My.username ?? My.first_name + " " + My.last_name} (id {My.id})");
// We collect all infos about the users/chats so that updates can be printed with their names
var dialogs = await Client.Messages_GetAllDialogs(); // dialogs = groups/channels/users
dialogs.CollectUsersChats(Users, Chats);
dialogs.CollectUsersChats(Manager.Users, Manager.Chats);
Console.ReadKey();
}
//Manager.SaveState("Updates.state"); // if you want to resume missed updates on the next run (see WithUpdateManager above)
}
// if not using async/await, we could just return Task.CompletedTask
private static async Task Client_OnUpdates(UpdatesBase updates)
private static async Task Client_OnUpdate(Update update)
{
updates.CollectUsersChats(Users, Chats);
if (updates is UpdateShortMessage usm && !Users.ContainsKey(usm.user_id))
(await Client.Updates_GetDifference(usm.pts - usm.pts_count, usm.date, 0)).CollectUsersChats(Users, Chats);
else if (updates is UpdateShortChatMessage uscm && (!Users.ContainsKey(uscm.from_id) || !Chats.ContainsKey(uscm.chat_id)))
(await Client.Updates_GetDifference(uscm.pts - uscm.pts_count, uscm.date, 0)).CollectUsersChats(Users, Chats);
foreach (var update in updates.UpdateList)
switch (update)
{
case UpdateNewMessage unm: await HandleMessage(unm.message); break;
case UpdateEditMessage uem: await HandleMessage(uem.message, true); break;
// Note: UpdateNewChannelMessage and UpdateEditChannelMessage are also handled by above cases
case UpdateDeleteChannelMessages udcm: Console.WriteLine($"{udcm.messages.Length} message(s) deleted in {Chat(udcm.channel_id)}"); break;
case UpdateDeleteMessages udm: Console.WriteLine($"{udm.messages.Length} message(s) deleted"); break;
case UpdateUserTyping uut: Console.WriteLine($"{User(uut.user_id)} is {uut.action}"); break;
case UpdateChatUserTyping ucut: Console.WriteLine($"{Peer(ucut.from_id)} is {ucut.action} in {Chat(ucut.chat_id)}"); break;
case UpdateChannelUserTyping ucut2: Console.WriteLine($"{Peer(ucut2.from_id)} is {ucut2.action} in {Chat(ucut2.channel_id)}"); break;
case UpdateChatParticipants { participants: ChatParticipants cp }: Console.WriteLine($"{cp.participants.Length} participants in {Chat(cp.chat_id)}"); break;
case UpdateUserStatus uus: Console.WriteLine($"{User(uus.user_id)} is now {uus.status.GetType().Name[10..]}"); break;
case UpdateUserName uun: Console.WriteLine($"{User(uun.user_id)} has changed profile name: {uun.first_name} {uun.last_name}"); break;
case UpdateUser uu: Console.WriteLine($"{User(uu.user_id)} has changed infos/photo"); break;
default: Console.WriteLine(update.GetType().Name); break; // there are much more update types than the above example cases
}
switch (update)
{
case UpdateNewMessage unm: await HandleMessage(unm.message); break;
case UpdateEditMessage uem: await HandleMessage(uem.message, true); break;
// Note: UpdateNewChannelMessage and UpdateEditChannelMessage are also handled by above cases
case UpdateDeleteChannelMessages udcm: Console.WriteLine($"{udcm.messages.Length} message(s) deleted in {Chat(udcm.channel_id)}"); break;
case UpdateDeleteMessages udm: Console.WriteLine($"{udm.messages.Length} message(s) deleted"); break;
case UpdateUserTyping uut: Console.WriteLine($"{User(uut.user_id)} is {uut.action}"); break;
case UpdateChatUserTyping ucut: Console.WriteLine($"{Peer(ucut.from_id)} is {ucut.action} in {Chat(ucut.chat_id)}"); break;
case UpdateChannelUserTyping ucut2: Console.WriteLine($"{Peer(ucut2.from_id)} is {ucut2.action} in {Chat(ucut2.channel_id)}"); break;
case UpdateChatParticipants { participants: ChatParticipants cp }: Console.WriteLine($"{cp.participants.Length} participants in {Chat(cp.chat_id)}"); break;
case UpdateUserStatus uus: Console.WriteLine($"{User(uus.user_id)} is now {uus.status.GetType().Name[10..]}"); break;
case UpdateUserName uun: Console.WriteLine($"{User(uun.user_id)} has changed profile name: {uun.first_name} {uun.last_name}"); break;
case UpdateUser uu: Console.WriteLine($"{User(uu.user_id)} has changed infos/photo"); break;
default: Console.WriteLine(update.GetType().Name); break; // there are much more update types than the above example cases
}
}
// in this example method, we're not using async/await, so we just return Task.CompletedTask
@ -71,9 +63,8 @@ namespace WTelegramClientTest
return Task.CompletedTask;
}
private static string User(long id) => Users.TryGetValue(id, out var user) ? user.ToString() : $"User {id}";
private static string Chat(long id) => Chats.TryGetValue(id, out var chat) ? chat.ToString() : $"Chat {id}";
private static string Peer(Peer peer) => peer is null ? null : peer is PeerUser user ? User(user.user_id)
: peer is PeerChat or PeerChannel ? Chat(peer.ID) : $"Peer {peer.ID}";
private static string User(long id) => Manager.Users.TryGetValue(id, out var user) ? user.ToString() : $"User {id}";
private static string Chat(long id) => Manager.Chats.TryGetValue(id, out var chat) ? chat.ToString() : $"Chat {id}";
private static string Peer(Peer peer) => Manager.UserOrChat(peer)?.ToString() ?? $"Peer {peer?.ID}";
}
}

32
FAQ.md
View file

@ -329,3 +329,35 @@ For a console program, this is typical done by waiting for a key or some close e
5) Is every Telegram API call rejected? (typically with an exception message like `AUTH_RESTART`)
The user authentification might have failed at some point (or the user revoked the authorization).
It is therefore necessary to go through the authentification again. This can be done by deleting the WTelegram.session file, or at runtime by calling `client.Reset()`
<a name="manager"></a>
# About the UpdateManager
The UpdateManager does the following:
- ensure the correct sequential order of receiving updates (Telegram may send them in wrong order)
- fetch the missing updates if there was a gap (missing update) in the flow of incoming updates
- resume the flow of updates where you left off if you stopped your program (with saved state)
- collect the users & chats from updates automatically for you _(by default)_
- simplifies the handling of the various containers of update (UpdatesBase)
To use the UpdateManager, instead of setting `client.OnUpdates`, you call:
```csharp
// if you don't care about missed updates while your program was down:
var manager = client.WithUpdateManager(OnUpdate);
// if you want to recover missed updates using the state saved on the last run of your program
var manager = client.WithUpdateManager(OnUpdate, "Updates.state");
// to save the state later, preferably after disposing the client:
manager.SaveState("Updates.state")
// (WithUpdateManager has other parameters for advanced use)
```
Your `OnUpdate` method will directly take a single `Update` as parameter, instead of a container of updates.
The `manager.Users` and `manager.Chats` dictionaries will collect the users/chats data from updates.
You can also feed them manually from result of your API calls by calling `result.CollectUsersChats(manager.Users, manager.Chats);` and resolve Peer fields via `manager.UserOrChat(peer)`
See [Examples/Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L21) for an example of implementation.
Notes:
- set `manager.Log` if you want different logger settings than the client
- `WithUpdateManager()` has other parameters for advanced use

View file

@ -164,8 +164,9 @@ See [FAQ #4](https://wiz0u.github.io/WTelegramClient/FAQ#access-hash) to learn m
# Other things to know
The Client class also offers `OnUpdates` and `OnOther` events that are triggered when Telegram servers sends Updates (like new messages or status) or other notifications, independently of your API requests.
See [Examples/Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L23) and [Examples/Program_ReactorError.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ReactorError.cs?ts=4#L32)
The Client class offers `OnUpdates` and `OnOther` events that are triggered when Telegram servers sends Updates (like new messages or status) or other notifications, independently of your API requests.
You can also use the [UpdateManager class](https://wiz0u.github.io/WTelegramClient/FAQ#manager) to simplify the handling of such updates.
See [Examples/Program_ListenUpdates.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L21) and [Examples/Program_ReactorError.cs](https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ReactorError.cs?ts=4#L30)
An invalid API request can result in a `RpcException` being raised, reflecting the [error code and status text](https://revgram.github.io/errors.html) of the problem.

View file

@ -23,9 +23,9 @@ namespace WTelegram
public partial class Client : IDisposable
{
/// <summary>This event will be called when unsollicited updates/messages are sent by Telegram servers</summary>
/// <remarks>Make your handler <see langword="async"/>, or return <see cref="Task.CompletedTask"/> or <see langword="null"/><br/>See <see href="https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L23">Examples/Program_ListenUpdate.cs</see> for how to use this</remarks>
/// <remarks>Make your handler <see langword="async"/>, or return <see cref="Task.CompletedTask"/> or <see langword="null"/><br/>See <see href="https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ReactorError.cs?ts=4#L30">Examples/Program_ReactorError.cs</see> for how to use this<br/>or <see href="https://github.com/wiz0u/WTelegramClient/blob/master/Examples/Program_ListenUpdates.cs?ts=4#L21">Examples/Program_ListenUpdate.cs</see> using the UpdateManager class instead</remarks>
public event Func<UpdatesBase, Task> OnUpdates;
[Obsolete("This event was renamed OnUpdates (plural)")]
[Obsolete("This event was renamed OnUpdates (plural). You may also want to consider using our new UpdateManager class instead (see FAQ)")]
public event Func<UpdatesBase, Task> OnUpdate { add { OnUpdates += value; } remove { OnUpdates -= value; } }
/// <summary>This event is called for other types of notifications (login states, reactor errors, ...)</summary>
public event Func<IObject, Task> OnOther;
@ -352,7 +352,6 @@ namespace WTelegram
_pendingRpcs.Clear();
_bareRpc = null;
}
// TODO: implement an Updates gaps handling system? https://core.telegram.org/api/updates
if (IsMainDC)
{
var updatesState = await this.Updates_GetState(); // this call reenables incoming Updates

View file

@ -10,6 +10,9 @@ using System.Threading.Tasks;
#if NET8_0_OR_GREATER
[JsonSerializable(typeof(WTelegram.Session))]
[JsonSerializable(typeof(Dictionary<long, WTelegram.UpdateManager.MBoxState>))]
[JsonSerializable(typeof(IDictionary<long, WTelegram.UpdateManager.MBoxState>))]
[JsonSerializable(typeof(System.Collections.Immutable.ImmutableDictionary<long, WTelegram.UpdateManager.MBoxState>))]
internal partial class WTelegramContext : JsonSerializerContext { }
#endif

544
src/UpdateManager.cs Normal file
View file

@ -0,0 +1,544 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.ComponentModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using TL;
namespace WTelegram
{
public delegate IPeerInfo UserChatCollector(Dictionary<long, User> users, Dictionary<long, ChatBase> chats);
public class UpdateManager : IPeerResolver
{
/// <summary>Collected info about Users <i>(only if using the default collector)</i></summary>
public readonly Dictionary<long, User> Users;
/// <summary>Collected info about Chats <i>(only if using the default collector)</i></summary>
public readonly Dictionary<long, ChatBase> Chats;
/// <summary>Timout to detect lack of updates and force refetch them</summary>
public TimeSpan InactivityThreshold { get; set; } = TimeSpan.FromMinutes(15);
/// <summary>Logging callback (defaults to WTelegram.Helpers.Log ; can be null for performance)</summary>
public Action<int, string> Log { get; set; } = Helpers.Log;
/// <summary>Current set of update states (for saving and later resume)</summary>
public ImmutableDictionary<long, MBoxState> State
{
get
{
_sem.Wait();
try { return _local.ToImmutableDictionary(); }
finally { _sem.Release(); }
}
}
[System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006")]
public sealed class MBoxState { public int pts { get; set; } public long access_hash { get; set; } }
private readonly Client _client;
private readonly Func<Update, Task> _onUpdate;
private readonly UserChatCollector _onCollect;
private readonly bool _reentrant;
private readonly SemaphoreSlim _sem = new(1);
private readonly Extensions.CollectorPeer _collector;
private readonly List<(Update update, UpdatesBase updates, bool own, DateTime stamp)> _pending = [];
private readonly Dictionary<long, MBoxState> _local; // -2 for seq/date, -1 for qts, 0 for common pts, >0 for channel pts
private const int L_SEQ = -2, L_QTS = -1, L_PTS = 0;
private const long UndefinedSeqDate = 3155378975999999999L; // DateTime.MaxValue.Ticks
private static readonly TimeSpan HalfSec = new(TimeSpan.TicksPerSecond / 2);
private Task _recoveringGaps;
private DateTime _lastUpdateStamp = DateTime.UtcNow;
/// <summary>Manager ensuring that you receive Telegram updates in correct order, without missing any</summary>
/// <param name="client">the WTelegram Client to manage</param>
/// <param name="onUpdate">Event to be called on sequential individual update</param>
/// <param name="state">(optional) Resume session by recovering all updates that occured since this state</param>
/// <param name="collector">Custom users/chats collector. By default, those are collected in properties Users/Chats</param>
/// <param name="reentrant"><see langword="true"/> if your <paramref name="onUpdate"/> method can be called again even when last async call didn't return yet</param>
public UpdateManager(Client client, Func<Update, Task> onUpdate, IDictionary<long, MBoxState> state = null, UserChatCollector collector = null, bool reentrant = false)
{
_client = client;
_onUpdate = onUpdate;
if (collector != null)
_onCollect = collector;
else
{
_collector = new() { _users = Users = [], _chats = Chats = [] };
_onCollect = _collector.UserOrChat;
}
if (state == null)
_local = new() { [L_SEQ] = new() { access_hash = UndefinedSeqDate }, [L_QTS] = new(), [L_PTS] = new() };
else
_local = state as Dictionary<long, MBoxState> ?? new Dictionary<long, MBoxState>(state);
_reentrant = reentrant;
client.OnOther += OnOther;
client.OnUpdates += u => OnUpdates(u, false);
client.OnOwnUpdates += u => OnUpdates(u, true);
}
private async Task OnOther(IObject obj)
{
switch (obj)
{
case Pong when DateTime.UtcNow - _lastUpdateStamp > InactivityThreshold:
if (_local[L_PTS].pts != 0) await ResyncState();
break;
case User user when user.flags.HasFlag(User.Flags.self):
if (Users != null) Users[user.id] = user;
goto newSession;
case NewSessionCreated when _client.User != null:
newSession:
if (_local[L_PTS].pts != 0) await ResyncState();
else await ResyncState(await _client.Updates_GetState());
break;
case Updates_State state:
await ResyncState(state);
break;
}
}
private async Task ResyncState(Updates_State state = null)
{
state ??= new() { qts = int.MaxValue };
await _sem.WaitAsync();
try
{
var local = _local[L_PTS];
Log?.Invoke(2, $"Got Updates_State {local.pts}->{state.pts}, date={new DateTime(_local[L_SEQ].access_hash, DateTimeKind.Utc)}->{state.date}, seq={_local[L_SEQ].pts}->{state.seq}");
if (local.pts == 0 || local.pts >= state.pts && _local[L_SEQ].pts >= state.seq && _local[L_QTS].pts >= state.qts)
await HandleDifference(null, null, state, null);
else if (await GetDifference(L_PTS, state.pts, local))
await ApplyFilledGaps();
}
finally { _sem.Release(); }
}
private async Task OnUpdates(UpdatesBase updates, bool own)
{
RaiseCollect(updates.Users, updates.Chats);
await _sem.WaitAsync();
try
{
await HandleUpdates(updates, own);
}
finally { _sem.Release(); }
}
private async Task HandleUpdates(UpdatesBase updates, bool own)
{
var now = _lastUpdateStamp = DateTime.UtcNow;
var updateList = updates.UpdateList;
if (updates is UpdateShortSentMessage sent)
updateList = new[] { new UpdateNewMessage { pts = sent.pts, pts_count = sent.pts_count, message = new Message {
flags = (Message.Flags)sent.flags,
id = sent.id, date = sent.date, entities = sent.entities, media = sent.media, ttl_period = sent.ttl_period,
} } };
else if (Users != null)
if (updates is UpdateShortMessage usm && !Users.ContainsKey(usm.user_id))
(await _client.Updates_GetDifference(usm.pts - usm.pts_count, usm.date, 0)).UserOrChat(_collector);
else if (updates is UpdateShortChatMessage uscm && (!Users.ContainsKey(uscm.from_id) || !Chats.ContainsKey(uscm.chat_id)))
(await _client.Updates_GetDifference(uscm.pts - uscm.pts_count, uscm.date, 0)).UserOrChat(_collector);
bool ptsChanged = false, gotUPts = false;
int seq = 0;
try
{
if (updates is UpdatesTooLong)
{
var local_pts = _local[L_PTS];
ptsChanged = await GetDifference(L_PTS, local_pts.pts, local_pts);
return;
}
foreach (var update in updateList)
{
if (update == null) continue;
var (mbox_id, pts, pts_count) = update.GetMBox();
if (pts == 0) (mbox_id, pts, pts_count) = updates.GetMBox();
MBoxState local = null;
if (pts != 0)
{
local = _local.GetOrCreate(mbox_id);
if (mbox_id > 0 && local.access_hash == 0)
if (updates.Chats.TryGetValue(mbox_id, out var chat) && chat is Channel channel && !channel.flags.HasFlag(Channel.Flags.min))
local.access_hash = channel.access_hash;
var diff = local.pts + pts_count - pts;
if (diff > 0 && pts_count != 0) // the update was already applied, and must be ignored.
{
Log?.Invoke(1, $"({mbox_id,10}, {local.pts,6}+{pts_count}->{pts,-6}) {update,-30} ignored {ExtendedLog(update)}");
continue;
}
if (diff < 0) // there's an update gap that must be filled.
{
Log?.Invoke(1, $"({mbox_id,10}, {local.pts,6}+{pts_count}->{pts,-6}) {update,-30} pending {ExtendedLog(update)}");
_pending.Add((update, updates, own, now + HalfSec));
_recoveringGaps ??= Task.Delay(HalfSec).ContinueWith(RecoverGaps);
continue;
}
// the update can be applied.
}
Log?.Invoke(1, $"({mbox_id,10}, {local?.pts,6}+{pts_count}->{pts,-6}) {update,-30} applied {ExtendedLog(update)}");
if (mbox_id == L_SEQ && update is UpdatePtsChanged) gotUPts = true;
if (pts_count > 0 && pts != 0)
{
ptsChanged = true;
if (mbox_id == L_SEQ)
seq = pts;
else if (pts_count != 0)
local.pts = pts;
}
await RaiseUpdate(update, own);
}
}
finally
{
if (seq > 0) // update local_seq & date after the updates were applied
{
var local_seq = _local[L_SEQ];
local_seq.pts = seq;
local_seq.access_hash = updates.Date.Ticks;
}
if (gotUPts) ptsChanged = await GetDifference(L_PTS, _local[L_PTS].pts = 1, _local[L_PTS]);
if (ptsChanged) await ApplyFilledGaps();
}
}
private async Task<int> ApplyFilledGaps()
{
if (_pending.Count != 0) Log?.Invoke(2, $"Trying to apply {_pending.Count} pending updates after filled gaps");
int removed = 0;
for (int i = 0; i < _pending.Count; )
{
var (update, updates, own, _) = _pending[i];
var (mbox_id, pts, pts_count) = update.GetMBox();
if (pts == 0) (mbox_id, pts, pts_count) = updates.GetMBox();
var local = _local[mbox_id];
var diff = local.pts + pts_count - pts;
if (diff < 0)
++i; // there's still a gap, skip it
else
{
_pending.RemoveAt(i);
++removed;
if (diff > 0) // the update was already applied, remove & ignore
Log?.Invoke(1, $"({mbox_id,10}, {local.pts,6}+{pts_count}->{pts,-6}) {update,-30} obsolete {ExtendedLog(update)}");
else
{
Log?.Invoke(1, $"({mbox_id,10}, {local.pts,6}+{pts_count}->{pts,-6}) {update,-30} applied now {ExtendedLog(update)}");
// the update can be applied.
local.pts = pts;
if (mbox_id == L_SEQ) local.access_hash = updates.Date.Ticks;
await RaiseUpdate(update, own);
i = 0; // rescan pending updates from start
}
}
}
return removed;
}
private async Task RecoverGaps(Task _) // https://corefork.telegram.org/api/updates#recovering-gaps
{
await _sem.WaitAsync();
try
{
_recoveringGaps = null;
if (_pending.Count == 0) return;
Log?.Invoke(2, $"Trying to recover gaps for {_pending.Count} pending updates");
var now = DateTime.UtcNow;
while (_pending.Count != 0)
{
var (update, updates, own, stamp) = _pending[0];
if (stamp > now)
{
_recoveringGaps = Task.Delay(stamp - now).ContinueWith(RecoverGaps);
return;
}
var (mbox_id, pts, pts_count) = update.GetMBox();
if (pts == 0) (mbox_id, pts, pts_count) = updates.GetMBox();
var local = _local[mbox_id];
bool getDiffSuccess = false;
if (local.pts == 0)
Log?.Invoke(2, $"({mbox_id,10}, new +{pts_count}->{pts,-6}) {update,-30} First appearance of MBox {ExtendedLog(update)}");
else if (local.access_hash == -1) // no valid access_hash for this channel, so just raise this update
Log?.Invoke(3, $"({mbox_id,10}, {local.pts,6}+{pts_count}->{pts,-6}) {update,-30} No access_hash to recover {ExtendedLog(update)}");
else
{
Log?.Invoke(1, $"({mbox_id,10}, {local.pts,6}+{pts_count}->{pts,-6}) {update,-30} Calling GetDifference {ExtendedLog(update)}");
getDiffSuccess = await GetDifference(mbox_id, pts - pts_count, local);
}
if (!getDiffSuccess) // no getDiff => just raise received pending updates in order
{
local.pts = pts - pts_count;
for (int i = 1; i < _pending.Count; i++) // find lowest pending pts-pts_count for this mbox
{
var pending = _pending[i];
var mbox = pending.update.GetMBox();
if (mbox.pts == 0) mbox = pending.updates.GetMBox();
if (mbox.mbox_id == mbox_id) local.pts = Math.Min(local.pts, mbox.pts - mbox.pts_count);
}
}
if (await ApplyFilledGaps() == 0)
{
Log?.Invoke(3, $"({mbox_id,10}, {local.pts,6}+{pts_count}->{pts,-6}) {update,-30} forcibly removed!");
_pending.RemoveAt(0);
local.pts = pts;
await RaiseUpdate(update, own);
}
}
}
finally { _sem.Release(); }
}
private async Task<InputChannel> GetInputChannel(long channel_id, MBoxState local)
{
if (channel_id <= 0) return null;
if (local?.access_hash is not null and not 0)
return new InputChannel(channel_id, local.access_hash);
var inputChannel = new InputChannel(channel_id, 0);
try
{
var mc = await _client.Channels_GetChannels(inputChannel);
if (mc.chats.TryGetValue(channel_id, out var chat) && chat is Channel channel)
inputChannel.access_hash = channel.access_hash;
}
catch (Exception)
{
inputChannel.access_hash = -1; // no valid access_hash available
}
local ??= _local[channel_id] = new();
local.access_hash = inputChannel.access_hash;
return inputChannel;
}
private async Task<bool> GetDifference(long mbox_id, int expected_pts, MBoxState local)
{
try
{
moreDiffNeeded:
if (mbox_id <= 0)
{
Log?.Invoke(0, $"Local states {string.Join(" ", _local.Select(l => $"{l.Key}:{l.Value.pts}"))}");
var local_seq = _local[L_SEQ];
var diff = await _client.Updates_GetDifference(_local[L_PTS].pts, qts: _local[L_QTS].pts,
date: new DateTime(local_seq.access_hash, DateTimeKind.Utc));
Log?.Invoke(1, $"{diff.GetType().Name[8..]}: {diff.NewMessages.Length} msg, {diff.OtherUpdates.Length} upd, pts={diff.State?.pts}, date={diff.State?.date}, seq={diff.State?.seq}, msgIDs={string.Join(" ", diff.NewMessages.Select(m => m.ID))}");
switch (diff)
{
case Updates_Difference ud:
await HandleDifference(ud.new_messages, ud.new_encrypted_messages, ud.state,
new UpdatesCombined { updates = ud.other_updates, users = ud.users, chats = ud.chats,
date = ud.state.date, seq_start = local_seq.pts + 1, seq = ud.state.seq });
break;
case Updates_DifferenceSlice uds:
await HandleDifference(uds.new_messages, uds.new_encrypted_messages, uds.intermediate_state,
new UpdatesCombined { updates = uds.other_updates, users = uds.users, chats = uds.chats,
date = uds.intermediate_state.date, seq_start = local_seq.pts + 1, seq = uds.intermediate_state.seq });
goto moreDiffNeeded;
case Updates_DifferenceTooLong udtl:
_local[L_PTS].pts = udtl.pts;
goto moreDiffNeeded;
case Updates_DifferenceEmpty ude:
local_seq.pts = ude.seq;
local_seq.access_hash = ude.date.Ticks;
_lastUpdateStamp = DateTime.UtcNow;
break;
}
}
else
{
var channel = await GetInputChannel(mbox_id, local);
if (channel.access_hash == -1) return false;
try
{
var diff = await _client.Updates_GetChannelDifference(channel, null, local.pts);
Log?.Invoke(1, $"{diff.GetType().Name[8..]}({mbox_id}): {diff.NewMessages.Length} msg, {diff.OtherUpdates.Length} upd, pts={diff.Pts}, msgIDs={string.Join(" ", diff.NewMessages.Select(m => m.ID))}");
switch (diff)
{
case Updates_ChannelDifference ucd:
local.pts = ucd.pts;
await HandleDifference(ucd.new_messages, null, null,
new UpdatesCombined { updates = ucd.other_updates, users = ucd.users, chats = ucd.chats });
if (!ucd.flags.HasFlag(Updates_ChannelDifference.Flags.final)) goto moreDiffNeeded;
break;
case Updates_ChannelDifferenceTooLong ucdtl:
if (ucdtl.dialog is Dialog dialog) local.pts = dialog.pts;
await HandleDifference(ucdtl.messages, null, null,
new UpdatesCombined { updates = null, users = ucdtl.users, chats = ucdtl.chats });
break;
case Updates_ChannelDifferenceEmpty ucde:
local.pts = ucde.pts;
break;
}
}
catch (RpcException ex) when (ex.Message is "CHANNEL_PRIVATE" or "CHANNEL_INVALID")
{
local.access_hash = -1; // access_hash is no longer valid
throw;
}
}
return true;
}
catch (Exception ex)
{
Log?.Invoke(4, $"GetDifference({mbox_id}, {local.pts}->{expected_pts}) raised {ex}");
}
finally
{
if (local.pts < expected_pts) local.pts = expected_pts;
}
return false;
}
private async Task HandleDifference(MessageBase[] new_messages, EncryptedMessageBase[] enc_messages, Updates_State state, UpdatesCombined updates)
{
if (updates != null)
RaiseCollect(updates.users, updates.chats);
try
{
if (updates?.updates != null)
for (int i = 0; i < updates.updates.Length; i++)
{
var update = updates.updates[i];
if (update is UpdateMessageID or UpdateStoryID)
{
await RaiseUpdate(update, false);
updates.updates[i] = null;
}
}
if (new_messages?.Length > 0)
{
var update = state == null ? new UpdateNewChannelMessage() : new UpdateNewMessage() { pts = state.pts, pts_count = 1 };
foreach (var msg in new_messages)
{
update.message = msg;
await RaiseUpdate(update, false);
}
}
if (enc_messages?.Length > 0)
{
var update = new UpdateNewEncryptedMessage();
if (state != null) update.qts = state.qts;
foreach (var msg in enc_messages)
{
update.message = msg;
await RaiseUpdate(update, false);
}
}
if (updates?.updates != null)
await HandleUpdates(updates, false);
}
finally
{
if (state != null)
{
_local[L_PTS].pts = state.pts;
_local[L_QTS].pts = state.qts;
var local_seq = _local[L_SEQ];
local_seq.pts = state.seq;
local_seq.access_hash = state.date.Ticks;
}
}
}
private void RaiseCollect(Dictionary<long, User> users, Dictionary<long, ChatBase> chats)
{
try
{
foreach (var chat in chats.Values)
if (chat is Channel channel && !channel.flags.HasFlag(Channel.Flags.min))
if (_local.TryGetValue(channel.id, out var local))
local.access_hash = channel.access_hash;
_onCollect(users, chats);
}
catch (Exception ex)
{
Log?.Invoke(4, $"onCollect({users?.Count},{chats?.Count}) raised {ex}");
}
}
private async Task RaiseUpdate(Update update, bool own)
{
if (own) return;
try
{
var task = _onUpdate(update);
if (!_reentrant) await task;
}
catch (Exception ex)
{
Log?.Invoke(4, $"onUpdate({update?.GetType().Name}) raised {ex}");
}
}
private static string ExtendedLog(Update update) => update switch
{
UpdateNewMessage unm => $"| msgID={unm.message.ID}",
UpdateEditMessage uem => $"| msgID={uem.message.ID}",
UpdateDeleteMessages udm => $"| count={udm.messages.Length}",
_ => null
};
/// <summary>Load latest dialogs states, checking for missing updates</summary>
/// <param name="dialogs">structure returned by Messages_Get*Dialogs calls</param>
/// <param name="fullLoadNewChans">Dangerous! Load full history of unknown new channels as updates</param>
public async Task LoadDialogs(Messages_Dialogs dialogs, bool fullLoadNewChans = false)
{
await _sem.WaitAsync();
try
{
foreach (var dialog in dialogs.dialogs.OfType<Dialog>())
{
if (dialog.peer is not PeerChannel pc) continue;
var local = _local.GetOrCreate(pc.channel_id);
if (dialogs.chats.TryGetValue(pc.channel_id, out var chat) && chat is Channel channel)
local.access_hash = channel.access_hash;
if (local.pts is 0)
if (fullLoadNewChans) local.pts = 1;
else local.pts = dialog.pts;
if (local.pts < dialog.pts)
{
Log?.Invoke(1, $"LoadDialogs {pc.channel_id} has {local.pts} < {dialog.pts} ({dialog.folder_id})");
await GetDifference(pc.channel_id, dialog.pts, local);
}
}
}
finally { _sem.Release(); }
}
/// <summary>Save the current state of the manager to JSON file</summary>
/// <param name="statePath">File path to write</param>
/// <remarks>Note: This does not save the the content of collected Users/Chats dictionaries</remarks>
public void SaveState(string statePath)
=> System.IO.File.WriteAllText(statePath, System.Text.Json.JsonSerializer.Serialize(State, Helpers.JsonOptions));
public static Dictionary<long, MBoxState> LoadState(string statePath) => !System.IO.File.Exists(statePath) ? null
: System.Text.Json.JsonSerializer.Deserialize<Dictionary<long, MBoxState>>(System.IO.File.ReadAllText(statePath), Helpers.JsonOptions);
/// <summary>returns a <see cref="User"/> or <see cref="ChatBase"/> for the given Peer</summary>
public IPeerInfo UserOrChat(Peer peer) => peer?.UserOrChat(Users, Chats);
}
}
namespace TL
{
using WTelegram;
[EditorBrowsable(EditorBrowsableState.Never)]
public static class UpdateManagerExtensions
{
/// <summary>Manager ensuring that you receive Telegram updates in correct order, without missing any</summary>
/// <param name="onUpdate">Event to be called on sequential individual update</param>
/// <param name="statePath">Resume session by recovering all updates that occured since the state saved in this file</param>
/// <param name="collector">Custom users/chats collector. By default, those are collected in properties Users/Chats</param>
/// <param name="reentrant"><see langword="true"/> if your <paramref name="onUpdate"/> method can be called again even when last async call didn't return yet</param>
public static UpdateManager WithUpdateManager(this Client client, Func<TL.Update, Task> onUpdate, string statePath, UserChatCollector collector = null, bool reentrant = false)
=> new(client, onUpdate, UpdateManager.LoadState(statePath), collector, reentrant);
/// <summary>Manager ensuring that you receive Telegram updates in correct order, without missing any</summary>
/// <param name="onUpdate">Event to be called on sequential individual update</param>
/// <param name="state">(optional) Resume session by recovering all updates that occured since this state</param>
/// <param name="collector">Custom users/chats collector. By default, those are collected in properties Users/Chats</param>
/// <param name="reentrant"><see langword="true"/> if your <paramref name="onUpdate"/> method can be called again even when last async call didn't return yet</param>
public static UpdateManager WithUpdateManager(this Client client, Func<TL.Update, Task> onUpdate, IDictionary<long, UpdateManager.MBoxState> state = null, UserChatCollector collector = null, bool reentrant = false)
=> new(client, onUpdate, state, collector, reentrant);
}
}

View file

@ -51,6 +51,7 @@
<PackageReference Include="IndexRange" Version="1.0.2" />
<PackageReference Include="System.Memory" Version="4.5.5" />
<PackageReference Include="System.Text.Json" Version="6.0.5" />
<PackageReference Include="System.Collections.Immutable" Version="6.0.0" />
</ItemGroup>
</Project>