Improved HTTP support

This commit is contained in:
Wizou 2024-10-07 02:43:07 +02:00
parent a19db86c1d
commit 62c105959c
3 changed files with 60 additions and 29 deletions

View file

@ -159,7 +159,7 @@ public class MTProtoGenerator : IIncrementalGenerator
readTL.AppendLine($"r.{member.Name} = new Int256(reader);");
writeTl.AppendLine($"writer.Write({member.Name});");
break;
case "TL._Message[]":
case "System.Collections.Generic.List<TL._Message>":
readTL.AppendLine($"r.{member.Name} = reader.ReadTLRawVector<_Message>(0x5BB8E511);");
writeTl.AppendLine($"writer.WriteTLMessages({member.Name});");
break;
@ -179,7 +179,7 @@ public class MTProtoGenerator : IIncrementalGenerator
if (member.Type is IArrayTypeSymbol arrayType)
{
if (name is "FutureSalts")
readTL.AppendLine($"r.{member.Name} = reader.ReadTLRawVector<{memberType.Substring(0, memberType.Length - 2)}>(0x0949D9DC);");
readTL.AppendLine($"r.{member.Name} = reader.ReadTLRawVector<{memberType.Substring(0, memberType.Length - 2)}>(0x0949D9DC).ToArray();");
else
readTL.AppendLine($"r.{member.Name} = reader.ReadTLVector<{memberType.Substring(0, memberType.Length - 2)}>();");
writeTl.AppendLine($"writer.WriteTLVector({member.Name});");

View file

@ -67,6 +67,7 @@ namespace WTelegram
private TcpClient _tcpClient;
private Stream _networkStream;
private HttpClient _httpClient;
private HttpWait _httpWait;
private IObject _lastSentMsg;
private long _lastRecvMsgId;
private readonly List<long> _msgsToAck = [];
@ -192,16 +193,23 @@ namespace WTelegram
foreach (var rpc in _pendingRpcs.Values)
rpc.tcs.TrySetException(ex);
_sendSemaphore.Dispose();
_httpClient?.Dispose();
_networkStream = null;
if (IsMainDC) _session.Dispose();
GC.SuppressFinalize(this);
}
public void DisableUpdates(bool disable = true) => _dcSession.DisableUpdates(disable);
/// <summary>Enable connecting to Telegram via on-demand HTTP requests instead of permanent TCP connection</summary>
/// <param name="httpClient">HttpClient to use. Leave <see langword="null"/> for a default one</param>
public void HttpMode(HttpClient httpClient = null) => _httpClient = httpClient ?? new();
/// <param name="defaultHttpWait">Default HttpWait parameters for requests.<para>⚠️ Telegram servers don't support this correctly at the moment.</para>So leave <see langword="null"/> for the default 25 seconds long poll</param>
public void HttpMode(HttpClient httpClient = null, HttpWait defaultHttpWait = null)
{
if (_tcpClient != null) throw new InvalidOperationException("Cannot switch to HTTP after TCP connection");
_httpClient = httpClient ?? new();
_httpWait = defaultHttpWait;
}
/// <summary>Disconnect from Telegram <i>(shouldn't be needed in normal usage)</i></summary>
/// <param name="resetUser">Forget about logged-in user</param>
@ -560,10 +568,11 @@ namespace WTelegram
internal MsgContainer ReadMsgContainer(BinaryReader reader)
{
int count = reader.ReadInt32();
var array = new _Message[count];
var messages = new List<_Message>(count);
for (int i = 0; i < count; i++)
{
var msg = array[i] = new _Message(reader.ReadInt64(), reader.ReadInt32(), null) { bytes = reader.ReadInt32() };
var msg = new _Message(reader.ReadInt64(), reader.ReadInt32(), null) { bytes = reader.ReadInt32() };
messages.Add(msg);
if ((msg.seq_no & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msg.msg_id);
var pos = reader.BaseStream.Position;
try
@ -584,9 +593,9 @@ namespace WTelegram
{
Helpers.Log(4, "While deserializing vector<%Message>: " + ex.ToString());
}
reader.BaseStream.Position = pos + array[i].bytes;
reader.BaseStream.Position = pos + msg.bytes;
}
return new MsgContainer { messages = array };
return new MsgContainer { messages = messages };
}
private RpcResult ReadRpcResult(BinaryReader reader)
@ -1407,16 +1416,23 @@ namespace WTelegram
{
if (_reactorTask == null) throw new WTException("You must connect to Telegram first");
isContent &= _dcSession.AuthKeyID != 0;
(long msgId, int seqno) = NewMsgId(isContent);
var (msgId, seqno) = NewMsgId(isContent);
if (rpc != null)
lock (_pendingRpcs)
_pendingRpcs[rpc.msgId = msgId] = rpc;
if (isContent && CheckMsgsToAck() is MsgsAck msgsAck)
if (isContent)
{
var (ackId, ackSeqno) = NewMsgId(false);
var container = new MsgContainer { messages = [new(msgId, seqno, msg), new(ackId, ackSeqno, msgsAck)] };
await SendAsync(container, false);
return;
List<_Message> messages = null;
if (_httpWait != null && NewMsgId(false) is var (hwId, hwSeqno))
(messages ??= []).Add(new(hwId, hwSeqno, _httpWait));
if (CheckMsgsToAck() is MsgsAck msgsAck && NewMsgId(false) is var (ackId, ackSeqno))
(messages ??= []).Add(new(ackId, ackSeqno, msgsAck));
if (messages != null)
{
messages.Add(new(msgId, seqno, msg));
await SendAsync(new MsgContainer { messages = messages }, false);
return;
}
}
await _sendSemaphore.WaitAsync(_cts.Token);
try
@ -1490,7 +1506,7 @@ namespace WTelegram
private async Task SendFrame(byte[] buffer, int frameLength)
{
if (_httpClient == null)
if (_networkStream != null)
await _networkStream.WriteAsync(buffer, 0, frameLength);
else
{
@ -1506,6 +1522,20 @@ namespace WTelegram
}
}
/// <summary>Long poll on HTTP connections</summary>
/// <param name="httpWait">Parameters for the long poll. Leave <see langword="null"/> for the default 25 seconds.</param>
/// <remarks>⚠️ Telegram servers don't seem to support other parameter than <see langword="null"/> correctly</remarks>
public async Task HttpWait(HttpWait httpWait = null)
{
if (_networkStream != null) throw new InvalidOperationException("Can't use HttpWait over TCP connection");
var container = new MsgContainer { messages = [] };
if (httpWait != null && NewMsgId(false) is var (hwId, hwSeqno))
container.messages.Add(new(hwId, hwSeqno, httpWait));
if (CheckMsgsToAck() is MsgsAck msgsAck && NewMsgId(false) is var (ackId, ackSeqno))
container.messages.Add(new(ackId, ackSeqno, msgsAck));
await SendAsync(container, false);
}
internal async Task<T> InvokeBare<T>(IMethod<T> request)
{
if (_bareRpc != null) throw new WTException("A bare request is already undergoing");
@ -1531,7 +1561,8 @@ namespace WTelegram
await SendAsync(query, true, rpc);
if (_httpClient != null && !rpc.Task.IsCompleted)
{
await SendAsync(new HttpWait { max_delay = 30, wait_after = 10, max_wait = 1000 * PingInterval }, true);
// usually happens when a batch of unrelated messages were serialized before in the previous MsgContainer reply
await HttpWait(_httpWait); // wait a bit more
if (!rpc.Task.IsCompleted) rpc.tcs.TrySetException(new RpcException(417, "Missing RPC response via HTTP"));
}

View file

@ -127,16 +127,16 @@ namespace TL
if (type.IsArray)
if (value is byte[] bytes)
writer.WriteTLBytes(bytes);
else if (value is _Message[] messages)
writer.WriteTLMessages(messages);
else
writer.WriteTLVector((Array)value);
else if (value is IObject tlObject)
WriteTLObject(writer, tlObject);
else if (value is List<_Message> messages)
writer.WriteTLMessages(messages);
else if (value is Int128 int128)
writer.Write(int128);
else if (value is Int256 int256)
writer.Write(int256);
else if (value is IObject tlObject)
WriteTLObject(writer, tlObject);
else if (type.IsEnum) // needed for Mono (enums in generic types are seen as TypeCode.Object)
writer.Write((uint)value);
else
@ -191,22 +191,22 @@ namespace TL
}
}
internal static void WriteTLMessages(this BinaryWriter writer, _Message[] messages)
internal static void WriteTLMessages(this BinaryWriter writer, List<_Message> messages)
{
writer.Write(messages.Length);
writer.Write(messages.Count);
foreach (var msg in messages)
{
writer.Write(msg.msg_id);
writer.Write(msg.seq_no);
var patchPos = writer.BaseStream.Position;
writer.Write(0); // patched below
writer.Write(0); // patched below
if ((msg.seq_no & 1) != 0)
WTelegram.Helpers.Log(1, $" → {msg.body.GetType().Name.TrimEnd('_'),-38} #{(short)msg.msg_id.GetHashCode():X4}");
else
WTelegram.Helpers.Log(1, $" → {msg.body.GetType().Name.TrimEnd('_'),-38}");
writer.WriteTLObject(msg.body);
writer.BaseStream.Position = patchPos;
writer.Write((int)(writer.BaseStream.Length - patchPos - 4)); // patch bytes field
writer.Write((int)(writer.BaseStream.Length - patchPos - 4)); // patch bytes field
writer.Seek(0, SeekOrigin.End);
}
}
@ -222,13 +222,13 @@ namespace TL
writer.WriteTLValue(array.GetValue(i), elementType);
}
internal static T[] ReadTLRawVector<T>(this BinaryReader reader, uint ctorNb)
internal static List<T> ReadTLRawVector<T>(this BinaryReader reader, uint ctorNb)
{
int count = reader.ReadInt32();
var array = new T[count];
var list = new List<T>(count);
for (int i = 0; i < count; i++)
array[i] = (T)reader.ReadTLObject(ctorNb);
return array;
list.Add((T)reader.ReadTLObject(ctorNb));
return list;
}
internal static T[] ReadTLVector<T>(this BinaryReader reader)
@ -437,7 +437,7 @@ namespace TL
}
[TLDef(0x73F1F8DC)] //msg_container#73f1f8dc messages:vector<%Message> = MessageContainer
public sealed partial class MsgContainer : IObject { public _Message[] messages; }
public sealed partial class MsgContainer : IObject { public List<_Message> messages; }
[TLDef(0xE06046B2)] //msg_copy#e06046b2 orig_message:Message = MessageCopy
public sealed partial class MsgCopy : IObject { public _Message orig_message; }