Improved HTTP mode reliability

This commit is contained in:
Wizou 2024-11-18 13:44:32 +01:00
parent 9ad6220527
commit fc441121a3

View file

@ -223,7 +223,7 @@ namespace WTelegram
{
try
{
if (CheckMsgsToAck() is MsgsAck msgsAck)
if (_httpClient == null && CheckMsgsToAck() is MsgsAck msgsAck)
await SendAsync(msgsAck, false).WaitAsync(1000).ConfigureAwait(false);
}
catch { }
@ -885,7 +885,10 @@ namespace WTelegram
#endif
}
else if (_httpClient != null)
{
Helpers.Log(2, $"Using HTTP Mode");
_reactorTask = Task.CompletedTask;
}
else
{
endpoint = _dcSession?.EndPoint ?? GetDefaultEndpoint(out int defaultDc);
@ -1434,7 +1437,9 @@ namespace WTelegram
return;
}
}
await _sendSemaphore.WaitAsync(_cts.Token);
Task receiveTask = null;
var sem = _sendSemaphore;
await sem.WaitAsync(_cts.Token);
try
{
using var memStream = new MemoryStream(1024);
@ -1494,32 +1499,30 @@ namespace WTelegram
#if OBFUSCATION
_sendCtr?.EncryptDecrypt(buffer, frameLength);
#endif
var sending = SendFrame(buffer, frameLength);
_lastSentMsg = msg;
await sending;
}
finally
{
_sendSemaphore.Release();
}
}
private async Task SendFrame(byte[] buffer, int frameLength)
{
if (_networkStream != null)
await _networkStream.WriteAsync(buffer, 0, frameLength);
else
receiveTask = SendReceiveHttp(buffer, frameLength);
_lastSentMsg = msg;
}
finally
{
sem.Release();
}
if (receiveTask != null) await receiveTask;
}
private async Task SendReceiveHttp(byte[] buffer, int frameLength)
{
var endpoint = _dcSession?.EndPoint ?? GetDefaultEndpoint(out _);
var content = new ByteArrayContent(buffer, 4, frameLength - 4);
var response = await _httpClient.PostAsync($"http://{endpoint}/api", content);
var response = await _httpClient.PostAsync($"http://{endpoint}/api", content, _cts.Token);
if (response.StatusCode != HttpStatusCode.OK)
throw new RpcException((int)response.StatusCode, TransportError((int)response.StatusCode));
var data = await response.Content.ReadAsByteArrayAsync();
var obj = ReadFrame(data, data.Length);
if (obj != null)
_ = HandleMessageAsync(obj);
}
await HandleMessageAsync(obj);
}
/// <summary>Long poll on HTTP connections</summary>
@ -1540,9 +1543,9 @@ namespace WTelegram
{
if (_bareRpc != null) throw new WTException("A bare request is already undergoing");
retry:
_bareRpc = new Rpc { type = typeof(T) };
var bareRpc = _bareRpc = new Rpc { type = typeof(T) };
await SendAsync(request, false, _bareRpc);
var result = await _bareRpc.Task;
var result = await bareRpc.Task;
if (result is ReactorError) goto retry;
return (T)result;
}
@ -1559,12 +1562,8 @@ namespace WTelegram
retry:
var rpc = new Rpc { type = typeof(T) };
await SendAsync(query, true, rpc);
if (_httpClient != null && !rpc.Task.IsCompleted)
{
// usually happens when a batch of unrelated messages were serialized before in the previous MsgContainer reply
await HttpWait(_httpWait); // wait a bit more
if (!rpc.Task.IsCompleted) rpc.tcs.TrySetException(new RpcException(417, "Missing RPC response via HTTP"));
}
while (_httpClient != null && !rpc.Task.IsCompleted)
await HttpWait(_httpWait); // need to wait a bit more in some case
var result = await rpc.Task;
switch (result)