diff --git a/src/NmeaParser/Gnss/Ntrip/Client.cs b/src/NmeaParser/Gnss/Ntrip/Client.cs
index a519ab5..0a1aeb1 100644
--- a/src/NmeaParser/Gnss/Ntrip/Client.cs
+++ b/src/NmeaParser/Gnss/Ntrip/Client.cs
@@ -14,8 +14,10 @@
using System;
using System.Collections.Generic;
+using System.IO;
using System.Linq;
using System.Net.Sockets;
+using System.Threading;
using System.Threading.Tasks;
namespace NmeaParser.Gnss.Ntrip
@@ -23,14 +25,11 @@ namespace NmeaParser.Gnss.Ntrip
///
/// NTRIP Client for querying an NTRIP server and opening an NTRIP stream
///
- public class Client : IDisposable
+ public class Client
{
private readonly string _host;
private readonly int _port;
private string? _auth;
- private Socket? sckt;
- private bool connected;
- private Task? runningTask;
///
/// Initializes a new instance of the class
@@ -110,97 +109,97 @@ namespace NmeaParser.Gnss.Ntrip
sckt.Send(data);
return sckt;
}
+
///
/// Connects to the endpoint for the specified
///
///
- public void Connect(NtripStream stream)
+ public Stream OpenStream(NtripStream stream)
{
if (stream == null)
throw new ArgumentNullException(nameof(stream));
- Connect(stream.Mountpoint);
+ return OpenStream(stream.Mountpoint);
}
///
/// Connects to the endpoint for the specified
///
- ///
- public void Connect(string mountPoint)
+ ///
+ public Stream OpenStream(string mountPoint)
{
if (mountPoint == null)
throw new ArgumentNullException(nameof(mountPoint));
if (string.IsNullOrWhiteSpace(mountPoint))
throw new ArgumentException(nameof(mountPoint));
- if (sckt != null) throw new Exception("Connection already open");
- sckt = Request(mountPoint);
- connected = true;
- runningTask = Task.Run(ReceiveThread);
- }
-
- private async Task ReceiveThread()
- {
- byte[] buffer = new byte[65536];
- while (connected && sckt != null)
- {
- int count = sckt.Receive(buffer);
- if (count > 0)
- {
- DataReceived?.Invoke(this, buffer.Take(count).ToArray());
- }
- await Task.Yield();
- if (!sckt.Connected)
- {
- if (connected)
- {
- connected = false;
- Disconnected?.Invoke(this, EventArgs.Empty);
- }
- break;
- }
- }
- sckt?.Shutdown(SocketShutdown.Both);
- sckt?.Dispose();
- sckt = null;
+ var sckt = Request(mountPoint);
+ return new NtripDataStream(sckt);
}
- ///
- /// Shuts down the stream
- ///
- ///
- public Task CloseAsync()
+ private class NtripDataStream : System.IO.Stream
{
- if (runningTask != null)
+ private Socket m_socket;
+ public NtripDataStream(Socket socket)
{
- connected = false;
- var t = runningTask;
- runningTask = null;
- return t;
+ m_socket = socket;
+ }
+ public override bool CanRead => true;
+
+ public override bool CanSeek => false;
+
+ public override bool CanWrite => false;
+
+ public override long Length => -1;
+
+ long position = 0;
+ public override long Position { get => position; set => throw new NotSupportedException(); }
+
+ public override void Flush() => throw new NotSupportedException();
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ int read = m_socket.Receive(buffer, offset, count, SocketFlags.None);
+ position += read;
+ return read;
+ }
+#if !NETSTANDARD1_4
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ TaskCompletionSource tcs = new TaskCompletionSource();
+ if (cancellationToken.CanBeCanceled)
+ cancellationToken.Register(() => tcs.TrySetCanceled());
+ m_socket.BeginReceive(buffer, offset, count, SocketFlags.None, ReceiveCallback, tcs);
+ return tcs.Task;
+ }
+
+ private void ReceiveCallback(IAsyncResult ar)
+ {
+ TaskCompletionSource tcs = (TaskCompletionSource)ar.AsyncState;
+ if (tcs.Task.IsCanceled) return;
+ try
+ {
+ int bytesRead = m_socket.EndReceive(ar);
+ position += bytesRead;
+ tcs.TrySetResult(bytesRead);
+ }
+ catch (System.Exception ex)
+ {
+ tcs.TrySetException(ex);
+ }
}
-#if NETSTANDARD || NETFX
- return Task.FromResult