Completely handle retransmit requests

This commit is contained in:
Nonoo 2020-10-25 20:18:24 +01:00
parent de1f40e171
commit f28a0b46cb
5 changed files with 146 additions and 51 deletions

View file

@ -30,12 +30,9 @@ type controlStream struct {
}
func (s *controlStream) sendPktLogin() error {
s.common.pkt0.sendSeqLock()
defer s.common.pkt0.sendSeqUnlock()
// The reply to the auth packet will contain a 6 bytes long auth ID with the first 2 bytes set to our ID.
authStartID := []byte{0x63, 0x00}
p := []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8),
p := []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0x00, 0x00, 0x00, 0x70, 0x01, 0x00, 0x00, byte(s.authInnerSendSeq),
@ -52,14 +49,10 @@ func (s *controlStream) sendPktLogin() error {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
if err := s.common.send(p); err != nil {
return err
}
if err := s.common.send(p); err != nil {
if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil {
return err
}
s.common.pkt0.sendSeq++
s.authInnerSendSeq++
return nil
}
@ -82,10 +75,7 @@ func (s *controlStream) sendPktAuth(magic byte) error {
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
s.common.pkt0.sendSeqLock()
defer s.common.pkt0.sendSeqUnlock()
p := []byte{0x40, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8),
p := []byte{0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0x00, 0x00, 0x00, 0x30, 0x01, magic, 0x00, byte(s.authInnerSendSeq),
@ -94,23 +84,16 @@ func (s *controlStream) sendPktAuth(magic byte) error {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
if err := s.common.send(p); err != nil {
if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil {
return err
}
if err := s.common.send(p); err != nil {
return err
}
s.common.pkt0.sendSeq++
s.authInnerSendSeq++
return nil
}
func (s *controlStream) sendRequestSerialAndAudio() error {
s.common.pkt0.sendSeqLock()
defer s.common.pkt0.sendSeqUnlock()
log.Print("requesting serial and audio stream")
p := []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8),
p := []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0x00, 0x00, 0x00, 0x80, 0x01, 0x03, 0x00, byte(s.authInnerSendSeq),
@ -129,14 +112,10 @@ func (s *controlStream) sendRequestSerialAndAudio() error {
0x00, 0x00, 0xbb, 0x80, 0x00, 0x00, 0xc3, 0x52,
0x00, 0x00, 0xc3, 0x53, 0x00, 0x00, 0x00, 0xa0,
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
if err := s.common.send(p); err != nil {
return err
}
if err := s.common.send(p); err != nil {
if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil {
return err
}
s.common.pkt0.sendSeq++
s.authInnerSendSeq++
return nil
@ -244,8 +223,6 @@ func (s *controlStream) handleRead(r []byte) error {
func (s *controlStream) loop() {
startTime := time.Now()
s.common.pkt0.startPeriodicSend(&s.common)
s.secondAuthTimer = time.NewTimer(time.Second)
reauthTicker := time.NewTicker(60 * time.Second)
statusLogTicker := time.NewTicker(3 * time.Second)
@ -299,7 +276,8 @@ func (s *controlStream) start() error {
return err
}
s.common.pkt0.sendSeq = 1
s.common.pkt0.startPeriodicSend(&s.common)
if err := s.sendPktLogin(); err != nil {
return err
}

93
pkt0.go
View file

@ -1,8 +1,13 @@
package main
import (
"bytes"
"encoding/binary"
"math"
"sync"
"time"
"github.com/nonoo/kappanhang/log"
)
type pkt0Type struct {
@ -11,6 +16,8 @@ type pkt0Type struct {
sendTicker *time.Ticker
txSeqBuf txSeqBufStruct
periodicStopNeededChan chan bool
periodicStopFinishedChan chan bool
}
@ -23,13 +30,81 @@ func (p *pkt0Type) sendSeqUnlock() {
p.mutex.Unlock()
}
func (p *pkt0Type) send(s *streamCommon) error {
func (p *pkt0Type) retransmitRange(s *streamCommon, start, end uint16) error {
if int(math.Abs(float64(start)-float64(end))) > len(p.txSeqBuf.entries) {
log.Debug(s.name+"/can't retransmit #", start, "-", end, " - range too big")
return nil
}
log.Debug("got retransmit request for #", start, "-", end)
for {
d := p.txSeqBuf.get(seqNum(start))
if d != nil {
log.Debug(s.name+"/retransmitting #", start)
if err := s.send(d); err != nil {
return err
}
} else {
log.Debug(s.name+"/can't retransmit #", start, " - not found")
}
if start == end {
break
}
start++
}
return nil
}
func (p *pkt0Type) handle(s *streamCommon, r []byte) error {
if len(r) < 16 {
return nil
}
if bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00}) {
seq := binary.LittleEndian.Uint16(r[6:8])
d := p.txSeqBuf.get(seqNum(seq))
if d != nil {
log.Debug(s.name+"/retransmitting #", seq)
if err := s.send(d); err != nil {
return err
}
} else {
log.Debug(s.name+"/can't retransmit #", seq, " - not found")
}
} else if bytes.Equal(r[:6], []byte{0x18, 0x00, 0x00, 0x00, 0x01, 0x00}) {
r = r[16:]
for len(r) >= 4 {
start := binary.LittleEndian.Uint16(r[0:2])
end := binary.LittleEndian.Uint16(r[2:4])
if err := p.retransmitRange(s, start, end); err != nil {
return err
}
r = r[4:]
}
}
return nil
}
func (p *pkt0Type) isIdlePkt0(r []byte) bool {
return len(r) == 16 && bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00})
}
func (p *pkt0Type) isPkt0(r []byte) bool {
return len(r) >= 16 && (p.isIdlePkt0(r) ||
bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00}) || // Retransmit request for 1 packet.
bytes.Equal(r[:6], []byte{0x18, 0x00, 0x00, 0x00, 0x01, 0x00})) // Retransmit request for ranges.
}
// The radio can request retransmit for tracked packets. If there are no tracked packets to send, idle pkt0
// packets are periodically sent.
func (p *pkt0Type) sendTrackedPacket(s *streamCommon, d []byte) error {
p.sendSeqLock()
defer p.sendSeqUnlock()
d := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(p.sendSeq), byte(p.sendSeq >> 8),
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)}
d[6] = byte(p.sendSeq)
d[7] = byte(p.sendSeq >> 8)
p.txSeqBuf.add(seqNum(p.sendSeq), d)
if err := s.send(d); err != nil {
return err
}
@ -37,11 +112,18 @@ func (p *pkt0Type) send(s *streamCommon) error {
return nil
}
func (p *pkt0Type) sendIdle(s *streamCommon) error {
d := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)}
return p.sendTrackedPacket(s, d)
}
func (p *pkt0Type) loop(s *streamCommon) {
for {
select {
case <-p.sendTicker.C:
if err := p.send(s); err != nil {
if err := p.sendIdle(s); err != nil {
reportError(err)
}
case <-p.periodicStopNeededChan:
@ -52,6 +134,7 @@ func (p *pkt0Type) loop(s *streamCommon) {
}
func (p *pkt0Type) startPeriodicSend(s *streamCommon) {
p.sendSeq = 1
p.sendTicker = time.NewTicker(100 * time.Millisecond)
p.periodicStopNeededChan = make(chan bool)

View file

@ -36,26 +36,19 @@ type serialStream struct {
}
func (s *serialStream) send(d []byte) error {
s.common.pkt0.sendSeqLock()
defer s.common.pkt0.sendSeqUnlock()
l := byte(len(d))
p := append([]byte{0x15 + l, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8),
p := append([]byte{0x15 + l, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0xc1, l, 0x00, byte(s.sendSeq >> 8), byte(s.sendSeq)}, d...)
if err := s.common.send(p); err != nil {
if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil {
return err
}
s.common.pkt0.sendSeq++
s.sendSeq++
return nil
}
func (s *serialStream) sendOpenClose(close bool) error {
s.common.pkt0.sendSeqLock()
defer s.common.pkt0.sendSeqUnlock()
var magic byte
if close {
magic = 0x00
@ -63,14 +56,13 @@ func (s *serialStream) sendOpenClose(close bool) error {
magic = 0x05
}
p := []byte{0x16, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.common.pkt0.sendSeq), byte(s.common.pkt0.sendSeq >> 8),
p := []byte{0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0xc0, 0x01, 0x00, byte(s.sendSeq >> 8), byte(s.sendSeq), magic}
if err := s.common.send(p); err != nil {
if err := s.common.pkt0.sendTrackedPacket(&s.common, p); err != nil {
return err
}
s.common.pkt0.sendSeq++
s.sendSeq++
return nil
}
@ -92,7 +84,7 @@ func (s *serialStream) handleRxSeqBufEntry(e seqBufEntry) {
s.lastReceivedSeq = gotSeq
s.receivedSerialData = true
if len(e.data) == 16 { // Do not send pkt0s.
if s.common.pkt0.isPkt0(e.data) {
return
}
@ -118,9 +110,8 @@ func (s *serialStream) handleSerialPacket(r []byte) error {
}
func (s *serialStream) handleRead(r []byte) error {
// We add both serial data and pkt0 to the seqbuf.
if (len(r) == 16 && bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00})) || // Pkt0?
(len(r) >= 22 && r[16] == 0xc1 && r[0]-0x15 == r[17]) { // Serial data?
// We add both idle pkt0 and serial data to the seqbuf.
if s.common.pkt0.isIdlePkt0(r) || (len(r) >= 22 && r[16] == 0xc1 && r[0]-0x15 == r[17]) {
return s.handleSerialPacket(r)
}
return nil

View file

@ -51,7 +51,12 @@ func (s *streamCommon) reader() {
if err := s.pkt7.handle(s, r); err != nil {
reportError(err)
}
// Don't let pkt7 packets further downstream.
continue
} else if s.pkt0.isPkt0(r) {
if err := s.pkt0.handle(s, r); err != nil {
reportError(err)
}
}
select {

38
txseqbuf.go Normal file
View file

@ -0,0 +1,38 @@
package main
import "time"
type txSeqBufStruct struct {
entries []seqBufEntry
}
func (s *txSeqBufStruct) add(seq seqNum, p []byte) {
s.entries = append(s.entries, seqBufEntry{
seq: seq,
data: p,
addedAt: time.Now(),
})
s.purgeOldEntries()
}
func (s *txSeqBufStruct) purgeOldEntries() {
for len(s.entries) > 0 && time.Since(s.entries[0].addedAt) > time.Second {
s.entries = s.entries[1:]
}
}
func (s *txSeqBufStruct) get(seq seqNum) (d []byte) {
if len(s.entries) == 0 {
return nil
}
// Searching from backwards, as we expect most queries for latest entries.
for i := len(s.entries) - 1; i >= 0; i-- {
if s.entries[i].seq == seq {
d = s.entries[i].data
break
}
}
s.purgeOldEntries()
return d
}