diff --git a/pkt0.go b/pkt0.go index d160869..dec70ef 100644 --- a/pkt0.go +++ b/pkt0.go @@ -7,24 +7,22 @@ import ( "time" ) +const pkt0DefaultSendInterval = 100 * time.Millisecond +const pkt0IdleAfter = time.Second +const pkt0IdleSendInterval = time.Second + type pkt0Type struct { sendSeq uint16 - mutex sync.Mutex + mutex sync.Mutex // Protects sendSeq - sendTicker *time.Ticker + sendTimer *time.Timer + lastTrackedSentAt time.Time txSeqBuf txSeqBufStruct - periodicStopNeededChan chan bool - periodicStopFinishedChan chan bool -} - -func (p *pkt0Type) sendSeqLock() { - p.mutex.Lock() -} - -func (p *pkt0Type) sendSeqUnlock() { - p.mutex.Unlock() + periodicIntervalResetChan chan bool + periodicStopNeededChan chan bool + periodicStopFinishedChan chan bool } func (p *pkt0Type) retransmitRange(s *streamCommon, start, end uint16) error { @@ -106,8 +104,8 @@ func (p *pkt0Type) isPkt0(r []byte) bool { // The radio can request retransmit for tracked packets. If there are no tracked packets to send, idle pkt0 // packets are periodically sent. func (p *pkt0Type) sendTrackedPacket(s *streamCommon, d []byte) error { - p.sendSeqLock() - defer p.sendSeqUnlock() + p.mutex.Lock() + defer p.mutex.Unlock() // if s.name == "audio" { // if drop == 0 && time.Now().UnixNano()%100 == 0 { @@ -131,6 +129,18 @@ func (p *pkt0Type) sendTrackedPacket(s *streamCommon, d []byte) error { } // } p.sendSeq++ + + if !p.isIdlePkt0(d) { + p.lastTrackedSentAt = time.Now() + if p.periodicIntervalResetChan != nil { + // Non-blocking send. + select { + case p.periodicIntervalResetChan <- true: + default: + } + } + } + return nil } @@ -148,11 +158,23 @@ func (p *pkt0Type) sendIdle(s *streamCommon, tracked bool, seqIfUntracked uint16 func (p *pkt0Type) loop(s *streamCommon) { for { select { - case <-p.sendTicker.C: + case <-p.periodicIntervalResetChan: + if !p.sendTimer.Stop() { + <-p.sendTimer.C + } + p.sendTimer.Reset(pkt0DefaultSendInterval) + case <-p.sendTimer.C: if err := p.sendIdle(s, true, 0); err != nil { reportError(err) } + + if time.Since(p.lastTrackedSentAt) >= pkt0IdleAfter { + p.sendTimer.Reset(pkt0IdleSendInterval) + } else { + p.sendTimer.Reset(pkt0DefaultSendInterval) + } case <-p.periodicStopNeededChan: + p.sendTimer.Stop() p.periodicStopFinishedChan <- true return } @@ -160,22 +182,21 @@ func (p *pkt0Type) loop(s *streamCommon) { } func (p *pkt0Type) startPeriodicSend(s *streamCommon) { - p.sendTicker = time.NewTicker(100 * time.Millisecond) + p.sendTimer = time.NewTimer(pkt0IdleSendInterval) + p.periodicIntervalResetChan = make(chan bool) p.periodicStopNeededChan = make(chan bool) p.periodicStopFinishedChan = make(chan bool) go p.loop(s) } func (p *pkt0Type) stopPeriodicSend() { - if p.sendTicker == nil { // Periodic send has not started? + if p.sendTimer == nil { // Periodic send has not started? return } p.periodicStopNeededChan <- true <-p.periodicStopFinishedChan - - p.sendTicker.Stop() } func (p *pkt0Type) init(s *streamCommon) {