diff --git a/src/Client.cs b/src/Client.cs index 8330dfc..28207cf 100644 --- a/src/Client.cs +++ b/src/Client.cs @@ -223,7 +223,7 @@ namespace WTelegram { try { - if (CheckMsgsToAck() is MsgsAck msgsAck) + if (_httpClient == null && CheckMsgsToAck() is MsgsAck msgsAck) await SendAsync(msgsAck, false).WaitAsync(1000).ConfigureAwait(false); } catch { } @@ -885,7 +885,10 @@ namespace WTelegram #endif } else if (_httpClient != null) + { + Helpers.Log(2, $"Using HTTP Mode"); _reactorTask = Task.CompletedTask; + } else { endpoint = _dcSession?.EndPoint ?? GetDefaultEndpoint(out int defaultDc); @@ -1434,7 +1437,9 @@ namespace WTelegram return; } } - await _sendSemaphore.WaitAsync(_cts.Token); + Task receiveTask = null; + var sem = _sendSemaphore; + await sem.WaitAsync(_cts.Token); try { using var memStream = new MemoryStream(1024); @@ -1494,32 +1499,30 @@ namespace WTelegram #if OBFUSCATION _sendCtr?.EncryptDecrypt(buffer, frameLength); #endif - var sending = SendFrame(buffer, frameLength); + if (_networkStream != null) + await _networkStream.WriteAsync(buffer, 0, frameLength); + else + receiveTask = SendReceiveHttp(buffer, frameLength); _lastSentMsg = msg; - await sending; } finally { - _sendSemaphore.Release(); + sem.Release(); } + if (receiveTask != null) await receiveTask; } - private async Task SendFrame(byte[] buffer, int frameLength) + private async Task SendReceiveHttp(byte[] buffer, int frameLength) { - if (_networkStream != null) - await _networkStream.WriteAsync(buffer, 0, frameLength); - else - { - var endpoint = _dcSession?.EndPoint ?? GetDefaultEndpoint(out _); - var content = new ByteArrayContent(buffer, 4, frameLength - 4); - var response = await _httpClient.PostAsync($"http://{endpoint}/api", content); - if (response.StatusCode != HttpStatusCode.OK) - throw new RpcException((int)response.StatusCode, TransportError((int)response.StatusCode)); - var data = await response.Content.ReadAsByteArrayAsync(); - var obj = ReadFrame(data, data.Length); - if (obj != null) - _ = HandleMessageAsync(obj); - } + var endpoint = _dcSession?.EndPoint ?? GetDefaultEndpoint(out _); + var content = new ByteArrayContent(buffer, 4, frameLength - 4); + var response = await _httpClient.PostAsync($"http://{endpoint}/api", content, _cts.Token); + if (response.StatusCode != HttpStatusCode.OK) + throw new RpcException((int)response.StatusCode, TransportError((int)response.StatusCode)); + var data = await response.Content.ReadAsByteArrayAsync(); + var obj = ReadFrame(data, data.Length); + if (obj != null) + await HandleMessageAsync(obj); } /// Long poll on HTTP connections @@ -1540,9 +1543,9 @@ namespace WTelegram { if (_bareRpc != null) throw new WTException("A bare request is already undergoing"); retry: - _bareRpc = new Rpc { type = typeof(T) }; + var bareRpc = _bareRpc = new Rpc { type = typeof(T) }; await SendAsync(request, false, _bareRpc); - var result = await _bareRpc.Task; + var result = await bareRpc.Task; if (result is ReactorError) goto retry; return (T)result; } @@ -1559,12 +1562,8 @@ namespace WTelegram retry: var rpc = new Rpc { type = typeof(T) }; await SendAsync(query, true, rpc); - if (_httpClient != null && !rpc.Task.IsCompleted) - { - // usually happens when a batch of unrelated messages were serialized before in the previous MsgContainer reply - await HttpWait(_httpWait); // wait a bit more - if (!rpc.Task.IsCompleted) rpc.tcs.TrySetException(new RpcException(417, "Missing RPC response via HTTP")); - } + while (_httpClient != null && !rpc.Task.IsCompleted) + await HttpWait(_httpWait); // need to wait a bit more in some case var result = await rpc.Task; switch (result)