kappanhang/seqbuf.go

417 lines
9.4 KiB
Go
Raw Normal View History

2020-10-20 10:48:55 +02:00
package main
import (
"errors"
2020-10-20 14:14:05 +02:00
"sync"
2020-10-20 10:48:55 +02:00
"time"
)
type seqNum int
2020-11-21 11:01:37 +01:00
func (s *seqNum) inc(maxSeqNum seqNum) seqNum {
if *s == maxSeqNum {
return 0
}
return *s + 1
}
func (s *seqNum) dec(maxSeqNum seqNum) seqNum {
if *s == 0 {
return maxSeqNum
}
return *s - 1
}
type seqNumRange [2]seqNum
func (r *seqNumRange) getDiff(maxSeqNum seqNum) (diff int) {
from := r[0]
to := r[1]
if to >= from {
diff = int(to) - int(from)
} else {
r[0] = from
r[1] = to
diff = (int(maxSeqNum) + 1) - int(from) + int(to)
}
return
}
2020-10-20 10:48:55 +02:00
type seqBufEntry struct {
2020-11-21 11:01:37 +01:00
seq seqNum
data []byte
2020-10-20 10:48:55 +02:00
}
2020-11-21 11:01:37 +01:00
type requestRetransmitCallbackType func(r seqNumRange) error
2020-10-20 10:48:55 +02:00
type seqBuf struct {
2020-11-21 11:01:37 +01:00
length time.Duration
maxSeqNum seqNum
maxSeqNumDiff seqNum
requestRetransmitCallback requestRetransmitCallbackType
// Available entries coming out from the seqbuf will be sent to entryChan.
entryChan chan seqBufEntry
// If this is true then the seqBuf is locked, which means no entries will be sent to entryChan.
lockedByInvalidSeq bool
lockedAt time.Time
// This is false until no packets have been sent to the entryChan.
alreadyReturnedFirstSeq bool
// The seqNum of the last packet sent to entryChan.
lastReturnedSeq seqNum
requestedRetransmit bool
lastRequestedRetransmitRange seqNumRange
ignoreMissingPktsUntilEnabled bool
ignoreMissingPktsUntilSeq seqNum
2020-10-20 10:48:55 +02:00
// Note that the most recently added entry is stored as the 0th entry.
entries []seqBufEntry
2020-10-20 14:14:05 +02:00
mutex sync.RWMutex
2020-10-20 10:48:55 +02:00
2020-10-20 14:14:05 +02:00
entryAddedChan chan bool
watcherCloseNeededChan chan bool
watcherCloseDoneChan chan bool
2020-11-21 11:01:37 +01:00
errOutOfOrder error
2020-10-20 10:48:55 +02:00
}
2020-10-20 14:14:05 +02:00
// func (s *seqBuf) string() (out string) {
// if len(s.entries) == 0 {
// return "empty"
// }
// for _, e := range s.entries {
// if out != "" {
// out += " "
// }
// out += fmt.Sprint(e.seq)
// }
// return out
// }
2020-10-20 10:48:55 +02:00
func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry {
return seqBufEntry{
2020-11-21 11:01:37 +01:00
seq: seq,
data: data,
2020-10-20 10:48:55 +02:00
}
}
2020-10-20 14:14:05 +02:00
func (s *seqBuf) notifyWatcher() {
select {
case s.entryAddedChan <- true:
default:
}
}
2020-10-20 10:48:55 +02:00
func (s *seqBuf) addToFront(seq seqNum, data []byte) {
2020-10-20 14:14:05 +02:00
e := s.createEntry(seq, data)
s.entries = append([]seqBufEntry{e}, s.entries...)
s.notifyWatcher()
2020-10-20 10:48:55 +02:00
}
func (s *seqBuf) addToBack(seq seqNum, data []byte) {
2020-10-20 14:14:05 +02:00
e := s.createEntry(seq, data)
s.entries = append(s.entries, e)
s.notifyWatcher()
2020-10-20 10:48:55 +02:00
}
2020-11-21 11:01:37 +01:00
func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) {
2020-10-20 10:48:55 +02:00
if toPos == 0 {
s.addToFront(seq, data)
2020-11-21 11:01:37 +01:00
return
2020-10-20 10:48:55 +02:00
}
if toPos >= len(s.entries) {
s.addToBack(seq, data)
2020-11-21 11:01:37 +01:00
return
2020-10-20 10:48:55 +02:00
}
sliceBefore := s.entries[:toPos]
sliceAfter := s.entries[toPos:]
2020-10-20 14:14:05 +02:00
e := s.createEntry(seq, data)
s.entries = append(sliceBefore, append([]seqBufEntry{e}, sliceAfter...)...)
s.notifyWatcher()
2020-10-20 10:48:55 +02:00
}
func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum {
2020-11-21 11:29:36 +01:00
if seq1 >= seq2 {
2020-10-20 10:48:55 +02:00
return seq1 - seq2
}
seq2Overflowed := s.maxSeqNum + 1 - seq2
return seq2Overflowed + seq1
}
2020-11-21 11:29:36 +01:00
type seqBufCompareResult int
2020-10-20 10:48:55 +02:00
const (
2020-11-21 11:29:36 +01:00
larger = seqBufCompareResult(iota)
smaller
2020-10-20 10:48:55 +02:00
equal
)
2020-11-21 11:29:36 +01:00
// Compares seq to toSeq, considering the seq turnover at maxSeqNum.
// Example: returns larger for seq=2 toSeq=1
// returns smaller for seq=0 toSeq=1
// returns smaller for seq=39 toSeq=1 if maxSeqNum is 40
func (s *seqBuf) compareSeq(seq, toSeq seqNum) seqBufCompareResult {
diff1 := s.getDiff(seq, toSeq)
diff2 := s.getDiff(toSeq, seq)
2020-10-20 10:48:55 +02:00
if diff1 == diff2 {
return equal
}
if diff1 > diff2 {
// This will cause an insert at the current position.
if s.maxSeqNumDiff > 0 && diff2 > s.maxSeqNumDiff {
2020-11-21 11:29:36 +01:00
return larger
2020-10-20 10:48:55 +02:00
}
2020-11-21 11:29:36 +01:00
return smaller
2020-10-20 10:48:55 +02:00
}
2020-11-21 11:29:36 +01:00
return larger
2020-10-20 10:48:55 +02:00
}
2020-11-21 11:01:37 +01:00
func (s *seqBuf) add(seq seqNum, data []byte) error {
2020-10-20 14:14:05 +02:00
s.mutex.Lock()
defer s.mutex.Unlock()
2020-10-20 10:48:55 +02:00
// log.Debug("inserting ", seq)
// defer func() {
// log.Print(s.String())
// }()
if seq > s.maxSeqNum {
2020-11-21 11:01:37 +01:00
return errors.New("seq out of range")
2020-10-20 10:48:55 +02:00
}
if len(s.entries) == 0 {
s.addToFront(seq, data)
2020-11-21 11:01:37 +01:00
return nil
2020-10-20 10:48:55 +02:00
}
2020-11-21 11:01:37 +01:00
if s.entries[0].seq == seq { // Dropping duplicate seq.
return nil
2020-10-20 10:48:55 +02:00
}
// Checking the first entry.
2020-11-21 11:29:36 +01:00
if s.compareSeq(seq, s.entries[0].seq) == larger {
2020-10-20 10:48:55 +02:00
s.addToFront(seq, data)
2020-11-21 11:01:37 +01:00
return nil
2020-10-20 10:48:55 +02:00
}
// Parsing through other entries if there are more than 1.
for i := 1; i < len(s.entries); i++ {
2020-11-21 11:01:37 +01:00
// This seqnum is already in the queue? Ignoring it.
2020-10-20 10:48:55 +02:00
if s.entries[i].seq == seq {
2020-11-21 11:01:37 +01:00
return nil
2020-10-20 10:48:55 +02:00
}
2020-11-21 11:29:36 +01:00
if s.compareSeq(seq, s.entries[i].seq) == larger {
2020-10-20 10:48:55 +02:00
// log.Debug("left for ", s.entries[i].seq)
2020-11-21 11:01:37 +01:00
s.insert(seq, data, i)
return nil
2020-10-20 10:48:55 +02:00
}
// log.Debug("right for ", s.entries[i].seq)
}
// No place found for the item?
s.addToBack(seq, data)
2020-11-21 11:01:37 +01:00
return nil
2020-10-20 10:48:55 +02:00
}
2020-11-21 11:01:37 +01:00
func (s *seqBuf) checkLockTimeout() (timeout bool, shouldRetryIn time.Duration) {
timeSinceLastInvalidSeq := time.Since(s.lockedAt)
if s.length > timeSinceLastInvalidSeq {
shouldRetryIn = s.length - timeSinceLastInvalidSeq
return
}
s.lockedByInvalidSeq = false
// log.Debug("lock timeout")
2020-10-20 14:14:05 +02:00
2020-11-21 11:01:37 +01:00
if s.requestedRetransmit {
s.ignoreMissingPktsUntilSeq = s.lastRequestedRetransmitRange[1]
s.ignoreMissingPktsUntilEnabled = true
2020-10-20 14:14:05 +02:00
}
2020-11-21 11:01:37 +01:00
return true, 0
}
// Returns true if all entries from the requested retransmit range have been received.
func (s *seqBuf) gotRetransmitRange() bool {
entryIdx := len(s.entries)
rangeSeq := s.lastRequestedRetransmitRange[0]
for {
entryIdx--
if entryIdx < 0 {
return false
}
if s.entries[entryIdx].seq != rangeSeq {
// log.Debug("entry idx ", entryIdx, " seq #", s.entries[entryIdx].seq, " does not match ", rangeSeq)
// log.Debug(s.string())
return false
}
if rangeSeq == s.lastRequestedRetransmitRange[1] {
return true
}
rangeSeq = rangeSeq.inc(s.maxSeqNum)
2020-10-20 14:14:05 +02:00
}
}
2020-11-21 11:01:37 +01:00
// shouldRetryIn is only filled when no entry is available, but there are entries in the seqbuf.
// err is not nil if the seqbuf is empty.
func (s *seqBuf) get() (e seqBufEntry, shouldRetryIn time.Duration, err error) {
2020-10-20 14:14:05 +02:00
s.mutex.Lock()
defer s.mutex.Unlock()
2020-10-20 10:48:55 +02:00
if len(s.entries) == 0 {
2020-11-21 11:01:37 +01:00
return e, 0, errors.New("seqbuf is empty")
2020-10-20 10:48:55 +02:00
}
2020-11-21 11:01:37 +01:00
entryCount := len(s.entries)
lastEntryIdx := entryCount - 1
2020-10-20 14:14:05 +02:00
e = s.entries[lastEntryIdx]
2020-11-21 11:01:37 +01:00
if s.alreadyReturnedFirstSeq {
if s.lockedByInvalidSeq {
if s.requestedRetransmit && s.gotRetransmitRange() {
s.lockedByInvalidSeq = false
// log.Debug("lock over")
} else {
var timeout bool
if timeout, shouldRetryIn = s.checkLockTimeout(); !timeout {
return
}
}
} else {
2020-11-21 11:29:36 +01:00
if s.compareSeq(e.seq, seqNum(s.lastReturnedSeq)) != larger {
2020-11-21 11:01:37 +01:00
// log.Debug("ignoring out of order seq ", e.seq)
s.entries = s.entries[:lastEntryIdx]
err = s.errOutOfOrder
return
}
if s.ignoreMissingPktsUntilEnabled {
2020-11-21 11:29:36 +01:00
if s.compareSeq(e.seq, s.ignoreMissingPktsUntilSeq) == larger {
2020-11-21 11:01:37 +01:00
// log.Debug("ignore over ", e.seq, " ", s.ignoreMissingPktsUntilSeq)
s.ignoreMissingPktsUntilEnabled = false
2020-11-21 18:44:13 +01:00
} //else {
// log.Debug("ignoring missing pkt, seq #", e.seq, " until ", s.ignoreMissingPktsUntilSeq)
//}
2020-11-21 11:01:37 +01:00
} else {
expectedNextSeq := s.lastReturnedSeq.inc(s.maxSeqNum)
if e.seq != expectedNextSeq {
// log.Debug("lock on, expected seq ", expectedNextSeq, " got ", e.seq)
s.lockedByInvalidSeq = true
s.lockedAt = time.Now()
s.requestedRetransmit = false
s.ignoreMissingPktsUntilEnabled = false
shouldRetryIn = s.length
if s.requestRetransmitCallback != nil {
s.lastRequestedRetransmitRange[0] = expectedNextSeq
s.lastRequestedRetransmitRange[1] = e.seq.dec(s.maxSeqNum)
if err = s.requestRetransmitCallback(s.lastRequestedRetransmitRange); err == nil {
s.requestedRetransmit = true
}
}
return
}
}
}
}
s.lastReturnedSeq = e.seq
s.alreadyReturnedFirstSeq = true
2020-10-20 10:48:55 +02:00
s.entries = s.entries[:lastEntryIdx]
2020-11-21 11:01:37 +01:00
return e, 0, nil
2020-10-20 14:14:05 +02:00
}
func (s *seqBuf) watcher() {
defer func() {
s.watcherCloseDoneChan <- true
}()
2020-10-26 19:46:08 +01:00
entryAvailableTimer := time.NewTimer(0)
<-entryAvailableTimer.C
2020-10-20 14:14:05 +02:00
var entryAvailableTimerRunning bool
for {
retry := true
2020-10-20 17:40:24 +02:00
for retry {
2020-10-20 17:40:24 +02:00
retry = false
2020-11-21 11:01:37 +01:00
e, t, err := s.get()
if err == nil && t == 0 {
if s.entryChan != nil {
select {
case s.entryChan <- e:
case <-s.watcherCloseNeededChan:
return
2020-10-20 14:14:05 +02:00
}
2020-11-21 11:01:37 +01:00
}
2020-10-20 17:40:24 +02:00
2020-11-21 11:01:37 +01:00
// We may have further available entries.
retry = true
} else {
if err == s.errOutOfOrder {
2020-10-20 17:40:24 +02:00
retry = true
2020-11-21 11:01:37 +01:00
} else if !entryAvailableTimerRunning && t > 0 {
// An entry will be available later, waiting for it.
entryAvailableTimer.Reset(t)
entryAvailableTimerRunning = true
2020-10-20 14:14:05 +02:00
}
}
}
select {
case <-s.watcherCloseNeededChan:
return
case <-s.entryAddedChan:
case <-entryAvailableTimer.C:
entryAvailableTimerRunning = false
}
}
2020-10-20 10:48:55 +02:00
}
// Setting a max. seqnum diff is optional. If it's 0 then the diff will be half of the maxSeqNum range.
// Available entries coming out from the seqbuf will be sent to entryChan.
2020-11-21 11:01:37 +01:00
func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, entryChan chan seqBufEntry,
requestRetransmitCallback requestRetransmitCallbackType) {
2020-10-20 10:48:55 +02:00
s.length = length
s.maxSeqNum = maxSeqNum
s.maxSeqNumDiff = maxSeqNumDiff
2020-10-20 14:14:05 +02:00
s.entryChan = entryChan
2020-11-21 11:01:37 +01:00
s.requestRetransmitCallback = requestRetransmitCallback
2020-10-20 14:14:05 +02:00
s.entryAddedChan = make(chan bool)
s.watcherCloseNeededChan = make(chan bool)
s.watcherCloseDoneChan = make(chan bool)
2020-11-21 11:01:37 +01:00
s.errOutOfOrder = errors.New("out of order pkt")
2020-10-20 14:14:05 +02:00
go s.watcher()
2020-10-20 10:48:55 +02:00
}
2020-10-20 14:14:05 +02:00
func (s *seqBuf) deinit() {
if s.watcherCloseNeededChan == nil { // Init has not ran?
return
}
s.watcherCloseNeededChan <- true
<-s.watcherCloseDoneChan
}