diff --git a/serialstream.go b/serialstream.go index 837a59f..4c2819b 100644 --- a/serialstream.go +++ b/serialstream.go @@ -2,12 +2,14 @@ package main import ( "bytes" + "encoding/binary" "time" "github.com/nonoo/kappanhang/log" ) const maxSerialFrameLength = 80 // Max. frame length according to Hamlib. +const serialRxSeqBufLength = 100 * time.Millisecond type serialStream struct { common streamCommon @@ -17,6 +19,13 @@ type serialStream struct { sendSeq uint16 + lastSeqBufFrontRxSeq uint16 + rxSeqBuf seqBuf + rxSeqBufEntryChan chan seqBufEntry + + receivedSerialData bool + lastReceivedSeq uint16 + readFromSerialPort struct { buf bytes.Buffer frameStarted bool @@ -67,20 +76,85 @@ func (s *serialStream) sendOpenClose(close bool) error { return nil } -func (s *serialStream) handleRead(r []byte) { - if len(r) >= 22 { - if r[16] == 0xc1 && r[0]-0x15 == r[17] { - r = r[21:] +func (s *serialStream) handleRxSeqBufEntry(e seqBufEntry) { + gotSeq := uint16(e.seq) + if s.receivedSerialData { + expectedSeq := s.lastReceivedSeq + 1 + if expectedSeq != gotSeq { + var missingPkts int + if gotSeq > expectedSeq { + missingPkts = int(gotSeq) - int(expectedSeq) + } else { + missingPkts = int(gotSeq) + 65536 - int(expectedSeq) + } + log.Error("lost ", missingPkts, " packets") + } + } + s.lastReceivedSeq = gotSeq + s.receivedSerialData = true - // log.Print("fromradio ", r) + if len(e.data) == 16 { // Do not send pkt0s. + return + } - s.serialPort.write <- r - if s.tcpsrv.toClient != nil { - s.tcpsrv.toClient <- r + e.data = e.data[21:] + + s.serialPort.write <- e.data + if s.tcpsrv.toClient != nil { + s.tcpsrv.toClient <- e.data + } +} + +func (s *serialStream) requestRetransmitIfNeeded(gotSeq uint16) error { + prevExpectedSeq := gotSeq - 1 + if s.lastSeqBufFrontRxSeq != prevExpectedSeq { + var missingPkts int + var sr seqNumRange + if prevExpectedSeq > s.lastSeqBufFrontRxSeq { + sr[0] = s.lastSeqBufFrontRxSeq + sr[1] = prevExpectedSeq + missingPkts = int(prevExpectedSeq) - int(s.lastSeqBufFrontRxSeq) + } else { + sr[0] = prevExpectedSeq + sr[1] = s.lastSeqBufFrontRxSeq + missingPkts = int(prevExpectedSeq) + 65536 - int(s.lastSeqBufFrontRxSeq) + } + if missingPkts == 1 { + log.Debug("request pkt #", sr[1], " retransmit") + if err := s.common.sendRetransmitRequest(sr[1]); err != nil { + return err + } + } else if missingPkts < 50 { + log.Debug("request pkt #", sr[0], "-#", sr[1], " retransmit") + if err := s.common.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil { + return err } } } + s.lastSeqBufFrontRxSeq = gotSeq + return nil +} +func (s *serialStream) handleSerialPacket(r []byte) error { + gotSeq := binary.LittleEndian.Uint16(r[6:8]) + addedToFront, _ := s.rxSeqBuf.add(seqNum(gotSeq), r) + + // If the packet is not added to the front of the seqbuf, then it means that it was an answer for a + // retransmit request (or it was an out of order packet which we don't want start a retransmit). + if !addedToFront { + return nil + } + + return s.requestRetransmitIfNeeded(gotSeq) +} + +func (s *serialStream) handleRead(r []byte) error { + // We add both serial data and pkt0 to the seqbuf. + if (len(r) == 16 && bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00})) || // Pkt0? + (len(r) >= 22 && r[16] == 0xc1 && r[0]-0x15 == r[17]) { // Serial data? + return s.handleSerialPacket(r) + } + return nil } func (s *serialStream) gotDataForRadio(r []byte) { @@ -117,8 +191,6 @@ func (s *serialStream) gotDataForRadio(r []byte) { for _, b := range r { s.readFromSerialPort.buf.WriteByte(b) if b == 0xfc || b == 0xfd || s.readFromSerialPort.buf.Len() == maxSerialFrameLength { - // log.Print("toradio ", s.readFromSerialPort.buf.Bytes()) - if err := s.send(s.readFromSerialPort.buf.Bytes()); err != nil { reportError(err) } @@ -136,7 +208,11 @@ func (s *serialStream) loop() { for { select { case r := <-s.common.readChan: - s.handleRead(r) + if err := s.handleRead(r); err != nil { + reportError(err) + } + case e := <-s.rxSeqBufEntryChan: + s.handleRxSeqBufEntry(e) case r := <-s.serialPort.read: s.gotDataForRadio(r) case r := <-s.tcpsrv.fromClient: @@ -196,6 +272,8 @@ func (s *serialStream) init() error { if err := s.common.init("serial", 50002); err != nil { return err } + s.rxSeqBufEntryChan = make(chan seqBufEntry) + s.rxSeqBuf.init(serialRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) return nil } @@ -212,4 +290,5 @@ func (s *serialStream) deinit() { <-s.deinitFinishedChan } s.common.deinit() + s.rxSeqBuf.deinit() }