kappanhang/pkt0.go

216 lines
5.2 KiB
Go
Raw Permalink Normal View History

package main
import (
2020-10-25 20:18:24 +01:00
"bytes"
"encoding/binary"
"sync"
"time"
)
const pkt0DefaultSendInterval = 100 * time.Millisecond
const pkt0IdleAfter = time.Second
const pkt0IdleSendInterval = time.Second
type pkt0Type struct {
sendSeq uint16
mutex sync.Mutex // Protects sendSeq
sendTimer *time.Timer
lastTrackedSentAt time.Time
2020-10-25 20:18:24 +01:00
txSeqBuf txSeqBufStruct
periodicIntervalResetChan chan bool
periodicStopNeededChan chan bool
periodicStopFinishedChan chan bool
}
2020-10-25 20:18:24 +01:00
func (p *pkt0Type) retransmitRange(s *streamCommon, start, end uint16) error {
2020-10-29 17:23:31 +01:00
log.Debug(s.name+"/got retransmit request for #", start, "-", end)
2020-10-25 20:18:24 +01:00
for {
2020-10-28 18:03:35 +01:00
netstat.reportRetransmit(1)
2020-10-25 20:18:24 +01:00
d := p.txSeqBuf.get(seqNum(start))
if d != nil {
log.Debug(s.name+"/retransmitting #", start)
if err := s.send(d); err != nil {
return err
2020-10-25 20:18:24 +01:00
}
if err := s.send(d); err != nil {
return err
}
2020-10-25 20:18:24 +01:00
} else {
2020-11-05 23:13:46 +01:00
log.Debug(s.name+"/can't retransmit #", start, " - not found ")
// Sending an idle with the requested seqnum.
if err := p.sendIdle(s, false, start); err != nil {
return err
}
if err := p.sendIdle(s, false, start); err != nil {
return err
}
2020-10-25 20:18:24 +01:00
}
if start == end {
break
}
start++
}
return nil
}
func (p *pkt0Type) handle(s *streamCommon, r []byte) error {
if len(r) < 16 {
return nil
}
if bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00}) {
seq := binary.LittleEndian.Uint16(r[6:8])
d := p.txSeqBuf.get(seqNum(seq))
2020-10-29 17:23:31 +01:00
log.Debug(s.name+"/got retransmit request for #", seq)
2020-10-25 20:18:24 +01:00
if d != nil {
log.Debug(s.name+"/retransmitting #", seq)
2020-10-28 18:03:35 +01:00
netstat.reportRetransmit(1)
if err := s.send(d); err != nil {
return err
2020-10-25 20:18:24 +01:00
}
if err := s.send(d); err != nil {
return err
}
2020-10-25 20:18:24 +01:00
} else {
log.Debug(s.name+"/can't retransmit #", seq, " - not found")
// Sending an idle with the requested seqnum.
if err := p.sendIdle(s, false, seq); err != nil {
return err
}
if err := p.sendIdle(s, false, seq); err != nil {
return err
}
2020-10-25 20:18:24 +01:00
}
} else if bytes.Equal(r[:6], []byte{0x18, 0x00, 0x00, 0x00, 0x01, 0x00}) {
r = r[16:]
for len(r) >= 4 {
start := binary.LittleEndian.Uint16(r[0:2])
end := binary.LittleEndian.Uint16(r[2:4])
if err := p.retransmitRange(s, start, end); err != nil {
return err
}
r = r[4:]
}
}
return nil
}
func (p *pkt0Type) isIdlePkt0(r []byte) bool {
return len(r) == 16 && bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00})
}
func (p *pkt0Type) isPkt0(r []byte) bool {
return len(r) >= 16 && (p.isIdlePkt0(r) ||
bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00}) || // Retransmit request for 1 packet.
bytes.Equal(r[:6], []byte{0x18, 0x00, 0x00, 0x00, 0x01, 0x00})) // Retransmit request for ranges.
}
//var drop int
2020-10-25 20:18:24 +01:00
// The radio can request retransmit for tracked packets. If there are no tracked packets to send, idle pkt0
// packets are periodically sent.
func (p *pkt0Type) sendTrackedPacket(s *streamCommon, d []byte) error {
p.mutex.Lock()
defer p.mutex.Unlock()
// if s.name == "audio" {
// if drop == 0 && time.Now().UnixNano()%100 == 0 {
// log.Print(s.name+"/drop start - ", p.sendSeq)
// drop = 1
// } else if drop > 0 {
// drop++
// if drop == 3 {
// log.Print(s.name+"/drop stop - ", p.sendSeq)
// drop = 0
// }
// }
// }
2020-10-25 20:18:24 +01:00
d[6] = byte(p.sendSeq)
d[7] = byte(p.sendSeq >> 8)
p.txSeqBuf.add(seqNum(p.sendSeq), d)
// if s.name != "audio" || drop == 0 {
if err := s.send(d); err != nil {
return err
}
// }
p.sendSeq++
if !p.isIdlePkt0(d) {
p.lastTrackedSentAt = time.Now()
if p.periodicIntervalResetChan != nil {
// Non-blocking send.
select {
case p.periodicIntervalResetChan <- true:
default:
}
}
}
return nil
}
func (p *pkt0Type) sendIdle(s *streamCommon, tracked bool, seqIfUntracked uint16) error {
d := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(seqIfUntracked), byte(seqIfUntracked >> 8),
2020-10-25 20:18:24 +01:00
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)}
if tracked {
return p.sendTrackedPacket(s, d)
}
2020-10-30 10:04:23 +01:00
return s.send(d)
2020-10-25 20:18:24 +01:00
}
func (p *pkt0Type) loop(s *streamCommon) {
for {
select {
case <-p.periodicIntervalResetChan:
if !p.sendTimer.Stop() {
<-p.sendTimer.C
}
p.sendTimer.Reset(pkt0DefaultSendInterval)
case <-p.sendTimer.C:
if err := p.sendIdle(s, true, 0); err != nil {
reportError(err)
}
if time.Since(p.lastTrackedSentAt) >= pkt0IdleAfter {
p.sendTimer.Reset(pkt0IdleSendInterval)
} else {
p.sendTimer.Reset(pkt0DefaultSendInterval)
}
case <-p.periodicStopNeededChan:
p.sendTimer.Stop()
p.periodicStopFinishedChan <- true
return
}
}
}
func (p *pkt0Type) startPeriodicSend(s *streamCommon) {
p.sendTimer = time.NewTimer(pkt0IdleSendInterval)
p.periodicIntervalResetChan = make(chan bool)
p.periodicStopNeededChan = make(chan bool)
p.periodicStopFinishedChan = make(chan bool)
go p.loop(s)
}
func (p *pkt0Type) stopPeriodicSend() {
if p.sendTimer == nil { // Periodic send has not started?
return
}
p.periodicStopNeededChan <- true
<-p.periodicStopFinishedChan
}
func (p *pkt0Type) init(s *streamCommon) {
p.sendSeq = 1
}