mirror of
https://github.com/wiz0u/WTelegramClient.git
synced 2025-12-06 06:52:01 +01:00
Reactor system for parallelization of requests
This commit is contained in:
parent
5e6421d76e
commit
897b61747a
228
src/Client.cs
228
src/Client.cs
|
|
@ -10,6 +10,7 @@ using System.Net.Sockets;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using System.Security.Cryptography;
|
using System.Security.Cryptography;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using TL;
|
using TL;
|
||||||
using static WTelegram.Encryption;
|
using static WTelegram.Encryption;
|
||||||
|
|
@ -28,11 +29,15 @@ namespace WTelegram
|
||||||
private NetworkStream _networkStream;
|
private NetworkStream _networkStream;
|
||||||
private int _frame_seqTx = 0, _frame_seqRx = 0;
|
private int _frame_seqTx = 0, _frame_seqRx = 0;
|
||||||
private ITLFunction _lastSentMsg;
|
private ITLFunction _lastSentMsg;
|
||||||
private Type _lastRpcResultType = typeof(object);
|
|
||||||
private readonly List<long> _msgsToAck = new();
|
private readonly List<long> _msgsToAck = new();
|
||||||
private int _unexpectedSaltChange;
|
|
||||||
private readonly Random _random = new();
|
private readonly Random _random = new();
|
||||||
private readonly SHA256 _sha256 = SHA256.Create();
|
private readonly SHA256 _sha256 = SHA256.Create();
|
||||||
|
private int _unexpectedSaltChange;
|
||||||
|
private Task _reactorTask;
|
||||||
|
private TaskCompletionSource<object> _rawRequest;
|
||||||
|
private readonly Dictionary<long, (Type type, TaskCompletionSource<object> tcs)> _pendingRequests = new();
|
||||||
|
private readonly SemaphoreSlim _sendSemaphore = new(1);
|
||||||
|
private CancellationTokenSource _cts;
|
||||||
|
|
||||||
public Client(Func<string,string> configProvider = null, Func<ITLObject, Task> updateHandler = null)
|
public Client(Func<string,string> configProvider = null, Func<ITLObject, Task> updateHandler = null)
|
||||||
{
|
{
|
||||||
|
|
@ -76,14 +81,27 @@ namespace WTelegram
|
||||||
[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1822")]
|
[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1822")]
|
||||||
public void LoadPublicKey(string pem) => Encryption.LoadPublicKey(pem);
|
public void LoadPublicKey(string pem) => Encryption.LoadPublicKey(pem);
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (CheckMsgsToAck() is MsgsAck msgsAck)
|
||||||
|
SendAsync(MakeFunction(msgsAck), false).Wait(1000);
|
||||||
|
_cts?.Cancel();
|
||||||
|
_reactorTask = null;
|
||||||
|
_tcpClient?.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
public void Reset() // disconnect and reset session (forget server address, current user and authkey)
|
public void Reset() // disconnect and reset session (forget server address, current user and authkey)
|
||||||
{
|
{
|
||||||
_tcpClient.Close();
|
_cts?.Cancel();
|
||||||
|
_reactorTask = null;
|
||||||
|
_tcpClient?.Dispose();
|
||||||
_session.Reset();
|
_session.Reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task ConnectAsync()
|
public async Task ConnectAsync()
|
||||||
{
|
{
|
||||||
|
if (_reactorTask != null)
|
||||||
|
throw new ApplicationException("Already connected!");
|
||||||
var endpoint = _session.DataCenter == null ? IPEndPoint.Parse(Config("server_address"))
|
var endpoint = _session.DataCenter == null ? IPEndPoint.Parse(Config("server_address"))
|
||||||
: new IPEndPoint(IPAddress.Parse(_session.DataCenter.ip_address), _session.DataCenter.port);
|
: new IPEndPoint(IPAddress.Parse(_session.DataCenter.ip_address), _session.DataCenter.port);
|
||||||
Helpers.Log(2, $"Connecting to {endpoint}...");
|
Helpers.Log(2, $"Connecting to {endpoint}...");
|
||||||
|
|
@ -92,6 +110,8 @@ namespace WTelegram
|
||||||
await _tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
|
await _tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
|
||||||
_networkStream = _tcpClient.GetStream();
|
_networkStream = _tcpClient.GetStream();
|
||||||
_frame_seqTx = _frame_seqRx = 0;
|
_frame_seqTx = _frame_seqRx = 0;
|
||||||
|
_cts = new();
|
||||||
|
_reactorTask = Reactor(_cts.Token);
|
||||||
|
|
||||||
if (_session.AuthKey == null)
|
if (_session.AuthKey == null)
|
||||||
await CreateAuthorizationKey(this, _session);
|
await CreateAuthorizationKey(this, _session);
|
||||||
|
|
@ -114,7 +134,9 @@ namespace WTelegram
|
||||||
if (_session.User != null)
|
if (_session.User != null)
|
||||||
exported = await Auth_ExportAuthorization(dcId);
|
exported = await Auth_ExportAuthorization(dcId);
|
||||||
var prevFamily = _tcpClient.Client.RemoteEndPoint.AddressFamily;
|
var prevFamily = _tcpClient.Client.RemoteEndPoint.AddressFamily;
|
||||||
_tcpClient.Close();
|
_cts.Cancel();
|
||||||
|
_reactorTask = null;
|
||||||
|
_tcpClient.Dispose();
|
||||||
var dcOptions = TLConfig.dc_options.Where(dc => dc.id == dcId && (dc.flags & (DcOption.Flags.media_only | DcOption.Flags.cdn)) == 0);
|
var dcOptions = TLConfig.dc_options.Where(dc => dc.id == dcId && (dc.flags & (DcOption.Flags.media_only | DcOption.Flags.cdn)) == 0);
|
||||||
if (prevFamily == AddressFamily.InterNetworkV6) // try to stay in the same connectivity
|
if (prevFamily == AddressFamily.InterNetworkV6) // try to stay in the same connectivity
|
||||||
dcOptions = dcOptions.OrderByDescending(dc => dc.flags & DcOption.Flags.ipv6); // list ipv6 first
|
dcOptions = dcOptions.OrderByDescending(dc => dc.flags & DcOption.Flags.ipv6); // list ipv6 first
|
||||||
|
|
@ -132,36 +154,37 @@ namespace WTelegram
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Dispose()
|
private static ITLFunction MakeFunction(ITLObject msg)
|
||||||
{
|
=> writer =>
|
||||||
CheckMsgsToAck().Wait(1000);
|
|
||||||
_networkStream?.Dispose();
|
|
||||||
_tcpClient?.Dispose();
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task SendAsync(ITLObject msg, bool isContent = true)
|
|
||||||
=> SendAsync(writer =>
|
|
||||||
{
|
{
|
||||||
writer.WriteTLObject(msg);
|
writer.WriteTLObject(msg);
|
||||||
return msg.GetType().Name;
|
return msg.GetType().Name;
|
||||||
}, isContent);
|
};
|
||||||
|
|
||||||
public async Task SendAsync(ITLFunction msgSerializer, bool isContent = true)
|
private async Task<long> SendAsync(ITLFunction func, bool isContent)
|
||||||
|
{
|
||||||
|
if (_session.AuthKeyID != 0 && isContent && CheckMsgsToAck() is MsgsAck msgsAck)
|
||||||
|
{
|
||||||
|
var ackMsg = _session.NewMsg(false);
|
||||||
|
var mainMsg = _session.NewMsg(true);
|
||||||
|
await SendAsync(MakeContainer((MakeFunction(msgsAck), ackMsg), (func, mainMsg)), false);
|
||||||
|
return mainMsg.msgId;
|
||||||
|
}
|
||||||
|
(long msgId, int seqno) = _session.NewMsg(isContent && _session.AuthKeyID != 0);
|
||||||
|
await _sendSemaphore.WaitAsync();
|
||||||
|
try
|
||||||
{
|
{
|
||||||
if (_session.AuthKeyID != 0) await CheckMsgsToAck();
|
|
||||||
using var memStream = new MemoryStream(1024);
|
using var memStream = new MemoryStream(1024);
|
||||||
using var writer = new BinaryWriter(memStream, Encoding.UTF8);
|
using var writer = new BinaryWriter(memStream, Encoding.UTF8);
|
||||||
writer.Write(0); // int32 frame_len (to be patched with full frame length)
|
writer.Write(0); // int32 frame_len (to be patched with full frame length)
|
||||||
writer.Write(_frame_seqTx++); // int32 frame_seq
|
writer.Write(_frame_seqTx++); // int32 frame_seq
|
||||||
|
|
||||||
(long msgId, int seqno) = _session.NewMsg(isContent && _session.AuthKeyID != 0);
|
|
||||||
|
|
||||||
if (_session.AuthKeyID == 0) // send unencrypted message
|
if (_session.AuthKeyID == 0) // send unencrypted message
|
||||||
{
|
{
|
||||||
writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted)
|
writer.Write(0L); // int64 auth_key_id = 0 (Unencrypted)
|
||||||
writer.Write(msgId); // int64 message_id
|
writer.Write(msgId); // int64 message_id
|
||||||
writer.Write(0); // int32 message_data_length (to be patched)
|
writer.Write(0); // int32 message_data_length (to be patched)
|
||||||
var typeName = msgSerializer(writer); // bytes message_data
|
var typeName = func(writer); // bytes message_data
|
||||||
Helpers.Log(1, $"Sending {typeName}...");
|
Helpers.Log(1, $"Sending {typeName}...");
|
||||||
BinaryPrimitives.WriteInt32LittleEndian(memStream.GetBuffer().AsSpan(24), (int)memStream.Length - 28); // patch message_data_length
|
BinaryPrimitives.WriteInt32LittleEndian(memStream.GetBuffer().AsSpan(24), (int)memStream.Length - 28); // patch message_data_length
|
||||||
}
|
}
|
||||||
|
|
@ -180,8 +203,11 @@ namespace WTelegram
|
||||||
clearWriter.Write(msgId); // int64 message_id
|
clearWriter.Write(msgId); // int64 message_id
|
||||||
clearWriter.Write(seqno); // int32 msg_seqno
|
clearWriter.Write(seqno); // int32 msg_seqno
|
||||||
clearWriter.Write(0); // int32 message_data_length (to be patched)
|
clearWriter.Write(0); // int32 message_data_length (to be patched)
|
||||||
var typeName = msgSerializer(clearWriter); // bytes message_data
|
var typeName = func(clearWriter); // bytes message_data
|
||||||
|
if ((seqno & 1) != 0)
|
||||||
Helpers.Log(1, $"Sending {typeName,-50} #{(short)msgId.GetHashCode():X4}");
|
Helpers.Log(1, $"Sending {typeName,-50} #{(short)msgId.GetHashCode():X4}");
|
||||||
|
else
|
||||||
|
Helpers.Log(1, $"Sending {typeName,-50} {_session.MsgIdToStamp(msgId):u} (svc)");
|
||||||
int clearLength = (int)clearStream.Length - prepend; // length before padding (= 32 + message_data_length)
|
int clearLength = (int)clearStream.Length - prepend; // length before padding (= 32 + message_data_length)
|
||||||
int padding = (0x7FFFFFF0 - clearLength) % 16;
|
int padding = (0x7FFFFFF0 - clearLength) % 16;
|
||||||
#if !MTPROTO1
|
#if !MTPROTO1
|
||||||
|
|
@ -204,7 +230,6 @@ namespace WTelegram
|
||||||
writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key
|
writer.Write(msgKeyLarge, msgKeyOffset, 16); // int128 msg_key
|
||||||
writer.Write(encrypted_data); // bytes encrypted_data
|
writer.Write(encrypted_data); // bytes encrypted_data
|
||||||
}
|
}
|
||||||
|
|
||||||
var buffer = memStream.GetBuffer();
|
var buffer = memStream.GetBuffer();
|
||||||
int frameLength = (int)memStream.Length;
|
int frameLength = (int)memStream.Length;
|
||||||
//TODO: support Quick Ack?
|
//TODO: support Quick Ack?
|
||||||
|
|
@ -215,12 +240,18 @@ namespace WTelegram
|
||||||
//TODO: support Transport obfuscation?
|
//TODO: support Transport obfuscation?
|
||||||
|
|
||||||
await _networkStream.WriteAsync(frame);
|
await _networkStream.WriteAsync(frame);
|
||||||
_lastSentMsg = msgSerializer;
|
_lastSentMsg = func;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_sendSemaphore.Release();
|
||||||
|
}
|
||||||
|
return msgId;
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async Task<ITLObject> RecvInternalAsync()
|
internal async Task<ITLObject> RecvInternalAsync(CancellationToken ct)
|
||||||
{
|
{
|
||||||
var data = await RecvFrameAsync();
|
var data = await RecvFrameAsync(ct);
|
||||||
if (data.Length == 4 && data[3] == 0xFF)
|
if (data.Length == 4 && data[3] == 0xFF)
|
||||||
{
|
{
|
||||||
int error_code = -BinaryPrimitives.ReadInt32LittleEndian(data);
|
int error_code = -BinaryPrimitives.ReadInt32LittleEndian(data);
|
||||||
|
|
@ -240,7 +271,7 @@ namespace WTelegram
|
||||||
if (length != data.Length - 20) throw new ApplicationException($"Unexpected unencrypted length {length} != {data.Length - 20}");
|
if (length != data.Length - 20) throw new ApplicationException($"Unexpected unencrypted length {length} != {data.Length - 20}");
|
||||||
|
|
||||||
var obj = reader.ReadTLObject();
|
var obj = reader.ReadTLObject();
|
||||||
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} timestamp={_session.MsgIdToStamp(msgId)} isResponse={(msgId & 2) != 0} unencrypted");
|
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {_session.MsgIdToStamp(msgId):u} {((msgId & 2) == 0 ? "": "NAR")} unencrypted");
|
||||||
return obj;
|
return obj;
|
||||||
}
|
}
|
||||||
else if (authKeyId != _session.AuthKeyID)
|
else if (authKeyId != _session.AuthKeyID)
|
||||||
|
|
@ -284,9 +315,9 @@ namespace WTelegram
|
||||||
throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1");
|
throw new ApplicationException($"Mismatch between MsgKey & decrypted SHA1");
|
||||||
#endif
|
#endif
|
||||||
var obj = reader.ReadTLObject(type => type == typeof(RpcResult));
|
var obj = reader.ReadTLObject(type => type == typeof(RpcResult));
|
||||||
|
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} {_session.MsgIdToStamp(msgId):u} {((seqno & 1) != 0 ? "" : "(svc)")} {((msgId & 2) == 0 ? "" : "NAR")}");
|
||||||
if (obj is RpcResult rpcResult)
|
if (obj is RpcResult rpcResult)
|
||||||
DeserializeRpcResult(reader, rpcResult); // necessary hack because some RPC return bare types like bool or int[]
|
DeserializeRpcResult(reader, rpcResult); // necessary hack because some RPC return bare types like bool or int[]
|
||||||
Helpers.Log(1, $"Receiving {obj.GetType().Name,-50} timestamp={_session.MsgIdToStamp(msgId)} isResponse={(msgId & 2) != 0} {(seqno == -1 ? "clearText" : "isContent")}={(seqno & 1) != 0}");
|
|
||||||
return obj;
|
return obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -298,10 +329,10 @@ namespace WTelegram
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<byte[]> RecvFrameAsync()
|
private async Task<byte[]> RecvFrameAsync(CancellationToken ct)
|
||||||
{
|
{
|
||||||
byte[] frame = new byte[8];
|
byte[] frame = new byte[8];
|
||||||
if (await FullReadAsync(_networkStream, frame, 8) != 8)
|
if (await FullReadAsync(_networkStream, frame, 8, ct) != 8)
|
||||||
throw new ApplicationException("Could not read frame prefix : Connection shut down");
|
throw new ApplicationException("Could not read frame prefix : Connection shut down");
|
||||||
int length = BinaryPrimitives.ReadInt32LittleEndian(frame) - 12;
|
int length = BinaryPrimitives.ReadInt32LittleEndian(frame) - 12;
|
||||||
if (length <= 0 || length >= 0x10000)
|
if (length <= 0 || length >= 0x10000)
|
||||||
|
|
@ -313,35 +344,50 @@ namespace WTelegram
|
||||||
_frame_seqRx = seqno + 1;
|
_frame_seqRx = seqno + 1;
|
||||||
}
|
}
|
||||||
var payload = new byte[length];
|
var payload = new byte[length];
|
||||||
if (await FullReadAsync(_networkStream, payload, length) != length)
|
if (await FullReadAsync(_networkStream, payload, length, ct) != length)
|
||||||
throw new ApplicationException("Could not read frame data : Connection shut down");
|
throw new ApplicationException("Could not read frame data : Connection shut down");
|
||||||
uint crc32 = Force.Crc32.Crc32Algorithm.Compute(frame, 0, 8);
|
uint crc32 = Force.Crc32.Crc32Algorithm.Compute(frame, 0, 8);
|
||||||
crc32 = Force.Crc32.Crc32Algorithm.Append(crc32, payload);
|
crc32 = Force.Crc32.Crc32Algorithm.Append(crc32, payload);
|
||||||
if (await FullReadAsync(_networkStream, frame, 4) != 4)
|
if (await FullReadAsync(_networkStream, frame, 4, ct) != 4)
|
||||||
throw new ApplicationException("Could not read frame CRC : Connection shut down");
|
throw new ApplicationException("Could not read frame CRC : Connection shut down");
|
||||||
if (crc32 != BinaryPrimitives.ReadUInt32LittleEndian(frame))
|
if (crc32 != BinaryPrimitives.ReadUInt32LittleEndian(frame))
|
||||||
throw new ApplicationException("Invalid envelope CRC32");
|
throw new ApplicationException("Invalid envelope CRC32");
|
||||||
return payload;
|
return payload;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static async Task<int> FullReadAsync(Stream stream, byte[] buffer, int length)
|
private static async Task<int> FullReadAsync(Stream stream, byte[] buffer, int length, CancellationToken ct = default)
|
||||||
{
|
{
|
||||||
for (int offset = 0; offset != length;)
|
for (int offset = 0; offset != length;)
|
||||||
{
|
{
|
||||||
var read = await stream.ReadAsync(buffer.AsMemory(offset, length - offset));
|
var read = await stream.ReadAsync(buffer.AsMemory(offset, length - offset), ct);
|
||||||
if (read == 0) return offset;
|
if (read == 0) return offset;
|
||||||
offset += read;
|
offset += read;
|
||||||
}
|
}
|
||||||
return length;
|
return length;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void DeserializeRpcResult(BinaryReader reader, RpcResult rpcResult)
|
private bool DeserializeRpcResult(BinaryReader reader, RpcResult rpcResult)
|
||||||
{
|
{
|
||||||
if ((rpcResult.req_msg_id = reader.ReadInt64()) == _session.LastSentMsgId)
|
var msgId = rpcResult.req_msg_id = reader.ReadInt64();
|
||||||
rpcResult.result = reader.ReadTLValue(_lastRpcResultType);
|
(Type type, TaskCompletionSource<object> tcs) request;
|
||||||
|
lock (_pendingRequests)
|
||||||
|
if (_pendingRequests.TryGetValue(msgId, out request))
|
||||||
|
_pendingRequests.Remove(msgId);
|
||||||
|
if (request.type != null)
|
||||||
|
{
|
||||||
|
rpcResult.result = reader.ReadTLValue(request.type);
|
||||||
|
Helpers.Log(1, $" result → {request.type.Name,-48} #{(short)msgId.GetHashCode():X4}");
|
||||||
|
Task.Run(() => request.tcs.SetResult(rpcResult.result)); // to avoid deadlock, see https://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
rpcResult.result = reader.ReadTLObject();
|
rpcResult.result = reader.ReadTLObject();
|
||||||
Helpers.Log(1, $" → {rpcResult.result.GetType().Name,-50} #{(short)rpcResult.req_msg_id.GetHashCode():X4}");
|
if (_session.MsgIdToStamp(msgId) >= _session.SessionStart)
|
||||||
|
Helpers.Log(4, $" result → {rpcResult.result?.GetType().Name,-48}) for unknown msgId #{(short)msgId.GetHashCode():X4}");
|
||||||
|
else
|
||||||
|
Helpers.Log(1, $" result → {rpcResult.result?.GetType().Name,-48} for past msgId #{(short)msgId.GetHashCode():X4}");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public class RpcException : Exception
|
public class RpcException : Exception
|
||||||
|
|
@ -352,49 +398,81 @@ namespace WTelegram
|
||||||
|
|
||||||
public async Task<X> CallAsync<X>(ITLFunction request)
|
public async Task<X> CallAsync<X>(ITLFunction request)
|
||||||
{
|
{
|
||||||
await SendAsync(request);
|
retry:
|
||||||
// TODO: create a background reactor system that handles incoming packets and wake up awaiting tasks when their result has arrived
|
var msgId = await SendAsync(request, true);
|
||||||
// This would allow parallelization of Send task and avoid the risk of calling RecvInternal concurrently
|
object result;
|
||||||
_lastRpcResultType = typeof(X);
|
if (_session.AuthKeyID == 0)
|
||||||
for (; ;) //TODO: implement a timeout
|
|
||||||
{
|
{
|
||||||
var reply = await RecvInternalAsync();
|
_rawRequest = new TaskCompletionSource<object>();
|
||||||
if (reply is X plainResult) return plainResult;
|
result = await _rawRequest.Task;
|
||||||
else if (reply is RpcResult rpcResult && rpcResult.req_msg_id == _session.LastSentMsgId)
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
if (rpcResult.result is RpcError rpcError)
|
var tcs = new TaskCompletionSource<object>();
|
||||||
|
lock (_pendingRequests)
|
||||||
|
_pendingRequests[msgId] = (typeof(X), tcs);
|
||||||
|
result = await tcs.Task;
|
||||||
|
}
|
||||||
|
switch (result)
|
||||||
{
|
{
|
||||||
|
case X resultX: return resultX;
|
||||||
|
case RpcError rpcError:
|
||||||
int migrateDC;
|
int migrateDC;
|
||||||
if (rpcError.error_code == 303 && ((migrateDC = rpcError.error_message.IndexOf("_MIGRATE_")) > 0))
|
if (rpcError.error_code == 303 && ((migrateDC = rpcError.error_message.IndexOf("_MIGRATE_")) > 0))
|
||||||
{
|
{
|
||||||
migrateDC = int.Parse(rpcError.error_message[(migrateDC + 9)..]);
|
migrateDC = int.Parse(rpcError.error_message[(migrateDC + 9)..]);
|
||||||
await MigrateDCAsync(migrateDC);
|
await MigrateDCAsync(migrateDC);
|
||||||
await SendAsync(request);
|
goto retry;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
throw new RpcException(rpcError.error_code, rpcError.error_message);
|
throw new RpcException(rpcError.error_code, rpcError.error_message);
|
||||||
}
|
default:
|
||||||
else if (rpcResult.result is X result)
|
throw new ApplicationException($"{request.GetType().Name} call got a result of type {result.GetType().Name} instead of {typeof(X).Name}");
|
||||||
return result;
|
|
||||||
else
|
|
||||||
throw new ApplicationException($"{request.GetType().Name} call got a result of type {rpcResult.result.GetType().Name} instead of {typeof(X).Name}");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
await HandleMessageAsync(reply);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task CheckMsgsToAck()
|
private MsgsAck CheckMsgsToAck()
|
||||||
{
|
{
|
||||||
MsgsAck msgsAck = null;
|
|
||||||
lock (_msgsToAck)
|
lock (_msgsToAck)
|
||||||
if (_msgsToAck.Count != 0)
|
|
||||||
{
|
{
|
||||||
msgsAck = new MsgsAck { msg_ids = _msgsToAck.ToArray() };
|
if (_msgsToAck.Count == 0) return null;
|
||||||
|
var msgsAck = new MsgsAck { msg_ids = _msgsToAck.ToArray() };
|
||||||
_msgsToAck.Clear();
|
_msgsToAck.Clear();
|
||||||
|
return msgsAck;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ITLFunction MakeContainer(params (ITLFunction func, (long msgId, int seqno))[] msgs)
|
||||||
|
=> writer =>
|
||||||
|
{
|
||||||
|
writer.Write(0x73F1F8DC);
|
||||||
|
writer.Write(msgs.Length);
|
||||||
|
foreach (var (func, (msgId, seqno)) in msgs)
|
||||||
|
{
|
||||||
|
writer.Write(msgId);
|
||||||
|
writer.Write(seqno);
|
||||||
|
var patchPos = writer.BaseStream.Position;
|
||||||
|
writer.Write(0);
|
||||||
|
var typeName = func(writer);
|
||||||
|
if ((seqno & 1) != 0)
|
||||||
|
Helpers.Log(1, $"Sending → {typeName,-50} #{(short)msgId.GetHashCode():X4}");
|
||||||
|
else
|
||||||
|
Helpers.Log(1, $"Sending → {typeName,-50} {_session.MsgIdToStamp(msgId):u} (svc)");
|
||||||
|
writer.BaseStream.Position = patchPos;
|
||||||
|
writer.Write((int)(writer.BaseStream.Length - patchPos - 4)); // patch bytes field
|
||||||
|
writer.Seek(0, SeekOrigin.End);
|
||||||
|
}
|
||||||
|
return "as MsgContainer";
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
private async Task Reactor(CancellationToken ct)
|
||||||
|
{
|
||||||
|
while (!ct.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
var obj = await RecvInternalAsync(ct);
|
||||||
|
await HandleMessageAsync(obj);
|
||||||
}
|
}
|
||||||
if (msgsAck != null)
|
|
||||||
await SendAsync(msgsAck, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task HandleMessageAsync(ITLObject obj)
|
private async Task HandleMessageAsync(ITLObject obj)
|
||||||
|
|
@ -404,7 +482,9 @@ namespace WTelegram
|
||||||
case MsgContainer container:
|
case MsgContainer container:
|
||||||
foreach (var msg in container.messages)
|
foreach (var msg in container.messages)
|
||||||
{
|
{
|
||||||
Helpers.Log(1, $" → {msg.body?.GetType().Name,-48} timestamp={_session.MsgIdToStamp(msg.msg_id)} isResponse={(msg.msg_id & 2) != 0} {(msg.seqno == -1 ? "clearText" : "isContent")}={(msg.seqno & 1) != 0}");
|
var typeName = msg.body?.GetType().Name;
|
||||||
|
if (typeName == "RpcResult") typeName += $" ({((RpcResult)msg.body).result.GetType().Name})";
|
||||||
|
Helpers.Log(1, $" → {typeName,-48} {_session.MsgIdToStamp(msg.msg_id):u} {((msg.seqno & 1) != 0 ? "" : "(svc)")} {((msg.msg_id & 2) == 0 ? "" : "NAR")}");
|
||||||
if ((msg.seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msg.msg_id);
|
if ((msg.seqno & 1) != 0) lock (_msgsToAck) _msgsToAck.Add(msg.msg_id);
|
||||||
if (msg.body != null) await HandleMessageAsync(msg.body);
|
if (msg.body != null) await HandleMessageAsync(msg.body);
|
||||||
}
|
}
|
||||||
|
|
@ -412,17 +492,31 @@ namespace WTelegram
|
||||||
case BadServerSalt badServerSalt:
|
case BadServerSalt badServerSalt:
|
||||||
_session.Salt = badServerSalt.new_server_salt;
|
_session.Salt = badServerSalt.new_server_salt;
|
||||||
if (badServerSalt.bad_msg_id == _session.LastSentMsgId)
|
if (badServerSalt.bad_msg_id == _session.LastSentMsgId)
|
||||||
await SendAsync(_lastSentMsg);
|
{
|
||||||
|
var newMsgId = await SendAsync(_lastSentMsg, true);
|
||||||
|
lock (_pendingRequests)
|
||||||
|
if (_pendingRequests.TryGetValue(badServerSalt.bad_msg_id, out var t))
|
||||||
|
{
|
||||||
|
_pendingRequests.Remove(badServerSalt.bad_msg_id);
|
||||||
|
_pendingRequests[newMsgId] = t;
|
||||||
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case BadMsgNotification badMsgNotification:
|
case BadMsgNotification badMsgNotification:
|
||||||
Helpers.Log(3, $"BadMsgNotification {badMsgNotification.error_code} for msg {badMsgNotification.bad_msg_seqno}");
|
Helpers.Log(3, $"BadMsgNotification {badMsgNotification.error_code} for msg #{(short)badMsgNotification.bad_msg_id.GetHashCode():X4}");
|
||||||
break;
|
break;
|
||||||
case RpcResult rpcResult:
|
case RpcResult rpcResult:
|
||||||
if (_session.MsgIdToStamp(rpcResult.req_msg_id) >= _session.SessionStart)
|
//if (_session.MsgIdToStamp(rpcResult.req_msg_id) >= _session.SessionStart)
|
||||||
throw new ApplicationException($"Got RpcResult({rpcResult.result.GetType().Name}) for unknown msgId {rpcResult.req_msg_id}");
|
break; // tcs wake up was already done in DeserializeRpcResult
|
||||||
break; // silently ignore results for msg_id from previous sessions
|
|
||||||
default:
|
default:
|
||||||
if (_updateHandler != null) await _updateHandler?.Invoke(obj);
|
if (_rawRequest != null)
|
||||||
|
{
|
||||||
|
var rawRequest = _rawRequest;
|
||||||
|
_ = Task.Run(() => rawRequest.SetResult(obj)); // to avoid deadlock, see https://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html
|
||||||
|
_rawRequest = null;
|
||||||
|
}
|
||||||
|
else if (_updateHandler != null)
|
||||||
|
await _updateHandler?.Invoke(obj);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -71,12 +71,16 @@ namespace WTelegram
|
||||||
|
|
||||||
internal (long msgId, int seqno) NewMsg(bool isContent)
|
internal (long msgId, int seqno) NewMsg(bool isContent)
|
||||||
{
|
{
|
||||||
|
int seqno;
|
||||||
long msgId = DateTime.UtcNow.Ticks + ServerTicksOffset - 621355968000000000L;
|
long msgId = DateTime.UtcNow.Ticks + ServerTicksOffset - 621355968000000000L;
|
||||||
msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4
|
msgId = msgId * 428 + (msgId >> 24) * 25110956; // approximately unixtime*2^32 and divisible by 4
|
||||||
|
lock (this)
|
||||||
|
{
|
||||||
if (msgId <= LastSentMsgId) msgId = LastSentMsgId += 4; else LastSentMsgId = msgId;
|
if (msgId <= LastSentMsgId) msgId = LastSentMsgId += 4; else LastSentMsgId = msgId;
|
||||||
|
|
||||||
int seqno = isContent ? Seqno++ * 2 + 1 : Seqno * 2;
|
seqno = isContent ? Seqno++ * 2 + 1 : Seqno * 2;
|
||||||
Save();
|
Save();
|
||||||
|
}
|
||||||
return (msgId, seqno);
|
return (msgId, seqno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -51,14 +51,14 @@ namespace TL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal static ITLObject ReadTLObject(this BinaryReader reader, Func<Type, bool> customRead = null)
|
internal static ITLObject ReadTLObject(this BinaryReader reader, Func<Type, bool> notifyType = null)
|
||||||
{
|
{
|
||||||
var ctorNb = reader.ReadUInt32();
|
var ctorNb = reader.ReadUInt32();
|
||||||
if (ctorNb == NullCtor) return null;
|
if (ctorNb == NullCtor) return null;
|
||||||
if (!Table.TryGetValue(ctorNb, out var type))
|
if (!Table.TryGetValue(ctorNb, out var type))
|
||||||
throw new ApplicationException($"Cannot find type for ctor #{ctorNb:x}");
|
throw new ApplicationException($"Cannot find type for ctor #{ctorNb:x}");
|
||||||
var obj = Activator.CreateInstance(type);
|
var obj = Activator.CreateInstance(type);
|
||||||
if (customRead?.Invoke(type) == true) return (ITLObject)obj;
|
if (notifyType?.Invoke(type) == true) return (ITLObject) obj;
|
||||||
var fields = obj.GetType().GetFields().GroupBy(f => f.DeclaringType).Reverse().SelectMany(g => g);
|
var fields = obj.GetType().GetFields().GroupBy(f => f.DeclaringType).Reverse().SelectMany(g => g);
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
IfFlagAttribute ifFlag;
|
IfFlagAttribute ifFlag;
|
||||||
|
|
@ -244,7 +244,7 @@ namespace TL
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
Helpers.Log(4, ex.ToString());
|
Helpers.Log(4, "While deserializing vector<%Message>: " + ex.ToString());
|
||||||
}
|
}
|
||||||
reader.BaseStream.Position = pos + array[i].bytes;
|
reader.BaseStream.Position = pos + array[i].bytes;
|
||||||
}
|
}
|
||||||
|
|
@ -255,7 +255,6 @@ namespace TL
|
||||||
{
|
{
|
||||||
using var reader = new BinaryReader(new GZipStream(new MemoryStream(obj.packed_data), CompressionMode.Decompress));
|
using var reader = new BinaryReader(new GZipStream(new MemoryStream(obj.packed_data), CompressionMode.Decompress));
|
||||||
var result = ReadTLObject(reader);
|
var result = ReadTLObject(reader);
|
||||||
Helpers.Log(1, $" → {result.GetType().Name}");
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue