diff --git a/controlstream.go b/controlstream.go index 94a7386..64eea4a 100644 --- a/controlstream.go +++ b/controlstream.go @@ -30,12 +30,9 @@ type controlStream struct { } func (s *controlStream) sendPktLogin() error { - s.common.pkt0.sendSeqLock() - defer s.common.pkt0.sendSeqUnlock() - // The reply to the auth packet will contain a 6 bytes long auth ID with the first 2 bytes set to our ID. authStartID := []byte{0x63, 0x00} - p := []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8), + p := []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), 0x00, 0x00, 0x00, 0x70, 0x01, 0x00, 0x00, byte(s.authInnerSendSeq), @@ -52,14 +49,10 @@ func (s *controlStream) sendPktLogin() error { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} - if err := s.common.send(p); err != nil { - return err - } - if err := s.common.send(p); err != nil { + if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil { return err } - s.common.pkt0.sendSeq++ s.authInnerSendSeq++ return nil } @@ -82,10 +75,7 @@ func (s *controlStream) sendPktAuth(magic byte) error { // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 - s.common.pkt0.sendSeqLock() - defer s.common.pkt0.sendSeqUnlock() - - p := []byte{0x40, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8), + p := []byte{0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), 0x00, 0x00, 0x00, 0x30, 0x01, magic, 0x00, byte(s.authInnerSendSeq), @@ -94,23 +84,16 @@ func (s *controlStream) sendPktAuth(magic byte) error { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} - if err := s.common.send(p); err != nil { + if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil { return err } - if err := s.common.send(p); err != nil { - return err - } - s.common.pkt0.sendSeq++ s.authInnerSendSeq++ return nil } func (s *controlStream) sendRequestSerialAndAudio() error { - s.common.pkt0.sendSeqLock() - defer s.common.pkt0.sendSeqUnlock() - log.Print("requesting serial and audio stream") - p := []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8), + p := []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), 0x00, 0x00, 0x00, 0x80, 0x01, 0x03, 0x00, byte(s.authInnerSendSeq), @@ -129,14 +112,10 @@ func (s *controlStream) sendRequestSerialAndAudio() error { 0x00, 0x00, 0xbb, 0x80, 0x00, 0x00, 0xc3, 0x52, 0x00, 0x00, 0xc3, 0x53, 0x00, 0x00, 0x00, 0xa0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} - if err := s.common.send(p); err != nil { - return err - } - if err := s.common.send(p); err != nil { + if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil { return err } - s.common.pkt0.sendSeq++ s.authInnerSendSeq++ return nil @@ -244,8 +223,6 @@ func (s *controlStream) handleRead(r []byte) error { func (s *controlStream) loop() { startTime := time.Now() - s.common.pkt0.startPeriodicSend(&s.common) - s.secondAuthTimer = time.NewTimer(time.Second) reauthTicker := time.NewTicker(60 * time.Second) statusLogTicker := time.NewTicker(3 * time.Second) @@ -299,7 +276,8 @@ func (s *controlStream) start() error { return err } - s.common.pkt0.sendSeq = 1 + s.common.pkt0.startPeriodicSend(&s.common) + if err := s.sendPktLogin(); err != nil { return err } diff --git a/pkt0.go b/pkt0.go index 099e2ab..9e6353a 100644 --- a/pkt0.go +++ b/pkt0.go @@ -1,8 +1,13 @@ package main import ( + "bytes" + "encoding/binary" + "math" "sync" "time" + + "github.com/nonoo/kappanhang/log" ) type pkt0Type struct { @@ -11,6 +16,8 @@ type pkt0Type struct { sendTicker *time.Ticker + txSeqBuf txSeqBufStruct + periodicStopNeededChan chan bool periodicStopFinishedChan chan bool } @@ -23,13 +30,81 @@ func (p *pkt0Type) sendSeqUnlock() { p.mutex.Unlock() } -func (p *pkt0Type) send(s *streamCommon) error { +func (p *pkt0Type) retransmitRange(s *streamCommon, start, end uint16) error { + if int(math.Abs(float64(start)-float64(end))) > len(p.txSeqBuf.entries) { + log.Debug(s.name+"/can't retransmit #", start, "-", end, " - range too big") + return nil + } + + log.Debug("got retransmit request for #", start, "-", end) + for { + d := p.txSeqBuf.get(seqNum(start)) + if d != nil { + log.Debug(s.name+"/retransmitting #", start) + if err := s.send(d); err != nil { + return err + } + } else { + log.Debug(s.name+"/can't retransmit #", start, " - not found") + } + + if start == end { + break + } + start++ + } + return nil +} + +func (p *pkt0Type) handle(s *streamCommon, r []byte) error { + if len(r) < 16 { + return nil + } + + if bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00}) { + seq := binary.LittleEndian.Uint16(r[6:8]) + d := p.txSeqBuf.get(seqNum(seq)) + if d != nil { + log.Debug(s.name+"/retransmitting #", seq) + if err := s.send(d); err != nil { + return err + } + } else { + log.Debug(s.name+"/can't retransmit #", seq, " - not found") + } + } else if bytes.Equal(r[:6], []byte{0x18, 0x00, 0x00, 0x00, 0x01, 0x00}) { + r = r[16:] + for len(r) >= 4 { + start := binary.LittleEndian.Uint16(r[0:2]) + end := binary.LittleEndian.Uint16(r[2:4]) + if err := p.retransmitRange(s, start, end); err != nil { + return err + } + r = r[4:] + } + } + return nil +} + +func (p *pkt0Type) isIdlePkt0(r []byte) bool { + return len(r) == 16 && bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00}) +} + +func (p *pkt0Type) isPkt0(r []byte) bool { + return len(r) >= 16 && (p.isIdlePkt0(r) || + bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00}) || // Retransmit request for 1 packet. + bytes.Equal(r[:6], []byte{0x18, 0x00, 0x00, 0x00, 0x01, 0x00})) // Retransmit request for ranges. +} + +// The radio can request retransmit for tracked packets. If there are no tracked packets to send, idle pkt0 +// packets are periodically sent. +func (p *pkt0Type) sendTrackedPacket(s *streamCommon, d []byte) error { p.sendSeqLock() defer p.sendSeqUnlock() - d := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(p.sendSeq), byte(p.sendSeq >> 8), - byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), - byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)} + d[6] = byte(p.sendSeq) + d[7] = byte(p.sendSeq >> 8) + p.txSeqBuf.add(seqNum(p.sendSeq), d) if err := s.send(d); err != nil { return err } @@ -37,11 +112,18 @@ func (p *pkt0Type) send(s *streamCommon) error { return nil } +func (p *pkt0Type) sendIdle(s *streamCommon) error { + d := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), + byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)} + return p.sendTrackedPacket(s, d) +} + func (p *pkt0Type) loop(s *streamCommon) { for { select { case <-p.sendTicker.C: - if err := p.send(s); err != nil { + if err := p.sendIdle(s); err != nil { reportError(err) } case <-p.periodicStopNeededChan: @@ -52,6 +134,7 @@ func (p *pkt0Type) loop(s *streamCommon) { } func (p *pkt0Type) startPeriodicSend(s *streamCommon) { + p.sendSeq = 1 p.sendTicker = time.NewTicker(100 * time.Millisecond) p.periodicStopNeededChan = make(chan bool) diff --git a/serialstream.go b/serialstream.go index 369efef..3fd5d88 100644 --- a/serialstream.go +++ b/serialstream.go @@ -36,26 +36,19 @@ type serialStream struct { } func (s *serialStream) send(d []byte) error { - s.common.pkt0.sendSeqLock() - defer s.common.pkt0.sendSeqUnlock() - l := byte(len(d)) - p := append([]byte{0x15 + l, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8), + p := append([]byte{0x15 + l, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), 0xc1, l, 0x00, byte(s.sendSeq >> 8), byte(s.sendSeq)}, d...) - if err := s.common.send(p); err != nil { + if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil { return err } - s.common.pkt0.sendSeq++ s.sendSeq++ return nil } func (s *serialStream) sendOpenClose(close bool) error { - s.common.pkt0.sendSeqLock() - defer s.common.pkt0.sendSeqUnlock() - var magic byte if close { magic = 0x00 @@ -63,14 +56,13 @@ func (s *serialStream) sendOpenClose(close bool) error { magic = 0x05 } - p := []byte{0x16, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8), + p := []byte{0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), 0xc0, 0x01, 0x00, byte(s.sendSeq >> 8), byte(s.sendSeq), magic} - if err := s.common.send(p); err != nil { + if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil { return err } - s.common.pkt0.sendSeq++ s.sendSeq++ return nil } @@ -92,7 +84,7 @@ func (s *serialStream) handleRxSeqBufEntry(e seqBufEntry) { s.lastReceivedSeq = gotSeq s.receivedSerialData = true - if len(e.data) == 16 { // Do not send pkt0s. + if s.common.pkt0.isPkt0(e.data) { return } @@ -118,9 +110,8 @@ func (s *serialStream) handleSerialPacket(r []byte) error { } func (s *serialStream) handleRead(r []byte) error { - // We add both serial data and pkt0 to the seqbuf. - if (len(r) == 16 && bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00})) || // Pkt0? - (len(r) >= 22 && r[16] == 0xc1 && r[0]-0x15 == r[17]) { // Serial data? + // We add both idle pkt0 and serial data to the seqbuf. + if s.common.pkt0.isIdlePkt0(r) || (len(r) >= 22 && r[16] == 0xc1 && r[0]-0x15 == r[17]) { return s.handleSerialPacket(r) } return nil diff --git a/streamcommon.go b/streamcommon.go index 5d43b60..fe801f2 100644 --- a/streamcommon.go +++ b/streamcommon.go @@ -51,7 +51,12 @@ func (s *streamCommon) reader() { if err := s.pkt7.handle(s, r); err != nil { reportError(err) } + // Don't let pkt7 packets further downstream. continue + } else if s.pkt0.isPkt0(r) { + if err := s.pkt0.handle(s, r); err != nil { + reportError(err) + } } select { diff --git a/txseqbuf.go b/txseqbuf.go new file mode 100644 index 0000000..31b146e --- /dev/null +++ b/txseqbuf.go @@ -0,0 +1,38 @@ +package main + +import "time" + +type txSeqBufStruct struct { + entries []seqBufEntry +} + +func (s *txSeqBufStruct) add(seq seqNum, p []byte) { + s.entries = append(s.entries, seqBufEntry{ + seq: seq, + data: p, + addedAt: time.Now(), + }) + s.purgeOldEntries() +} + +func (s *txSeqBufStruct) purgeOldEntries() { + for len(s.entries) > 0 && time.Since(s.entries[0].addedAt) > time.Second { + s.entries = s.entries[1:] + } +} + +func (s *txSeqBufStruct) get(seq seqNum) (d []byte) { + if len(s.entries) == 0 { + return nil + } + + // Searching from backwards, as we expect most queries for latest entries. + for i := len(s.entries) - 1; i >= 0; i-- { + if s.entries[i].seq == seq { + d = s.entries[i].data + break + } + } + s.purgeOldEntries() + return d +}