FloodRetryThreshold, ProgressCallback, don't call Updates_GetState on alt DCs upon reconnection

This commit is contained in:
Wizou 2021-11-12 20:50:39 +01:00
parent c0b10e82f4
commit 7ed21d5af4

View file

@ -31,10 +31,15 @@ namespace WTelegram
public Config TLConfig { get; private set; }
/// <summary>Number of automatic reconnections on connection/reactor failure</summary>
public int MaxAutoReconnects { get; set; } = 5;
/// <summary>Number of seconds under which an error 420 FLOOD_WAIT_X will not be raised and your request will instead be auto-retried after the delay</summary>
public int FloodRetryThreshold { get; set; } = 60;
/// <summary>Is this Client instance the main or a secondary DC session</summary>
public bool IsMainDC => (_dcSession?.DataCenter?.id ?? 0) == _session.MainDC;
/// <summary>Is this Client currently disconnected?</summary>
public bool Disconnected => _tcpClient != null && !(_tcpClient.Client?.Connected ?? false);
/// <summary>Used to indicate progression of file download/upload</summary>
/// <param name="totalSize">total size of file in bytes, or 0 if unknown</param>
public delegate void ProgressCallback(long transmitted, long totalSize);
private readonly Func<string, string> _config;
private readonly int _apiId;
@ -421,8 +426,11 @@ namespace WTelegram
_bareRequest = 0;
}
// TODO: implement an Updates gaps handling system? https://core.telegram.org/api/updates
var udpatesState = await this.Updates_GetState(); // this call reenables incoming Updates
OnUpdate(udpatesState);
if (IsMainDC)
{
var udpatesState = await this.Updates_GetState(); // this call reenables incoming Updates
OnUpdate(udpatesState);
}
}
else
throw;
@ -772,7 +780,7 @@ namespace WTelegram
else if (rpcError.error_code == 420 && ((number = rpcError.error_message.IndexOf("_WAIT_")) > 0))
{
number = int.Parse(rpcError.error_message[(number + 6)..]);
if (number <= 60)
if (number <= FloodRetryThreshold)
{
await Task.Delay(number * 1000);
goto retry;
@ -1019,23 +1027,25 @@ namespace WTelegram
return user;
}
#region TL-Helpers
#region TL-Helpers
/// <summary>Helper function to upload a file to Telegram</summary>
/// <param name="pathname">Path to the file to upload</param>
/// <param name="progress">(optional) Callback for tracking the progression of the transfer</param>
/// <returns>an <see cref="InputFile"/> or <see cref="InputFileBig"/> than can be used in various requests</returns>
public Task<InputFileBase> UploadFileAsync(string pathname)
=> UploadFileAsync(File.OpenRead(pathname), Path.GetFileName(pathname));
public Task<InputFileBase> UploadFileAsync(string pathname, ProgressCallback progress = null)
=> UploadFileAsync(File.OpenRead(pathname), Path.GetFileName(pathname), progress);
/// <summary>Helper function to upload a file to Telegram</summary>
/// <param name="stream">Content of the file to upload</param>
/// <param name="filename">Name of the file</param>
/// <param name="progress">(optional) Callback for tracking the progression of the transfer</param>
/// <returns>an <see cref="InputFile"/> or <see cref="InputFileBig"/> than can be used in various requests</returns>
public async Task<InputFileBase> UploadFileAsync(Stream stream, string filename)
public async Task<InputFileBase> UploadFileAsync(Stream stream, string filename, ProgressCallback progress = null)
{
using var md5 = MD5.Create();
using (stream)
{
long length = stream.Length;
long transmitted = 0, length = stream.Length;
var isBig = length >= 10 * 1024 * 1024;
int file_total_parts = (int)((length - 1) / FilePartSize) + 1;
long file_id = Helpers.RandomLong();
@ -1047,11 +1057,11 @@ namespace WTelegram
var bytes = new byte[Math.Min(FilePartSize, bytesLeft)];
read = await FullReadAsync(stream, bytes, bytes.Length);
await _parallelTransfers.WaitAsync();
bytesLeft -= read;
var task = SavePart(file_part, bytes);
lock (tasks) tasks[file_part] = task;
if (!isBig)
md5.TransformBlock(bytes, 0, read, null, 0);
bytesLeft -= read;
if (read < FilePartSize && bytesLeft != 0) throw new ApplicationException($"Failed to fully read stream ({read},{bytesLeft})");
async Task SavePart(int file_part, byte[] bytes)
@ -1062,7 +1072,8 @@ namespace WTelegram
await this.Upload_SaveBigFilePart(file_id, file_part, file_total_parts, bytes);
else
await this.Upload_SaveFilePart(file_id, file_part, bytes);
lock (tasks) tasks.Remove(file_part);
lock (tasks) { transmitted += bytes.Length; tasks.Remove(file_part); }
progress?.Invoke(transmitted, length);
}
catch (Exception)
{
@ -1175,23 +1186,25 @@ namespace WTelegram
/// <param name="photo">The photo to download</param>
/// <param name="outputStream">Stream to write the file content to. This method does not close/dispose the stream</param>
/// <param name="photoSize">A specific size/version of the photo, or <see langword="null"/> to download the largest version of the photo</param>
/// <param name="progress">(optional) Callback for tracking the progression of the transfer</param>
/// <returns>The file type of the photo</returns>
public async Task<Storage_FileType> DownloadFileAsync(Photo photo, Stream outputStream, PhotoSizeBase photoSize = null)
public async Task<Storage_FileType> DownloadFileAsync(Photo photo, Stream outputStream, PhotoSizeBase photoSize = null, ProgressCallback progress = null)
{
photoSize ??= photo.LargestPhotoSize;
var fileLocation = photo.ToFileLocation(photoSize);
return await DownloadFileAsync(fileLocation, outputStream, photo.dc_id, photoSize.FileSize);
return await DownloadFileAsync(fileLocation, outputStream, photo.dc_id, photoSize.FileSize, progress);
}
/// <summary>Download a document from Telegram into the outputStream</summary>
/// <param name="document">The document to download</param>
/// <param name="outputStream">Stream to write the file content to. This method does not close/dispose the stream</param>
/// <param name="thumbSize">A specific size/version of the document thumbnail to download, or <see langword="null"/> to download the document itself</param>
/// <param name="progress">(optional) Callback for tracking the progression of the transfer</param>
/// <returns>MIME type of the document/thumbnail</returns>
public async Task<string> DownloadFileAsync(Document document, Stream outputStream, PhotoSizeBase thumbSize = null)
public async Task<string> DownloadFileAsync(Document document, Stream outputStream, PhotoSizeBase thumbSize = null, ProgressCallback progress = null)
{
var fileLocation = document.ToFileLocation(thumbSize);
var fileType = await DownloadFileAsync(fileLocation, outputStream, document.dc_id, thumbSize?.FileSize ?? document.size);
var fileType = await DownloadFileAsync(fileLocation, outputStream, document.dc_id, thumbSize?.FileSize ?? document.size, progress);
return thumbSize == null ? document.mime_type : "image/" + fileType;
}
@ -1200,14 +1213,16 @@ namespace WTelegram
/// <param name="outputStream">Stream to write file content to. This method does not close/dispose the stream</param>
/// <param name="fileDC">(optional) DC on which the file is stored</param>
/// <param name="fileSize">(optional) Expected file size</param>
/// <param name="progress">(optional) Callback for tracking the progression of the transfer</param>
/// <returns>The file type</returns>
public async Task<Storage_FileType> DownloadFileAsync(InputFileLocationBase fileLocation, Stream outputStream, int fileDC = 0, int fileSize = 0)
public async Task<Storage_FileType> DownloadFileAsync(InputFileLocationBase fileLocation, Stream outputStream, int fileDC = 0, int fileSize = 0, ProgressCallback progress = null)
{
Storage_FileType fileType = Storage_FileType.unknown;
var client = fileDC == 0 ? this : await GetClientForDC(fileDC, true);
using var writeSem = new SemaphoreSlim(1);
long streamStartPos = outputStream.Position;
int fileOffset = 0, maxOffsetSeen = 0;
long transmitted = 0;
var tasks = new Dictionary<int, Task>();
bool abort = false;
while (!abort)
@ -1267,6 +1282,7 @@ namespace WTelegram
}
await outputStream.WriteAsync(fileData.bytes, 0, fileData.bytes.Length);
maxOffsetSeen = Math.Max(maxOffsetSeen, offset + fileData.bytes.Length);
transmitted += fileData.bytes.Length;
}
catch (Exception)
{
@ -1276,6 +1292,7 @@ namespace WTelegram
finally
{
writeSem.Release();
progress?.Invoke(transmitted, fileSize);
}
}
lock (tasks) tasks.Remove(offset);