diff --git a/src/NmeaParser/BufferedStreamDevice.cs b/src/NmeaParser/BufferedStreamDevice.cs index 3099fe5..e51b9e1 100644 --- a/src/NmeaParser/BufferedStreamDevice.cs +++ b/src/NmeaParser/BufferedStreamDevice.cs @@ -14,9 +14,12 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; +using System.Runtime.CompilerServices; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace NmeaParser @@ -28,7 +31,7 @@ namespace NmeaParser public abstract class BufferedStreamDevice : NmeaDevice { private BufferedStream? m_stream; - private readonly int m_readSpeed; + private readonly BurstEmulationSettings emulationSettings = new BurstEmulationSettings(); /// /// Initializes a new instance of the class. @@ -39,10 +42,10 @@ namespace NmeaParser /// /// Initializes a new instance of the class. /// - /// The time to wait between each group of lines being read in milliseconds - protected BufferedStreamDevice(int readSpeed) + /// The time to wait between each group of lines being read in milliseconds + protected BufferedStreamDevice(int burstRate) { - m_readSpeed = readSpeed; + BurstRate = TimeSpan.FromMilliseconds(burstRate); } /// @@ -55,12 +58,53 @@ namespace NmeaParser /// protected sealed async override Task OpenStreamAsync() { - var stream = await GetStreamAsync(); + var stream = await GetStreamAsync().ConfigureAwait(false); StreamReader sr = new StreamReader(stream); - m_stream = new BufferedStream(sr, m_readSpeed); + m_stream = new BufferedStream(sr, emulationSettings); return m_stream; } + /// + /// Gets or sets the emulated baud rate. Defaults to 115200 + /// + /// + /// Note that if the baud rate gets very low, while keeping a high , the stream will not be able to keep + /// up the burstrate. For high-frequency bursts, make sure you have a corresponding high emualated baud rate. + /// + public uint EmulatedBaudRate + { + get => emulationSettings.EmulatedBaudRate; + set => emulationSettings.EmulatedBaudRate = value; + } + + /// + /// Gets or sets the emulated burst rate - that is the frequency of each burst of messages. Defaults to 1 second (1hz). + /// + /// + /// Note that if the burst rate gets very high, while keeping a low , the stream will not be able to keep + /// up the burstrate. For high-frequency bursts, make sure you have a corresponding high emualated baud rate. + /// + public TimeSpan BurstRate + { + get => emulationSettings.BurstRate; + set + { + if (value.TotalMilliseconds < 1) + throw new ArgumentOutOfRangeException(nameof(BurstRate), "Burst rate must be at least 1 ms"); + emulationSettings.BurstRate = value; + } + } + + /// + /// Gets or sets the separator between each burst of data. Defaults to . + /// + /// + public BurstEmulationSeparator BurstSeparator + { + get => emulationSettings.Separator; + set => emulationSettings.Separator = value; + } + /// protected override Task CloseStreamAsync(System.IO.Stream stream) { @@ -68,54 +112,97 @@ namespace NmeaParser return Task.FromResult(true); } + private class BurstEmulationSettings + { + public uint EmulatedBaudRate { get; set; } = 115200; + public TimeSpan BurstRate { get; set; } = TimeSpan.FromSeconds(1); + public BurstEmulationSeparator Separator { get; set; } + } + + /// + /// Defined how a burst of data is separated + /// + /// + public enum BurstEmulationSeparator + { + /// + /// The first NMEA token encountered will be used as an indicator for pauses between bursts + /// + FirstToken, + /// + /// An empty line in the NMEA stream should indicate a pause in the burst of messages + /// + EmptyLine + } + // stream that slowly populates a buffer from a StreamReader to simulate nmea messages coming // in lastLineRead by lastLineRead at a steady stream private class BufferedStream : Stream { - private bool isDisposed; private readonly StreamReader m_sr; private byte[] m_buffer = new byte[0]; - private readonly System.Threading.Timer m_timer; private readonly object lockObj = new object(); private string? groupToken = null; - private string? lastLineRead = null; + private BurstEmulationSettings m_settings; + private CancellationTokenSource m_tcs; + private Task m_readTask; + /// /// Initializes a new instance of the class. /// /// The stream. - /// The read speed in milliseconds. - public BufferedStream(StreamReader stream, int readSpeed) + /// Emulation settings. + public BufferedStream(StreamReader stream, BurstEmulationSettings settings) { + m_settings = settings; m_sr = stream; - m_timer = new System.Threading.Timer(OnRead, null, 0, readSpeed); //read a group of lines every 'readSpeed' milliseconds + m_tcs = new CancellationTokenSource(); + m_readTask = StartReadLoop(m_tcs.Token); } - private void OnRead(object state) + + private async Task StartReadLoop(CancellationToken cancellationToken) { - if (lastLineRead != null) - AppendToBuffer(lastLineRead); - //Get the group token if we don't have one - while (groupToken == null && (lastLineRead == null || !lastLineRead.StartsWith("$", StringComparison.Ordinal))) + await Task.Yield(); + var start = Stopwatch.GetTimestamp(); + while (!cancellationToken.IsCancellationRequested) { - lastLineRead = ReadLine(); //seek forward to first nmea token - if (isDisposed) - return; - if(lastLineRead != null) - AppendToBuffer(lastLineRead); + var line = ReadLine(); + if (line != null) + { + // Group token is the first message type received - every time we see it, we'll take a short burst break + if (groupToken == null && line.StartsWith("$", StringComparison.Ordinal)) + { + var values = line.Trim().Split(new char[] { ',' }); + if (values.Length > 0) + groupToken = values[0]; + } + if (m_settings.Separator == BurstEmulationSeparator.EmptyLine && string.IsNullOrWhiteSpace(line) || + m_settings.Separator == BurstEmulationSeparator.FirstToken && groupToken != null && line.StartsWith(groupToken, StringComparison.Ordinal)) + { + // Emulate the burst pause + var now = Stopwatch.GetTimestamp(); + var delay = (now - start) / (double)Stopwatch.Frequency; + if (delay < m_settings.BurstRate.TotalSeconds) + await Task.Delay(TimeSpan.FromSeconds(m_settings.BurstRate.TotalSeconds - delay)).ConfigureAwait(false); + else + { + Debug.WriteLine("Warning: baud rate too slow for amount of data, or burst rate too fast"); + } + if (cancellationToken.IsCancellationRequested) + return; + start = Stopwatch.GetTimestamp(); + } + if (!string.IsNullOrWhiteSpace(line)) + { + await AppendToBuffer(line).ConfigureAwait(false); + } + } } - if(groupToken == null && lastLineRead != null) - { - var values = lastLineRead.Trim().Split(new char[] { ',' }); - if (values.Length > 0) - groupToken = values[0]; - } - lastLineRead = ReadLine(); - while (!isDisposed && lastLineRead?.StartsWith(groupToken, StringComparison.Ordinal) == false) //keep reading until messages start repeating again - { - AppendToBuffer(lastLineRead); - lastLineRead = ReadLine(); - } } - private void AppendToBuffer(string line) + + private double pendingDelay = 0; + + private async Task AppendToBuffer(string line) { var bytes = Encoding.UTF8.GetBytes(line); lock (lockObj) @@ -125,39 +212,49 @@ namespace NmeaParser bytes.CopyTo(newBuffer, m_buffer.Length); m_buffer = newBuffer; } + var delay = bytes.Length * 10d / m_settings.EmulatedBaudRate; // 8 bits + 1 parity + 1 stop bit = 10bits per byte; + + pendingDelay += delay; + if (pendingDelay < 0.016) //No reason to wait under the 16ms - Task.Delay not that accurate anyway + { + return; + } + // Task.Delay isn't very accurate so use the stopwatch to get the real delay and use difference to fix it later + var start = Stopwatch.GetTimestamp(); + await Task.Delay(TimeSpan.FromSeconds(pendingDelay)).ConfigureAwait(false); + var end = Stopwatch.GetTimestamp(); + pendingDelay -= (end - start) / (double)Stopwatch.Frequency; } + private string? ReadLine() { - if (isDisposed) + if (m_tcs.IsCancellationRequested) return null; if (m_sr.EndOfStream) m_sr.BaseStream.Seek(0, SeekOrigin.Begin); //start over return m_sr.ReadLine() + '\n'; } - - /// - public override bool CanRead { get { return true; } } /// - public override bool CanSeek { get { return false; } } + public override bool CanRead => true; /// - public override bool CanWrite { get { return false; } } + public override bool CanSeek => false; + + /// + public override bool CanWrite => false; /// public override void Flush() { } /// - public override long Length { get { return m_sr.BaseStream.Length; } } + public override long Length => m_sr.BaseStream.Length; /// public override long Position { - get { return m_sr.BaseStream.Position; } - set - { - throw new NotSupportedException(); - } + get => m_sr.BaseStream.Position; + set => throw new NotSupportedException(); } /// @@ -184,28 +281,18 @@ namespace NmeaParser } /// - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); /// - public override void SetLength(long value) - { - throw new NotSupportedException(); - } + public override void SetLength(long value) => throw new NotSupportedException(); /// - public override void Write(byte[] buffer, int offset, int count) - { - throw new NotSupportedException(); - } - + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + /// protected override void Dispose(bool disposing) { - isDisposed = true; - m_timer.Dispose(); + m_tcs.Cancel(); m_sr.Dispose(); base.Dispose(disposing); } diff --git a/src/SampleApp.WinDesktop/MainWindow.xaml.cs b/src/SampleApp.WinDesktop/MainWindow.xaml.cs index 84aa06c..2d93ca0 100644 --- a/src/SampleApp.WinDesktop/MainWindow.xaml.cs +++ b/src/SampleApp.WinDesktop/MainWindow.xaml.cs @@ -36,7 +36,7 @@ namespace SampleApp.WinDesktop //var device = new NmeaParser.SerialPortDevice(portName); //Use a log file for playing back logged data - var device = new NmeaParser.NmeaFileDevice("NmeaSampleData.txt"); + var device = new NmeaParser.NmeaFileDevice("NmeaSampleData.txt") { EmulatedBaudRate = 9600, BurstRate = TimeSpan.FromSeconds(1d) }; StartDevice(device); }