diff --git a/pkt7.go b/pkt7.go index e4788f3..4ae0eb9 100644 --- a/pkt7.go +++ b/pkt7.go @@ -27,6 +27,8 @@ type pkt7Type struct { periodicStopFinishedChan chan bool } +var controlStreamLatency time.Duration + func (p *pkt7Type) isPkt7(r []byte) bool { return len(r) == 21 && bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) // Note that the first byte can be 0x15 or 0x00, so we ignore that. } @@ -54,6 +56,8 @@ func (p *pkt7Type) handle(s *streamCommon, r []byte) error { p.latency += time.Since(p.lastSendAt) p.latency /= 2 statusLog.reportRTTLatency(p.latency) + + controlStreamLatency = p.latency } } diff --git a/seqbuf.go b/seqbuf.go index cf1c4e9..dce356f 100644 --- a/seqbuf.go +++ b/seqbuf.go @@ -228,8 +228,12 @@ func (s *seqBuf) add(seq seqNum, data []byte) error { func (s *seqBuf) checkLockTimeout() (timeout bool, shouldRetryIn time.Duration) { timeSinceLastInvalidSeq := time.Since(s.lockedAt) - if s.length > timeSinceLastInvalidSeq { - shouldRetryIn = s.length - timeSinceLastInvalidSeq + lockDuration := s.length + if lockDuration < controlStreamLatency*2 { + lockDuration = controlStreamLatency * 2 + } + if lockDuration > timeSinceLastInvalidSeq { + shouldRetryIn = lockDuration - timeSinceLastInvalidSeq return }