From fbd85f1b31ca42d45cb6323e32b7e8de51e69396 Mon Sep 17 00:00:00 2001 From: Morten Nielsen Date: Fri, 31 Jul 2020 00:17:11 -0700 Subject: [PATCH] Ntrip client returns an independent stream instead of using events --- src/NmeaParser/Gnss/Ntrip/Client.cs | 141 ++++++++++----------- src/SampleApp.WinDesktop/NtripView.xaml.cs | 68 +++------- 2 files changed, 87 insertions(+), 122 deletions(-) diff --git a/src/NmeaParser/Gnss/Ntrip/Client.cs b/src/NmeaParser/Gnss/Ntrip/Client.cs index a519ab5..0a1aeb1 100644 --- a/src/NmeaParser/Gnss/Ntrip/Client.cs +++ b/src/NmeaParser/Gnss/Ntrip/Client.cs @@ -14,8 +14,10 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net.Sockets; +using System.Threading; using System.Threading.Tasks; namespace NmeaParser.Gnss.Ntrip @@ -23,14 +25,11 @@ namespace NmeaParser.Gnss.Ntrip /// /// NTRIP Client for querying an NTRIP server and opening an NTRIP stream /// - public class Client : IDisposable + public class Client { private readonly string _host; private readonly int _port; private string? _auth; - private Socket? sckt; - private bool connected; - private Task? runningTask; /// /// Initializes a new instance of the class @@ -110,97 +109,97 @@ namespace NmeaParser.Gnss.Ntrip sckt.Send(data); return sckt; } + /// /// Connects to the endpoint for the specified /// /// - public void Connect(NtripStream stream) + public Stream OpenStream(NtripStream stream) { if (stream == null) throw new ArgumentNullException(nameof(stream)); - Connect(stream.Mountpoint); + return OpenStream(stream.Mountpoint); } /// /// Connects to the endpoint for the specified /// - /// - public void Connect(string mountPoint) + /// + public Stream OpenStream(string mountPoint) { if (mountPoint == null) throw new ArgumentNullException(nameof(mountPoint)); if (string.IsNullOrWhiteSpace(mountPoint)) throw new ArgumentException(nameof(mountPoint)); - if (sckt != null) throw new Exception("Connection already open"); - sckt = Request(mountPoint); - connected = true; - runningTask = Task.Run(ReceiveThread); - } - - private async Task ReceiveThread() - { - byte[] buffer = new byte[65536]; - while (connected && sckt != null) - { - int count = sckt.Receive(buffer); - if (count > 0) - { - DataReceived?.Invoke(this, buffer.Take(count).ToArray()); - } - await Task.Yield(); - if (!sckt.Connected) - { - if (connected) - { - connected = false; - Disconnected?.Invoke(this, EventArgs.Empty); - } - break; - } - } - sckt?.Shutdown(SocketShutdown.Both); - sckt?.Dispose(); - sckt = null; + var sckt = Request(mountPoint); + return new NtripDataStream(sckt); } - /// - /// Shuts down the stream - /// - /// - public Task CloseAsync() + private class NtripDataStream : System.IO.Stream { - if (runningTask != null) + private Socket m_socket; + public NtripDataStream(Socket socket) { - connected = false; - var t = runningTask; - runningTask = null; - return t; + m_socket = socket; + } + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => false; + + public override long Length => -1; + + long position = 0; + public override long Position { get => position; set => throw new NotSupportedException(); } + + public override void Flush() => throw new NotSupportedException(); + + public override int Read(byte[] buffer, int offset, int count) + { + int read = m_socket.Receive(buffer, offset, count, SocketFlags.None); + position += read; + return read; + } +#if !NETSTANDARD1_4 + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + TaskCompletionSource tcs = new TaskCompletionSource(); + if (cancellationToken.CanBeCanceled) + cancellationToken.Register(() => tcs.TrySetCanceled()); + m_socket.BeginReceive(buffer, offset, count, SocketFlags.None, ReceiveCallback, tcs); + return tcs.Task; + } + + private void ReceiveCallback(IAsyncResult ar) + { + TaskCompletionSource tcs = (TaskCompletionSource)ar.AsyncState; + if (tcs.Task.IsCanceled) return; + try + { + int bytesRead = m_socket.EndReceive(ar); + position += bytesRead; + tcs.TrySetResult(bytesRead); + } + catch (System.Exception ex) + { + tcs.TrySetException(ex); + } } -#if NETSTANDARD || NETFX - return Task.FromResult(null); -#else - return Task.CompletedTask; #endif + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + + public override void SetLength(long value) => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + protected override void Dispose(bool disposing) + { + m_socket.Dispose(); + base.Dispose(disposing); + } + + public override int ReadTimeout { get => m_socket.ReceiveTimeout; set => m_socket.ReceiveTimeout = value; } } - - /// - public void Dispose() - { - _ = CloseAsync(); - } - - /// - /// Fired when bytes has been received from the stream - /// - public event EventHandler? DataReceived; - - /// - /// Fired if the socket connection was dropped, and the connection was closed. - /// - /// - /// This event is useful for handling network glitches, and trying to retry connection by calling again a few times. - /// - public event EventHandler? Disconnected; } } diff --git a/src/SampleApp.WinDesktop/NtripView.xaml.cs b/src/SampleApp.WinDesktop/NtripView.xaml.cs index 565576f..583ab89 100644 --- a/src/SampleApp.WinDesktop/NtripView.xaml.cs +++ b/src/SampleApp.WinDesktop/NtripView.xaml.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -36,7 +37,6 @@ namespace SampleApp.WinDesktop return; } client = new NmeaParser.Gnss.Ntrip.Client(host.Text, portNumber, username.Text, password.Password); - client.DataReceived += Client_DataReceived; List sources; try { @@ -49,66 +49,32 @@ namespace SampleApp.WinDesktop } sourceList.ItemsSource = sources.OrderBy(s=>s.CountryCode); } - Func stop; - private async void Connect_Click(object sender, RoutedEventArgs e) + Stream ntripStream; + private void Connect_Click(object sender, RoutedEventArgs e) { - var stream = ((Button)sender).DataContext as NtripStream; - if (stream == null) + var streaminfo = ((Button)sender).DataContext as NtripStream; + if (streaminfo == null) return; - if (stop != null) + ntripStream?.Dispose(); + var stream = ntripStream = client.OpenStream(streaminfo.Mountpoint); + _ = Task.Run(async () => { - try + byte[] buffer = new byte[1024]; + while (true) { - await stop(); - } - catch { } - stop = null; - } - counter = 0; - client.Connect(stream.Mountpoint); - client.Disconnected += (s, e) => - { - Debug.WriteLine("NTRIP Stream Disconnected. Retrying..."); - // Try and reconnect after a disconnect - client.Connect(stream.Mountpoint); - }; - stop = () => client.CloseAsync(); - ntripstatus.Text = $"Connected"; - } - - System.Threading.Tasks.Task writingTask; - object writeLock = new object(); - long counter = 0; - private async void Client_DataReceived(object sender, byte[] rtcm) - { - var device = MainWindow.currentDevice; - if (device != null && device.CanWrite) - { - try - { - //lock (writeLock) + var count = await stream.ReadAsync(buffer); + var device = MainWindow.currentDevice; + if (device != null && device.CanWrite) { - await device.WriteAsync(rtcm, 0, rtcm.Length); - counter += rtcm.Length; + await device.WriteAsync(buffer, 0, count); Dispatcher.Invoke(() => { - ntripstatus.Text = $"Transmitted {counter} bytes"; + ntripstatus.Text = $"Transmitted {ntripStream.Position} bytes"; }); } } - catch - { - - } - } - //ParseData(rtcm); - } - Queue rtcmData = new Queue(); - private void ParseData(byte[] rtcm) - { - foreach (var b in rtcm) - rtcmData.Enqueue(b); - + }); + ntripstatus.Text = $"Connected"; } } }