Better IPv6 support/fallback. Support RpcError in response to an API returning an array

This commit is contained in:
Wizou 2021-10-01 02:22:26 +02:00
parent d72f7a6398
commit e7c5d2eb27
7 changed files with 89 additions and 23 deletions

2
.github/ci.yml vendored
View file

@ -2,7 +2,7 @@ pr: none
trigger:
- master
name: 1.0.0-ci.$(Rev:r)
name: 1.0.1-ci.$(Rev:r)
pool:
vmImage: ubuntu-latest

2
.github/release.yml vendored
View file

@ -1,7 +1,7 @@
pr: none
trigger: none
name: 1.0.0
name: 1.0.$(Rev:r)
pool:
vmImage: ubuntu-latest

View file

@ -26,6 +26,7 @@ namespace WTelegram
public Config TLConfig { get; private set; }
public int MaxAutoReconnects { get; set; } = 5; // number of automatic reconnections on connection/reactor failure
public bool IsMainDC => (_dcSession?.DataCenter?.id ?? 0) == _session.MainDC;
public bool Connected => _tcpClient?.Connected ?? false;
private readonly Func<string, string> _config;
private readonly int _apiId;
@ -120,13 +121,20 @@ namespace WTelegram
// disconnect and eventually forget user and disconnect other sessions
public void Reset(bool resetUser = true, bool resetSessions = true)
{
try
{
if (CheckMsgsToAck() is MsgsAck msgsAck)
SendAsync(MakeFunction(msgsAck), false).Wait(1000);
}
catch (Exception)
{
}
_cts?.Cancel();
_sendSemaphore = new(0);
_reactorTask = null;
_tcpClient?.Dispose();
_tcpClient = null;
_connecting = null;
if (resetSessions)
{
@ -154,8 +162,49 @@ namespace WTelegram
{
var endpoint = _dcSession?.EndPoint ?? Compat.IPEndPoint_Parse(Config("server_address"));
Helpers.Log(2, $"Connecting to {endpoint}...");
_tcpClient = new TcpClient(endpoint.AddressFamily);
_tcpClient = new TcpClient(AddressFamily.InterNetworkV6) { Client = { DualMode = true } }; // this allows both IPv4 & IPv6
try
{
await _tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
}
catch (SocketException ex) // cannot connect to target endpoint, try to find an alternate
{
Helpers.Log(4, $"SocketException {ex.SocketErrorCode} ({ex.ErrorCode}): {ex.Message}");
if (_dcSession?.DataCenter == null) throw;
var triedEndpoints = new HashSet<IPEndPoint> { endpoint };
if (_session.DcOptions != null)
{
var altOptions = _session.DcOptions.Where(dco => dco.id == _dcSession.DataCenter.id && dco.flags != _dcSession.DataCenter.flags
&& (dco.flags & (DcOption.Flags.cdn | DcOption.Flags.tcpo_only | DcOption.Flags.media_only)) == 0)
.OrderBy(dco => dco.flags);
// try alternate addresses for this DC
foreach (var dcOption in altOptions)
{
endpoint = new(IPAddress.Parse(dcOption.ip_address), dcOption.port);
if (!triedEndpoints.Add(endpoint)) continue;
Helpers.Log(2, $"Connecting to {endpoint}...");
try
{
await _tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
_dcSession.DataCenter = dcOption;
break;
}
catch (SocketException) { }
}
}
if (!_tcpClient.Connected)
{
endpoint = Compat.IPEndPoint_Parse(Config("server_address")); // re-ask callback for an address
if (!triedEndpoints.Add(endpoint)) throw;
_dcSession.Client = null;
// is it address for a known DCSession?
_dcSession = _session.DCSessions.Values.FirstOrDefault(dcs => dcs.EndPoint.Equals(endpoint));
_dcSession ??= new() { Id = Helpers.RandomLong() };
_dcSession.Client = this;
Helpers.Log(2, $"Connecting to {endpoint}...");
await _tcpClient.ConnectAsync(endpoint.Address, endpoint.Port);
}
}
_networkStream = _tcpClient.GetStream();
await _networkStream.WriteAsync(IntermediateHeader, 0, 4);
_cts = new();
@ -178,10 +227,11 @@ namespace WTelegram
Config("lang_pack"),
Config("lang_code"),
Schema.Help_GetConfig));
_session.DcOptions = TLConfig.dc_options;
_saltChangeCounter = 0;
if (_dcSession.DataCenter == null)
{
_dcSession.DataCenter = TLConfig.dc_options.Where(dc => dc.id == TLConfig.this_dc)
_dcSession.DataCenter = _session.DcOptions.Where(dc => dc.id == TLConfig.this_dc)
.OrderByDescending(dc => dc.ip_address == endpoint.Address.ToString())
.ThenByDescending(dc => dc.port == endpoint.Port).First();
_session.DCSessions[TLConfig.this_dc] = _dcSession;
@ -195,13 +245,14 @@ namespace WTelegram
Helpers.Log(2, $"Connected to {(TLConfig.test_mode ? "Test DC" : "DC")} {TLConfig.this_dc}... {TLConfig.flags & (Config.Flags)~0xE00}");
}
public async Task<Client> GetClientForDC(int dcId, bool connect = true)
public async Task<Client> GetClientForDC(int dcId, bool media_only = true, bool connect = true)
{
if (_dcSession.DataCenter?.id == dcId) return this;
Session.DCSession altSession;
lock (_session)
{
altSession = GetOrCreateDCSession(dcId);
altSession = GetOrCreateDCSession(dcId, _dcSession.DataCenter.flags | (media_only ? DcOption.Flags.media_only : 0));
if (!altSession.Client.Connected) { altSession.Client.Dispose(); altSession.Client = null; }
altSession.Client ??= new Client(this, altSession);
}
Helpers.Log(2, $"Requested connection to DC {dcId}...");
@ -231,18 +282,17 @@ namespace WTelegram
return altSession.Client;
}
private Session.DCSession GetOrCreateDCSession(int dcId)
private Session.DCSession GetOrCreateDCSession(int dcId, DcOption.Flags flags)
{
if (_session.DCSessions.TryGetValue(dcId, out var dcSession))
return dcSession;
var prevFamily = _tcpClient.Client.RemoteEndPoint.AddressFamily;
var dcOptions = TLConfig.dc_options.Where(dc => dc.id == dcId && (dc.flags & (DcOption.Flags.media_only | DcOption.Flags.cdn)) == 0);
if (prevFamily == AddressFamily.InterNetworkV6) // try to stay in the same connectivity
dcOptions = dcOptions.OrderByDescending(dc => dc.flags & DcOption.Flags.ipv6); // list ipv6 first
else
dcOptions = dcOptions.OrderBy(dc => dc.flags & DcOption.Flags.ipv6); // list ipv4 first
if (dcSession.Client != null || dcSession.DataCenter.flags == flags)
return dcSession; // if we have already a session with this DC and we are connected or it is a perfect match, use it
// try to find the most appropriate DcOption for this DC
var dcOptions = _session.DcOptions.Where(dc => dc.id == dcId).OrderBy(dc => dc.flags ^ flags);
var dcOption = dcOptions.FirstOrDefault() ?? throw new ApplicationException($"Could not find adequate dc_option for DC {dcId}");
return dcSession = _session.DCSessions[dcId] = new Session.DCSession { DataCenter = dcOption, Id = Helpers.RandomLong() };
dcSession ??= new Session.DCSession { Id = Helpers.RandomLong() }; // create new session only if not already existing
dcSession.DataCenter = dcOption;
return _session.DCSessions[dcId] = dcSession;
}
internal DateTime MsgIdToStamp(long serverMsgId)
@ -587,7 +637,15 @@ namespace WTelegram
object result;
if (tcs != null)
{
if (!type.IsArray)
result = reader.ReadTLValue(type);
else if (reader.ReadUInt32() == Layer.RpcErrorCtor)
result = reader.ReadTLObject(Layer.RpcErrorCtor);
else
{
reader.BaseStream.Position -= 4;
result = reader.ReadTLValue(type);
}
if (type.IsEnum) result = Enum.ToObject(type, result);
Log(1, "");
tcs.SetResult(result);
@ -652,7 +710,7 @@ namespace WTelegram
// this is a hack to migrate _dcSession in-place (staying in same Client):
Session.DCSession dcSession;
lock (_session)
dcSession = GetOrCreateDCSession(number);
dcSession = GetOrCreateDCSession(number, _dcSession.DataCenter.flags);
Reset(false, false);
_session.MainDC = number;
_dcSession.Client = null;
@ -1060,7 +1118,7 @@ namespace WTelegram
const int ChunkSize = 128 * 1024;
int fileSize = 0;
Upload_File fileData;
var client = fileDC == 0 ? this : await GetClientForDC(fileDC);
var client = fileDC == 0 ? this : await GetClientForDC(fileDC, true);
do
{
Upload_FileBase fileBase;
@ -1072,7 +1130,7 @@ namespace WTelegram
catch (RpcException ex) when (ex.Code == 303 && ex.Message.StartsWith("FILE_MIGRATE_"))
{
var dcId = int.Parse(ex.Message[13..]);
client = await GetClientForDC(dcId);
client = await GetClientForDC(dcId, true);
fileBase = await client.Upload_GetFile(fileLocation, fileSize, ChunkSize);
}
fileData = fileBase as Upload_File;

View file

@ -50,7 +50,7 @@ namespace WTelegram
internal static IPEndPoint IPEndPoint_Parse(string addr)
{
int colon = addr.IndexOf(':');
int colon = addr.LastIndexOf(':');
return new IPEndPoint(IPAddress.Parse(addr[0..colon]), int.Parse(addr[(colon + 1)..]));
}

View file

@ -24,6 +24,7 @@ namespace TL
public override string Title => null;
public override bool IsBanned(ChatBannedRights.Flags flags = 0) => true;
protected override InputPeer ToInputPeer() => null;
public override string ToString() => $"ChatEmpty {id}";
}
partial class Chat
{
@ -31,6 +32,7 @@ namespace TL
public override string Title => title;
public override bool IsBanned(ChatBannedRights.Flags flags = 0) => ((default_banned_rights?.flags ?? 0) & flags) != 0;
protected override InputPeer ToInputPeer() => new InputPeerChat { chat_id = id };
public override string ToString() => $"Chat \"{title}\"";
}
partial class ChatForbidden
{
@ -38,6 +40,7 @@ namespace TL
public override string Title => title;
public override bool IsBanned(ChatBannedRights.Flags flags = 0) => true;
protected override InputPeer ToInputPeer() => new InputPeerChat { chat_id = id };
public override string ToString() => $"ChatForbidden {id} \"{title}\"";
}
partial class Channel
{
@ -46,6 +49,8 @@ namespace TL
public override bool IsBanned(ChatBannedRights.Flags flags = 0) => ((banned_rights?.flags ?? 0) & flags) != 0 || ((default_banned_rights?.flags ?? 0) & flags) != 0;
protected override InputPeer ToInputPeer() => new InputPeerChannel { channel_id = id, access_hash = access_hash };
public static implicit operator InputChannel(Channel channel) => new() { channel_id = channel.id, access_hash = channel.access_hash };
public override string ToString() =>
(flags.HasFlag(Flags.broadcast) ? "Channel " : "Group ") + (username != null ? '@' + username : $"\"{title}\"");
}
partial class ChannelForbidden
{
@ -53,6 +58,7 @@ namespace TL
public override string Title => title;
public override bool IsBanned(ChatBannedRights.Flags flags = 0) => true;
protected override InputPeer ToInputPeer() => new InputPeerChannel { channel_id = id, access_hash = access_hash };
public override string ToString() => $"ChannelForbidden {id} \"{title}\"";
}
partial class UserBase

View file

@ -14,6 +14,7 @@ namespace WTelegram
public TL.User User;
public int MainDC;
public Dictionary<int, DCSession> DCSessions = new();
public TL.DcOption[] DcOptions;
public class DCSession
{

View file

@ -10,6 +10,7 @@ namespace TL
internal const uint VectorCtor = 0x1CB5C415;
internal const uint NullCtor = 0x56730BCC;
internal const uint RpcResultCtor = 0xF35C6D01;
internal const uint RpcErrorCtor = 0x2144CA19;
internal const uint MsgContainerCtor = 0x73F1F8DC;
internal readonly static Dictionary<uint, Type> Table = new()