//
// Copyright (c) 2014 Morten Nielsen
//
// Licensed under the Microsoft Public License (Ms-PL) (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://opensource.org/licenses/Ms-PL.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NmeaParser
{
    /// <summary>
    /// An abstract generic NMEA device that reads a stream at a decreased pace,
    /// mostly used to emulate NMEA input from files and strings.
    /// </summary>
    public abstract class BufferedStreamDevice : NmeaDevice
    {
        private BufferedStream m_stream;
        private readonly int m_readSpeed;

        /// <summary>
        /// Initializes a new instance of the <see cref="BufferedStreamDevice"/> class
        /// that emits one group of lines every 1000 milliseconds.
        /// </summary>
        protected BufferedStreamDevice() : this(1000)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="BufferedStreamDevice"/> class.
        /// </summary>
        /// <param name="readSpeed">The time to wait between each group of lines being read in milliseconds</param>
        protected BufferedStreamDevice(int readSpeed)
        {
            m_readSpeed = readSpeed;
        }

        /// <summary>
        /// Gets the stream to perform buffer reads on.
        /// </summary>
        /// <returns>A task producing the source stream whose text lines are replayed.</returns>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate")]
        protected abstract Task<System.IO.Stream> GetStreamAsync();

        /// <summary>
        /// Opens the stream asynchronous.
        /// </summary>
        /// <returns>
        /// A task producing a stream that is filled with one group of lines from the
        /// source stream every <c>readSpeed</c> milliseconds.
        /// </returns>
        protected sealed async override Task<System.IO.Stream> OpenStreamAsync()
        {
            // Nothing after the await touches context-bound state, so the captured
            // synchronization context is not needed.
            var stream = await GetStreamAsync().ConfigureAwait(false);
            StreamReader sr = new StreamReader(stream);
            m_stream = new BufferedStream(sr, m_readSpeed);
            return m_stream;
        }

        /// <summary>
        /// Closes the stream asynchronous.
        /// </summary>
        /// <param name="stream">The stream.</param>
        /// <returns>A completed task.</returns>
        protected override Task CloseStreamAsync(System.IO.Stream stream)
        {
            // m_stream is only assigned by OpenStreamAsync; tolerate a close without a
            // preceding open instead of throwing a NullReferenceException.
            if (m_stream != null)
            {
                m_stream.Dispose();
                m_stream = null;
            }
            return Task.FromResult(true);
        }

        // Stream that slowly populates a buffer from a StreamReader to simulate NMEA
        // messages coming in group by group at a steady pace.
        private class BufferedStream : Stream
        {
            private readonly StreamReader m_sr;
            private byte[] m_buffer = new byte[0];
            private readonly System.Threading.Timer m_timer;
            // Guards m_buffer, which is shared between the timer thread and Read().
            private readonly object lockObj = new object();
            // Guards the reader and the grouping state below; also keeps timer ticks from overlapping.
            private readonly object m_readGate = new object();
            private string groupToken = null;
            private string lastLineRead = null;
            // Number of times the source stream was rewound to its beginning.
            private int m_rewindCount;
            private volatile bool m_disposed;

            /// <summary>
            /// Initializes a new instance of the <see cref="BufferedStream"/> class.
            /// </summary>
            /// <param name="stream">The stream.</param>
            /// <param name="readSpeed">The read speed in milliseconds.</param>
            public BufferedStream(StreamReader stream, int readSpeed)
            {
                if (stream == null)
                    throw new ArgumentNullException("stream");
                m_sr = stream;
                // Read a group of lines every 'readSpeed' milliseconds, starting immediately.
                // The first callback may run before this assignment completes, so OnRead
                // must not rely on m_timer.
                m_timer = new System.Threading.Timer(OnRead, null, 0, readSpeed);
            }

            // Timer callback: appends the next group of lines to the buffer.
            private void OnRead(object state)
            {
                // A tick that takes longer than the period would otherwise run concurrently
                // with the next one and interleave the grouping state; skip instead.
                if (!System.Threading.Monitor.TryEnter(m_readGate))
                    return;
                try
                {
                    if (!m_disposed)
                        ReadNextGroup();
                }
                finally
                {
                    System.Threading.Monitor.Exit(m_readGate);
                }
            }

            // Appends one group of lines: everything from the line that starts with the
            // group token up to (but excluding) the next line that starts with it.
            // Must be called while holding m_readGate.
            private void ReadNextGroup()
            {
                // The line that ended the previous group opens this one.
                if (lastLineRead != null)
                    AppendToBuffer(lastLineRead);

                // Get the group token if we don't have one.
                if (groupToken == null)
                {
                    int rewindsBefore = m_rewindCount;
                    while (lastLineRead == null || !lastLineRead.StartsWith("$", StringComparison.Ordinal))
                    {
                        // Seek forward to the first NMEA token; skipped lines are still forwarded.
                        string line = ReadLine();
                        bool isSentence = line != null && line.StartsWith("$", StringComparison.Ordinal);
                        if (line != null)
                            AppendToBuffer(line);
                        if (line == null || (!isSentence && m_rewindCount != rewindsBefore))
                        {
                            // Either the source ended, or every line was visited once without
                            // finding a sentence. Give up for this tick rather than spin
                            // forever; forget the line so it is not appended a second time.
                            lastLineRead = null;
                            return;
                        }
                        lastLineRead = line;
                    }
                    // Split always yields at least one element, and the line starts with '$'.
                    groupToken = lastLineRead.Trim().Split(new char[] { ',' })[0];
                }

                // Keep reading until messages start repeating again. The repeating line is
                // held back in lastLineRead and emitted at the start of the next tick.
                string next = ReadLine();
                while (next != null && !next.StartsWith(groupToken, StringComparison.Ordinal))
                {
                    AppendToBuffer(next);
                    next = ReadLine();
                }
                lastLineRead = next;
            }

            // Appends the UTF-8 encoding of 'line' to the end of the pending buffer.
            private void AppendToBuffer(string line)
            {
                var bytes = Encoding.UTF8.GetBytes(line);
                lock (lockObj)
                {
                    byte[] newBuffer = new byte[m_buffer.Length + bytes.Length];
                    m_buffer.CopyTo(newBuffer, 0);
                    bytes.CopyTo(newBuffer, m_buffer.Length);
                    m_buffer = newBuffer;
                }
            }

            // Reads the next line (terminated with '\n'), starting over from the beginning
            // when the end of a seekable source is reached. Returns null when no line can
            // be produced: the stream was disposed, the source is empty, or a non-seekable
            // source has ended. Must be called while holding m_readGate.
            private string ReadLine()
            {
                if (m_disposed)
                    return null;
                if (m_sr.EndOfStream)
                {
                    if (!m_sr.BaseStream.CanSeek)
                        return null;
                    m_sr.BaseStream.Seek(0, SeekOrigin.Begin); // start over
                    // The reader keeps its own buffer and decoder state; they must be
                    // reset after repositioning the underlying stream.
                    m_sr.DiscardBufferedData();
                    m_rewindCount++;
                }
                string line = m_sr.ReadLine();
                return line == null ? null : line + '\n';
            }

            /// <summary>
            /// Gets a value indicating whether this instance can read.
            /// </summary>
            /// <value>
            ///   <c>true</c> if this instance can read; otherwise, <c>false</c>.
            /// </value>
            public override bool CanRead { get { return true; } }

            /// <summary>
            /// Gets a value indicating whether this instance can seek.
            /// </summary>
            /// <value>
            ///   <c>true</c> if this instance can seek; otherwise, <c>false</c>.
            /// </value>
            public override bool CanSeek { get { return false; } }

            /// <summary>
            /// Gets a value indicating whether this instance can write.
            /// </summary>
            /// <value>
            ///   <c>true</c> if this instance can write; otherwise, <c>false</c>.
            /// </value>
            public override bool CanWrite { get { return false; } }

            /// <summary>
            /// Flushes this instance. Does nothing.
            /// </summary>
            public override void Flush() { }

            /// <summary>
            /// Gets the length of the underlying source stream.
            /// </summary>
            /// <value>
            /// The length.
            /// </value>
            public override long Length { get { return m_sr.BaseStream.Length; } }

            /// <summary>
            /// Gets the position of the underlying source stream. Setting is not supported.
            /// </summary>
            /// <value>
            /// The position.
            /// </value>
            /// <exception cref="System.NotSupportedException">Thrown by the setter.</exception>
            public override long Position
            {
                get { return m_sr.BaseStream.Position; }
                set { throw new NotSupportedException(); }
            }

            /// <summary>
            /// Moves up to <paramref name="count"/> pending bytes into
            /// <paramref name="buffer"/>, starting at <paramref name="offset"/>.
            /// </summary>
            /// <param name="buffer">The buffer.</param>
            /// <param name="offset">The offset.</param>
            /// <param name="count">The count.</param>
            /// <returns>
            /// The number of bytes copied; 0 when nothing is pending at the moment.
            /// </returns>
            /// <exception cref="System.ArgumentNullException"><paramref name="buffer"/> is null.</exception>
            /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
            /// <exception cref="System.ArgumentException">The range does not fit in <paramref name="buffer"/>.</exception>
            public override int Read(byte[] buffer, int offset, int count)
            {
                if (buffer == null)
                    throw new ArgumentNullException("buffer");
                if (offset < 0)
                    throw new ArgumentOutOfRangeException("offset");
                if (count < 0)
                    throw new ArgumentOutOfRangeException("count");
                if (buffer.Length - offset < count)
                    throw new ArgumentException("The sum of offset and count is larger than the buffer length.");

                lock (lockObj)
                {
                    int length = Math.Min(count, m_buffer.Length);
                    if (length == 0)
                        return 0;
                    // Copy to the caller's requested offset, not to index 0.
                    Array.Copy(m_buffer, 0, buffer, offset, length);
                    // Keep whatever did not fit for the next read.
                    byte[] remaining = new byte[m_buffer.Length - length];
                    Array.Copy(m_buffer, length, remaining, 0, remaining.Length);
                    m_buffer = remaining;
                    return length;
                }
            }

            /// <summary>
            /// Not supported.
            /// </summary>
            /// <param name="offset">The offset.</param>
            /// <param name="origin">The origin.</param>
            /// <returns>Never returns.</returns>
            /// <exception cref="System.NotSupportedException">Always.</exception>
            public override long Seek(long offset, SeekOrigin origin)
            {
                throw new NotSupportedException();
            }

            /// <summary>
            /// Not supported.
            /// </summary>
            /// <param name="value">The value.</param>
            /// <exception cref="System.NotSupportedException">Always.</exception>
            public override void SetLength(long value)
            {
                throw new NotSupportedException();
            }

            /// <summary>
            /// Not supported.
            /// </summary>
            /// <param name="buffer">The buffer.</param>
            /// <param name="offset">The offset.</param>
            /// <param name="count">The count.</param>
            /// <exception cref="System.NotSupportedException">Always.</exception>
            public override void Write(byte[] buffer, int offset, int count)
            {
                throw new NotSupportedException();
            }

            /// <summary>
            /// Releases unmanaged and - optionally - managed resources.
            /// </summary>
            /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
            protected override void Dispose(bool disposing)
            {
                if (disposing && !m_disposed)
                {
                    // Order matters: flag first so a running callback stops reading, then
                    // stop the timer, then wait for any in-flight callback to leave before
                    // the reader it uses is disposed.
                    m_disposed = true;
                    m_timer.Dispose();
                    lock (m_readGate)
                    {
                        m_sr.Dispose();
                    }
                }
                base.Dispose(disposing);
            }
        }
    }
}