Ntrip client returns an independent stream instead of using events

This commit is contained in:
Morten Nielsen 2020-07-31 00:17:11 -07:00
parent 1331ad8fcf
commit fbd85f1b31
2 changed files with 87 additions and 122 deletions

View file

@ -14,8 +14,10 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace NmeaParser.Gnss.Ntrip
@ -23,14 +25,11 @@ namespace NmeaParser.Gnss.Ntrip
/// <summary>
/// NTRIP Client for querying an NTRIP server and opening an NTRIP stream
/// </summary>
public class Client : IDisposable
public class Client
{
private readonly string _host;
private readonly int _port;
private string? _auth;
private Socket? sckt;
private bool connected;
private Task? runningTask;
/// <summary>
/// Initializes a new instance of the <see cref="Client"/> class
@ -110,97 +109,97 @@ namespace NmeaParser.Gnss.Ntrip
sckt.Send(data);
return sckt;
}
/// <summary>
/// Connects to the endpoint for the specified <see cref="NtripStream.Mountpoint"/>
/// </summary>
/// <param name="stream"></param>
public void Connect(NtripStream stream)
public Stream OpenStream(NtripStream stream)
{
if (stream == null)
throw new ArgumentNullException(nameof(stream));
Connect(stream.Mountpoint);
return OpenStream(stream.Mountpoint);
}
/// <summary>
/// Connects to the endpoint for the specified <see cref="NtripStream.Mountpoint"/>
/// </summary>
/// <param name="mountPoint"></param>
public void Connect(string mountPoint)
/// <param name="mountPoint"></param>
public Stream OpenStream(string mountPoint)
{
if (mountPoint == null)
throw new ArgumentNullException(nameof(mountPoint));
if (string.IsNullOrWhiteSpace(mountPoint))
throw new ArgumentException(nameof(mountPoint));
if (sckt != null) throw new Exception("Connection already open");
sckt = Request(mountPoint);
connected = true;
runningTask = Task.Run(ReceiveThread);
}
private async Task ReceiveThread()
{
byte[] buffer = new byte[65536];
while (connected && sckt != null)
{
int count = sckt.Receive(buffer);
if (count > 0)
{
DataReceived?.Invoke(this, buffer.Take(count).ToArray());
}
await Task.Yield();
if (!sckt.Connected)
{
if (connected)
{
connected = false;
Disconnected?.Invoke(this, EventArgs.Empty);
}
break;
}
}
sckt?.Shutdown(SocketShutdown.Both);
sckt?.Dispose();
sckt = null;
var sckt = Request(mountPoint);
return new NtripDataStream(sckt);
}
/// <summary>
/// Shuts down the stream
/// </summary>
/// <returns></returns>
public Task CloseAsync()
private class NtripDataStream : System.IO.Stream
{
if (runningTask != null)
private Socket m_socket;
public NtripDataStream(Socket socket)
{
connected = false;
var t = runningTask;
runningTask = null;
return t;
m_socket = socket;
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => -1;
long position = 0;
public override long Position { get => position; set => throw new NotSupportedException(); }
public override void Flush() => throw new NotSupportedException();
public override int Read(byte[] buffer, int offset, int count)
{
int read = m_socket.Receive(buffer, offset, count, SocketFlags.None);
position += read;
return read;
}
#if !NETSTANDARD1_4
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
if (cancellationToken.CanBeCanceled)
cancellationToken.Register(() => tcs.TrySetCanceled());
m_socket.BeginReceive(buffer, offset, count, SocketFlags.None, ReceiveCallback, tcs);
return tcs.Task;
}
private void ReceiveCallback(IAsyncResult ar)
{
TaskCompletionSource<int> tcs = (TaskCompletionSource<int>)ar.AsyncState;
if (tcs.Task.IsCanceled) return;
try
{
int bytesRead = m_socket.EndReceive(ar);
position += bytesRead;
tcs.TrySetResult(bytesRead);
}
catch (System.Exception ex)
{
tcs.TrySetException(ex);
}
}
#if NETSTANDARD || NETFX
return Task.FromResult<object?>(null);
#else
return Task.CompletedTask;
#endif
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
protected override void Dispose(bool disposing)
{
m_socket.Dispose();
base.Dispose(disposing);
}
public override int ReadTimeout { get => m_socket.ReceiveTimeout; set => m_socket.ReceiveTimeout = value; }
}
/// <inheritdoc />
public void Dispose()
{
_ = CloseAsync();
}
/// <summary>
/// Fired when bytes has been received from the stream
/// </summary>
public event EventHandler<byte[]>? DataReceived;
/// <summary>
/// Fired if the socket connection was dropped, and the connection was closed.
/// </summary>
/// <remarks>
/// This event is useful for handling network glitches, and trying to retry connection by calling <see cref="Connect(string)"/> again a few times.
/// </remarks>
public event EventHandler? Disconnected;
}
}

View file

@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@ -36,7 +37,6 @@ namespace SampleApp.WinDesktop
return;
}
client = new NmeaParser.Gnss.Ntrip.Client(host.Text, portNumber, username.Text, password.Password);
client.DataReceived += Client_DataReceived;
List<NtripStream> sources;
try
{
@ -49,66 +49,32 @@ namespace SampleApp.WinDesktop
}
sourceList.ItemsSource = sources.OrderBy(s=>s.CountryCode);
}
Func<Task> stop;
private async void Connect_Click(object sender, RoutedEventArgs e)
Stream ntripStream;
private void Connect_Click(object sender, RoutedEventArgs e)
{
var stream = ((Button)sender).DataContext as NtripStream;
if (stream == null)
var streaminfo = ((Button)sender).DataContext as NtripStream;
if (streaminfo == null)
return;
if (stop != null)
ntripStream?.Dispose();
var stream = ntripStream = client.OpenStream(streaminfo.Mountpoint);
_ = Task.Run(async () =>
{
try
byte[] buffer = new byte[1024];
while (true)
{
await stop();
}
catch { }
stop = null;
}
counter = 0;
client.Connect(stream.Mountpoint);
client.Disconnected += (s, e) =>
{
Debug.WriteLine("NTRIP Stream Disconnected. Retrying...");
// Try and reconnect after a disconnect
client.Connect(stream.Mountpoint);
};
stop = () => client.CloseAsync();
ntripstatus.Text = $"Connected";
}
System.Threading.Tasks.Task writingTask;
object writeLock = new object();
long counter = 0;
private async void Client_DataReceived(object sender, byte[] rtcm)
{
var device = MainWindow.currentDevice;
if (device != null && device.CanWrite)
{
try
{
//lock (writeLock)
var count = await stream.ReadAsync(buffer);
var device = MainWindow.currentDevice;
if (device != null && device.CanWrite)
{
await device.WriteAsync(rtcm, 0, rtcm.Length);
counter += rtcm.Length;
await device.WriteAsync(buffer, 0, count);
Dispatcher.Invoke(() =>
{
ntripstatus.Text = $"Transmitted {counter} bytes";
ntripstatus.Text = $"Transmitted {ntripStream.Position} bytes";
});
}
}
catch
{
}
}
//ParseData(rtcm);
}
Queue<byte> rtcmData = new Queue<byte>();
private void ParseData(byte[] rtcm)
{
foreach (var b in rtcm)
rtcmData.Enqueue(b);
});
ntripstatus.Text = $"Connected";
}
}
}