Better handle bare requests. Handle more service messages.

This commit is contained in:
Wizou 2021-08-14 15:15:41 +02:00
parent 39f03ed78f
commit cc83944985
7 changed files with 100 additions and 53 deletions

View file

@ -127,5 +127,5 @@ Here are the main expected developments:
- [x] Separate background task for reading/handling update messages independently
- [x] Support MTProto 2.0
- [x] Support users with 2FA enabled
- [x] Support main service messages
- [ ] Support secret chats end-to-end encryption & PFS
- [ ] Support all service messages

View file

@ -29,12 +29,13 @@ namespace WTelegram
private NetworkStream _networkStream;
private int _frame_seqTx = 0, _frame_seqRx = 0;
private ITLFunction _lastSentMsg;
private long _lastRecvMsgId;
private readonly List<long> _msgsToAck = new();
private readonly Random _random = new();
private readonly SHA256 _sha256 = SHA256.Create();
private int _unexpectedSaltChange;
private Task _reactorTask;
private TaskCompletionSource<object> _rawRequest;
private long _bareRequest;
private readonly Dictionary<long, (Type type, TaskCompletionSource<object> tcs)> _pendingRequests = new();
private readonly SemaphoreSlim _sendSemaphore = new(1);
private CancellationTokenSource _cts;
@ -302,7 +303,7 @@ namespace WTelegram
if (authKeyId == 0) // Unencrypted message
{
using var reader = new BinaryReader(new MemoryStream(data, 8, data.Length - 8));
long msgId = reader.ReadInt64();
long msgId = _lastRecvMsgId = reader.ReadInt64();
if ((msgId & 1) == 0) throw new ApplicationException($"Invalid server msgId {msgId}");
int length = reader.ReadInt32();
if (length != data.Length - 20) throw new ApplicationException($"Unexpected unencrypted length {length} != {data.Length - 20}");
@ -324,11 +325,11 @@ namespace WTelegram
if (decrypted_data.Length < 36) // header below+ctorNb
throw new ApplicationException($"Decrypted packet too small: {decrypted_data.Length}");
using var reader = new BinaryReader(new MemoryStream(decrypted_data));
var serverSalt = reader.ReadInt64(); // int64 salt
var sessionId = reader.ReadInt64(); // int64 session_id
var msgId = reader.ReadInt64(); // int64 message_id
var seqno = reader.ReadInt32(); // int32 msg_seqno
var length = reader.ReadInt32(); // int32 message_data_length
var serverSalt = reader.ReadInt64(); // int64 salt
var sessionId = reader.ReadInt64(); // int64 session_id
var msgId = _lastRecvMsgId = reader.ReadInt64();// int64 message_id
var seqno = reader.ReadInt32(); // int32 msg_seqno
var length = reader.ReadInt32(); // int32 message_data_length
if (serverSalt != _session.Salt)
{
@ -418,17 +419,13 @@ namespace WTelegram
private RpcResult ReadRpcResult(BinaryReader reader)
{
long msgId = reader.ReadInt64();
(Type type, TaskCompletionSource<object> tcs) request;
lock (_pendingRequests)
if (_pendingRequests.TryGetValue(msgId, out request))
_pendingRequests.Remove(msgId);
var (type, tcs) = PullPendingRequest(msgId);
object result;
if (request.type != null)
if (tcs != null)
{
result = reader.ReadTLValue(request.type);
result = reader.ReadTLValue(type);
Log(1, "");
Task.Run(() => request.tcs.SetResult(result)); // to avoid deadlock, see https://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html
return new RpcResult { req_msg_id = msgId, result = result };
Task.Run(() => tcs.SetResult(result)); // in Task.Run to avoid deadlock, see https://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html
}
else
{
@ -437,8 +434,8 @@ namespace WTelegram
Log(4, "for unknown msgId ");
else
Log(1, "for past msgId ");
return new RpcResult { req_msg_id = msgId, result = result };
}
return new RpcResult { req_msg_id = msgId, result = result };
void Log(int level, string msgIdprefix)
{
@ -449,29 +446,33 @@ namespace WTelegram
}
}
public class RpcException : Exception
private (Type type, TaskCompletionSource<object> tcs) PullPendingRequest(long msgId)
{
public readonly int Code;
public RpcException(int code, string message) : base(message) => Code = code;
(Type type, TaskCompletionSource<object> tcs) request;
lock (_pendingRequests)
if (_pendingRequests.TryGetValue(msgId, out request))
_pendingRequests.Remove(msgId);
return request;
}
internal async Task<X> CallBareAsync<X>(ITLFunction request)
{
var msgId = await SendAsync(request, false);
var tcs = new TaskCompletionSource<object>();
lock (_pendingRequests)
_pendingRequests[msgId] = (typeof(X), tcs);
_bareRequest = msgId;
return (X)await tcs.Task;
}
public async Task<X> CallAsync<X>(ITLFunction request)
{
retry:
var msgId = await SendAsync(request, true);
object result;
if (_session.AuthKeyID == 0)
{
_rawRequest = new TaskCompletionSource<object>();
result = await _rawRequest.Task;
}
else
{
var tcs = new TaskCompletionSource<object>();
lock (_pendingRequests)
_pendingRequests[msgId] = (typeof(X), tcs);
result = await tcs.Task;
}
var tcs = new TaskCompletionSource<object>();
lock (_pendingRequests)
_pendingRequests[msgId] = (typeof(X), tcs);
var result = await tcs.Task;
switch (result)
{
case X resultX: return resultX;
@ -549,6 +550,10 @@ namespace WTelegram
if (msg.body != null)
await HandleMessageAsync(msg.body);
break;
case MsgCopy msgCopy:
if (msgCopy?.orig_message?.body != null)
await HandleMessageAsync(msgCopy.orig_message.body);
break;
case BadServerSalt badServerSalt:
_session.Salt = badServerSalt.new_server_salt;
if (badServerSalt.bad_msg_id == _session.LastSentMsgId)
@ -562,22 +567,45 @@ namespace WTelegram
}
}
break;
case BadMsgNotification badMsgNotification:
Helpers.Log(3, $"BadMsgNotification {badMsgNotification.error_code} for msg #{(short)badMsgNotification.bad_msg_id.GetHashCode():X4}");
case Ping ping:
_ = SendAsync(MakeFunction(new Pong { msg_id = _lastRecvMsgId, ping_id = ping.ping_id }), false);
break;
case Pong pong:
await SetResult(pong.msg_id, pong);
break;
case FutureSalts futureSalts:
await SetResult(futureSalts.req_msg_id, futureSalts);
break;
case RpcResult rpcResult:
break; // wake-up of waiting task was already done in ReadRpcResult
break; // SetResult was already done in ReadRpcResult
case MsgsAck msgsAck:
break; // we don't do anything with these, for now
case BadMsgNotification badMsgNotification:
Helpers.Log(4, $"BadMsgNotification {badMsgNotification.error_code} for msg #{(short)badMsgNotification.bad_msg_id.GetHashCode():X4}");
goto default;
default:
if (_rawRequest != null)
if (_bareRequest != 0)
{
var rawRequest = _rawRequest;
_ = Task.Run(() => rawRequest.SetResult(obj)); // to avoid deadlock, see https://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html
_rawRequest = null;
var (type, tcs) = PullPendingRequest(_bareRequest);
if (obj.GetType().IsAssignableTo(type))
{
_bareRequest = 0;
_ = Task.Run(() => tcs.SetResult(obj));
}
}
else if (_updateHandler != null)
if (_updateHandler != null)
await _updateHandler?.Invoke(obj);
break;
}
async Task SetResult(long msgId, object result)
{
var (type, tcs) = PullPendingRequest(msgId);
if (tcs != null)
_ = Task.Run(() => tcs.SetResult(result));
else if (_updateHandler != null)
await _updateHandler?.Invoke(obj);
}
}
public async Task<User> UserAuthIfNeeded(CodeSettings settings = null)

View file

@ -122,10 +122,17 @@ namespace WTelegram
}
if (typeInfosByLayer[0]["Message"].SameName.ID == 0x5BB8E511) typeInfosByLayer[0].Remove("Message");
var methods = new List<TypeInfo>();
if (schema.methods.Count != 0)
{
typeInfos = typeInfosByLayer[0];
var ping = schema.methods.FirstOrDefault(m => m.method == "ping");
if (ping != null)
{
var typeInfo = new TypeInfo { ReturnName = ping.type };
typeInfo.Structs.Add(new Constructor { id = ping.id, @params = ping.@params, predicate = ping.method, type = ping.type });
ctorToTypes[int.Parse(ping.id)] = CSharpName(ping.method);
WriteTypeInfo(sw, typeInfo, "", false);
}
sw.WriteLine("}");
sw.WriteLine("");
sw.WriteLine("namespace WTelegram\t\t// ---functions---");
@ -149,7 +156,7 @@ namespace WTelegram
}
sw.WriteLine("}");
if (tableCs != null) UpdateTable(tableCs, methods);
if (tableCs != null) UpdateTable(tableCs);
}
void WriteTypeInfo(StreamWriter sw, TypeInfo typeInfo, string layerPrefix, bool isMethod)
@ -345,18 +352,20 @@ namespace WTelegram
if (style == -1) return;
sw.WriteLine();
var callAsync = "CallAsync";
if (method.type.Length == 1 && style != 1) funcName += $"<{returnType}>";
if (currentJson != "TL.MTProto")
sw.WriteLine($"{tabIndent}///<summary>See <a href=\"https://core.telegram.org/method/{method.method}\"/></summary>");
else
{
if (method.type is not "FutureSalts" and not "Pong") callAsync = "CallBareAsync";
sw.Write($"{tabIndent}//{method.method}#{ctorNb:x8} ");
if (method.type.Length == 1) sw.Write($"{{{method.type}:Type}} ");
foreach (var parm in method.@params) sw.Write($"{parm.name}:{parm.type} ");
sw.WriteLine($"= {method.type}");
}
if (style == 0) sw.WriteLine($"{tabIndent}public Task<{returnType}> {funcName}() => CallAsync<{returnType}>({funcName});");
if (style == 0) sw.WriteLine($"{tabIndent}public Task<{returnType}> {funcName}() => {callAsync}<{returnType}>({funcName});");
if (style == 0) sw.Write($"{tabIndent}public static string {funcName}(BinaryWriter writer");
if (style == 1) sw.Write($"{tabIndent}public static ITLFunction {funcName}(");
if (style == 2) sw.Write($"{tabIndent}public Task<{returnType}> {funcName}(");
@ -390,7 +399,7 @@ namespace WTelegram
sw.WriteLine(")");
if (style != 0) tabIndent += "\t";
if (style == 1) sw.WriteLine($"{tabIndent}=> writer =>");
if (style == 2) sw.WriteLine($"{tabIndent}=> CallAsync<{returnType}>(writer =>");
if (style == 2) sw.WriteLine($"{tabIndent}=> {callAsync}<{returnType}>(writer =>");
sw.WriteLine(tabIndent + "{");
sw.WriteLine($"{tabIndent}\twriter.Write(0x{ctorNb:X8});");
foreach (var parm in method.@params) // serialize request
@ -451,7 +460,7 @@ namespace WTelegram
if (style != 0) tabIndent = tabIndent[0..^1];
}
void UpdateTable(string tableCs, List<TypeInfo> methods)
void UpdateTable(string tableCs)
{
var myTag = $"\t\t\t// from {currentJson}:";
var seen_ids = new HashSet<int>();

View file

@ -13,6 +13,7 @@
{
public abstract int ID { get; }
public abstract string Title { get; }
/// <summary>returns true if you're banned of any of these rights</summary>
public abstract bool IsBanned(ChatBannedRights.Flags flags = 0);
protected abstract InputPeer ToInputPeer();
public static implicit operator InputPeer(ChatBase chat) => chat.ToInputPeer();
@ -28,7 +29,6 @@
{
public override int ID => id;
public override string Title => title;
/// <summary>returns true if you're banned of any of these rights</summary>
public override bool IsBanned(ChatBannedRights.Flags flags = 0) => ((default_banned_rights?.flags ?? 0) & flags) != 0;
protected override InputPeer ToInputPeer() => new InputPeerChat { chat_id = id };
}

View file

@ -222,6 +222,9 @@ namespace TL
public partial class DestroyAuthKeyNone : DestroyAuthKeyRes { }
[TLDef(0xEA109B13)] //destroy_auth_key_fail#ea109b13 = DestroyAuthKeyRes
public partial class DestroyAuthKeyFail : DestroyAuthKeyRes { }
[TLDef(0x7ABE77EC)] //ping#7abe77ec ping_id:long = Pong
public partial class Ping : ITLObject { public long ping_id; }
}
namespace WTelegram // ---functions---
@ -233,7 +236,7 @@ namespace WTelegram // ---functions---
{
//req_pq_multi#be7e8ef1 nonce:int128 = ResPQ
public Task<ResPQ> ReqPqMulti(Int128 nonce)
=> CallAsync<ResPQ>(writer =>
=> CallBareAsync<ResPQ>(writer =>
{
writer.Write(0xBE7E8EF1);
writer.Write(nonce);
@ -242,7 +245,7 @@ namespace WTelegram // ---functions---
//req_DH_params#d712e4be nonce:int128 server_nonce:int128 p:bytes q:bytes public_key_fingerprint:long encrypted_data:bytes = Server_DH_Params
public Task<ServerDHParams> ReqDHParams(Int128 nonce, Int128 server_nonce, byte[] p, byte[] q, long public_key_fingerprint, byte[] encrypted_data)
=> CallAsync<ServerDHParams>(writer =>
=> CallBareAsync<ServerDHParams>(writer =>
{
writer.Write(0xD712E4BE);
writer.Write(nonce);
@ -256,7 +259,7 @@ namespace WTelegram // ---functions---
//set_client_DH_params#f5045f1f nonce:int128 server_nonce:int128 encrypted_data:bytes = Set_client_DH_params_answer
public Task<SetClientDHParamsAnswer> SetClientDHParams(Int128 nonce, Int128 server_nonce, byte[] encrypted_data)
=> CallAsync<SetClientDHParamsAnswer>(writer =>
=> CallBareAsync<SetClientDHParamsAnswer>(writer =>
{
writer.Write(0xF5045F1F);
writer.Write(nonce);
@ -267,7 +270,7 @@ namespace WTelegram // ---functions---
//rpc_drop_answer#58e4a740 req_msg_id:long = RpcDropAnswer
public Task<RpcDropAnswer> RpcDropAnswer(long req_msg_id)
=> CallAsync<RpcDropAnswer>(writer =>
=> CallBareAsync<RpcDropAnswer>(writer =>
{
writer.Write(0x58E4A740);
writer.Write(req_msg_id);
@ -304,7 +307,7 @@ namespace WTelegram // ---functions---
//destroy_session#e7512126 session_id:long = DestroySessionRes
public Task<DestroySessionRes> DestroySession(long session_id)
=> CallAsync<DestroySessionRes>(writer =>
=> CallBareAsync<DestroySessionRes>(writer =>
{
writer.Write(0xE7512126);
writer.Write(session_id);
@ -313,7 +316,7 @@ namespace WTelegram // ---functions---
//destroy_auth_key#d1435160 = DestroyAuthKeyRes
public Task<DestroyAuthKeyRes> DestroyAuthKey()
=> CallAsync<DestroyAuthKeyRes>(writer =>
=> CallBareAsync<DestroyAuthKeyRes>(writer =>
{
writer.Write(0xD1435160);
return "DestroyAuthKey";

View file

@ -52,6 +52,7 @@ namespace TL
[0xF660E1D4] = typeof(DestroyAuthKeyOk),
[0x0A9F2259] = typeof(DestroyAuthKeyNone),
[0xEA109B13] = typeof(DestroyAuthKeyFail),
[0x7ABE77EC] = typeof(Ping),
// from TL.Schema:
[0xBC799737] = typeof(BoolFalse),
[0x997275B5] = typeof(BoolTrue),

View file

@ -236,6 +236,12 @@ namespace TL
#endif
}
public class RpcException : Exception
{
public readonly int Code;
public RpcException(int code, string message) : base(message) => Code = code;
}
[AttributeUsage(AttributeTargets.Class)]
public class TLDefAttribute : Attribute
{