Improve buffered stream to more accurately emulate slow baud rates

This commit is contained in:
Morten Nielsen 2020-07-28 20:34:03 -07:00
parent 9a71ca4db6
commit 6a19aaf44a
2 changed files with 150 additions and 63 deletions

View file

@ -14,9 +14,12 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace NmeaParser
@ -28,7 +31,7 @@ namespace NmeaParser
public abstract class BufferedStreamDevice : NmeaDevice
{
private BufferedStream? m_stream;
private readonly int m_readSpeed;
private readonly BurstEmulationSettings emulationSettings = new BurstEmulationSettings();
/// <summary>
/// Initializes a new instance of the <see cref="BufferedStreamDevice"/> class.
@ -39,10 +42,10 @@ namespace NmeaParser
/// <summary>
/// Initializes a new instance of the <see cref="BufferedStreamDevice"/> class.
/// </summary>
/// <param name="readSpeed">The time to wait between each group of lines being read in milliseconds</param>
protected BufferedStreamDevice(int readSpeed)
/// <param name="burstRate">The time to wait between each group of lines being read in milliseconds</param>
protected BufferedStreamDevice(int burstRate)
{
m_readSpeed = readSpeed;
BurstRate = TimeSpan.FromMilliseconds(burstRate);
}
/// <summary>
@ -55,12 +58,53 @@ namespace NmeaParser
/// <inheritdoc />
protected sealed async override Task<System.IO.Stream> OpenStreamAsync()
{
var stream = await GetStreamAsync();
var stream = await GetStreamAsync().ConfigureAwait(false);
StreamReader sr = new StreamReader(stream);
m_stream = new BufferedStream(sr, m_readSpeed);
m_stream = new BufferedStream(sr, emulationSettings);
return m_stream;
}
/// <summary>
/// Gets or sets the emulated baud rate. Defaults to 115200
/// </summary>
/// <remarks>
/// Note that if the baud rate gets very low, while keeping a high <see cref="BurstRate"/>, the stream will not be able to keep
/// up the burstrate. For high-frequency bursts, make sure you have a corresponding high emualated baud rate.
/// </remarks>
public uint EmulatedBaudRate
{
get => emulationSettings.EmulatedBaudRate;
set => emulationSettings.EmulatedBaudRate = value;
}
/// <summary>
/// Gets or sets the emulated burst rate - that is the frequency of each burst of messages. Defaults to 1 second (1hz).
/// </summary>
/// <remarks>
/// Note that if the burst rate gets very high, while keeping a low <see cref="EmulatedBaudRate"/>, the stream will not be able to keep
/// up the burstrate. For high-frequency bursts, make sure you have a corresponding high emualated baud rate.
/// </remarks>
public TimeSpan BurstRate
{
get => emulationSettings.BurstRate;
set
{
if (value.TotalMilliseconds < 1)
throw new ArgumentOutOfRangeException(nameof(BurstRate), "Burst rate must be at least 1 ms");
emulationSettings.BurstRate = value;
}
}
/// <summary>
/// Gets or sets the separator between each burst of data. Defaults to <see cref="BurstEmulationSeparator.FirstToken"/>.
/// </summary>
/// <seealso cref="EnableBurstEmulation"/>
public BurstEmulationSeparator BurstSeparator
{
get => emulationSettings.Separator;
set => emulationSettings.Separator = value;
}
/// <inheritdoc />
protected override Task CloseStreamAsync(System.IO.Stream stream)
{
@ -68,54 +112,97 @@ namespace NmeaParser
return Task.FromResult(true);
}
private class BurstEmulationSettings
{
public uint EmulatedBaudRate { get; set; } = 115200;
public TimeSpan BurstRate { get; set; } = TimeSpan.FromSeconds(1);
public BurstEmulationSeparator Separator { get; set; }
}
/// <summary>
/// Defined how a burst of data is separated
/// </summary>
/// <seealso cref="BufferedStreamDevice.BurstSeparator"/>
public enum BurstEmulationSeparator
{
/// <summary>
/// The first NMEA token encountered will be used as an indicator for pauses between bursts
/// </summary>
FirstToken,
/// <summary>
/// An empty line in the NMEA stream should indicate a pause in the burst of messages
/// </summary>
EmptyLine
}
// stream that slowly populates a buffer from a StreamReader to simulate nmea messages coming
// in lastLineRead by lastLineRead at a steady stream
private class BufferedStream : Stream
{
private bool isDisposed;
private readonly StreamReader m_sr;
private byte[] m_buffer = new byte[0];
private readonly System.Threading.Timer m_timer;
private readonly object lockObj = new object();
private string? groupToken = null;
private string? lastLineRead = null;
private BurstEmulationSettings m_settings;
private CancellationTokenSource m_tcs;
private Task m_readTask;
/// <summary>
/// Initializes a new instance of the <see cref="BufferedStream"/> class.
/// </summary>
/// <param name="stream">The stream.</param>
/// <param name="readSpeed">The read speed in milliseconds.</param>
public BufferedStream(StreamReader stream, int readSpeed)
/// <param name="settings">Emulation settings.</param>
public BufferedStream(StreamReader stream, BurstEmulationSettings settings)
{
m_settings = settings;
m_sr = stream;
m_timer = new System.Threading.Timer(OnRead, null, 0, readSpeed); //read a group of lines every 'readSpeed' milliseconds
m_tcs = new CancellationTokenSource();
m_readTask = StartReadLoop(m_tcs.Token);
}
private void OnRead(object state)
private async Task StartReadLoop(CancellationToken cancellationToken)
{
if (lastLineRead != null)
AppendToBuffer(lastLineRead);
//Get the group token if we don't have one
while (groupToken == null && (lastLineRead == null || !lastLineRead.StartsWith("$", StringComparison.Ordinal)))
await Task.Yield();
var start = Stopwatch.GetTimestamp();
while (!cancellationToken.IsCancellationRequested)
{
lastLineRead = ReadLine(); //seek forward to first nmea token
if (isDisposed)
return;
if(lastLineRead != null)
AppendToBuffer(lastLineRead);
var line = ReadLine();
if (line != null)
{
// Group token is the first message type received - every time we see it, we'll take a short burst break
if (groupToken == null && line.StartsWith("$", StringComparison.Ordinal))
{
var values = line.Trim().Split(new char[] { ',' });
if (values.Length > 0)
groupToken = values[0];
}
if (m_settings.Separator == BurstEmulationSeparator.EmptyLine && string.IsNullOrWhiteSpace(line) ||
m_settings.Separator == BurstEmulationSeparator.FirstToken && groupToken != null && line.StartsWith(groupToken, StringComparison.Ordinal))
{
// Emulate the burst pause
var now = Stopwatch.GetTimestamp();
var delay = (now - start) / (double)Stopwatch.Frequency;
if (delay < m_settings.BurstRate.TotalSeconds)
await Task.Delay(TimeSpan.FromSeconds(m_settings.BurstRate.TotalSeconds - delay)).ConfigureAwait(false);
else
{
Debug.WriteLine("Warning: baud rate too slow for amount of data, or burst rate too fast");
}
if (cancellationToken.IsCancellationRequested)
return;
start = Stopwatch.GetTimestamp();
}
if (!string.IsNullOrWhiteSpace(line))
{
await AppendToBuffer(line).ConfigureAwait(false);
}
}
}
if(groupToken == null && lastLineRead != null)
{
var values = lastLineRead.Trim().Split(new char[] { ',' });
if (values.Length > 0)
groupToken = values[0];
}
lastLineRead = ReadLine();
while (!isDisposed && lastLineRead?.StartsWith(groupToken, StringComparison.Ordinal) == false) //keep reading until messages start repeating again
{
AppendToBuffer(lastLineRead);
lastLineRead = ReadLine();
}
}
private void AppendToBuffer(string line)
private double pendingDelay = 0;
private async Task AppendToBuffer(string line)
{
var bytes = Encoding.UTF8.GetBytes(line);
lock (lockObj)
@ -125,39 +212,49 @@ namespace NmeaParser
bytes.CopyTo(newBuffer, m_buffer.Length);
m_buffer = newBuffer;
}
var delay = bytes.Length * 10d / m_settings.EmulatedBaudRate; // 8 bits + 1 parity + 1 stop bit = 10bits per byte;
pendingDelay += delay;
if (pendingDelay < 0.016) //No reason to wait under the 16ms - Task.Delay not that accurate anyway
{
return;
}
// Task.Delay isn't very accurate so use the stopwatch to get the real delay and use difference to fix it later
var start = Stopwatch.GetTimestamp();
await Task.Delay(TimeSpan.FromSeconds(pendingDelay)).ConfigureAwait(false);
var end = Stopwatch.GetTimestamp();
pendingDelay -= (end - start) / (double)Stopwatch.Frequency;
}
private string? ReadLine()
{
if (isDisposed)
if (m_tcs.IsCancellationRequested)
return null;
if (m_sr.EndOfStream)
m_sr.BaseStream.Seek(0, SeekOrigin.Begin); //start over
return m_sr.ReadLine() + '\n';
}
/// <inheritdoc />
public override bool CanRead { get { return true; } }
/// <inheritdoc />
public override bool CanSeek { get { return false; } }
public override bool CanRead => true;
/// <inheritdoc />
public override bool CanWrite { get { return false; } }
public override bool CanSeek => false;
/// <inheritdoc />
public override bool CanWrite => false;
/// <inheritdoc />
public override void Flush() { }
/// <inheritdoc />
public override long Length { get { return m_sr.BaseStream.Length; } }
public override long Length => m_sr.BaseStream.Length;
/// <inheritdoc />
public override long Position
{
get { return m_sr.BaseStream.Position; }
set
{
throw new NotSupportedException();
}
get => m_sr.BaseStream.Position;
set => throw new NotSupportedException();
}
/// <inheritdoc />
@ -184,28 +281,18 @@ namespace NmeaParser
}
/// <inheritdoc />
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
/// <inheritdoc />
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void SetLength(long value) => throw new NotSupportedException();
/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
/// <inheritdoc />
protected override void Dispose(bool disposing)
{
isDisposed = true;
m_timer.Dispose();
m_tcs.Cancel();
m_sr.Dispose();
base.Dispose(disposing);
}

View file

@ -36,7 +36,7 @@ namespace SampleApp.WinDesktop
//var device = new NmeaParser.SerialPortDevice(portName);
//Use a log file for playing back logged data
var device = new NmeaParser.NmeaFileDevice("NmeaSampleData.txt");
var device = new NmeaParser.NmeaFileDevice("NmeaSampleData.txt") { EmulatedBaudRate = 9600, BurstRate = TimeSpan.FromSeconds(1d) };
StartDevice(device);
}