kappanhang/seqbuf.go

287 lines
6.4 KiB
Go

package main
import (
"errors"
"sync"
"time"
)
type (
	// seqNum is the sequence number used to order entries inside a seqBuf.
	seqNum int

	// seqBufEntry is a single buffered packet together with its bookkeeping.
	seqBufEntry struct {
		// seq is the sequence number the packet was added with.
		seq seqNum
		// data is the packet payload; it is stored as given, not copied.
		data []byte
		// addedAt is when the entry was created. Late entries get the zero
		// time so that they are released from the buffer as soon as possible.
		addedAt time.Time
	}
)
// seqBuf is a reordering buffer: packets are added with a sequence number,
// kept sorted, and handed out again once they have spent `length` in the
// buffer. A watcher goroutine forwards the released entries to entryChan.
type seqBuf struct {
	// length is how long an entry has to stay buffered before it is released.
	length time.Duration
	// maxSeqNum is the largest accepted sequence number; sequence numbers wrap
	// around after it. maxSeqNumDiff optionally limits how far behind an entry
	// a sequence number may be and still be sorted after it (0 disables it).
	maxSeqNum, maxSeqNumDiff seqNum
	// entryChan receives the entries released by the watcher. It may be nil.
	entryChan chan seqBufEntry

	// entries holds the buffered packets. Note that the most recently added
	// entry is stored as the 0th entry; entries are released from the end.
	entries []seqBufEntry
	// mutex guards entries.
	mutex sync.RWMutex

	// entryAddedChan wakes up the watcher after entries has been modified.
	entryAddedChan chan bool
	// watcherCloseNeededChan asks the watcher to stop, watcherCloseDoneChan
	// is how the watcher reports that it has stopped.
	watcherCloseNeededChan, watcherCloseDoneChan chan bool
}
// func (s *seqBuf) string() (out string) {
// if len(s.entries) == 0 {
// return "empty"
// }
// for _, e := range s.entries {
// if out != "" {
// out += " "
// }
// out += fmt.Sprint(e.seq)
// }
// return out
// }
// createEntry wraps seq and data into a seqBufEntry stamped with the current
// time. The entry is only built, not stored in the buffer.
func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry {
	var entry seqBufEntry
	entry.seq = seq
	entry.data = data
	entry.addedAt = time.Now()
	return entry
}
// notifyWatcher signals on entryAddedChan that the buffer contents changed.
// It never blocks the caller: when the signal cannot be sent right away it is
// simply skipped.
func (s *seqBuf) notifyWatcher() {
	select {
	default:
		// Nobody can take the signal at the moment, carry on without it.
	case s.entryAddedChan <- true:
	}
}
// addToFront stores the packet as the newest entry (index 0), stamped with the
// current time, and then notifies the watcher. The caller must hold s.mutex.
func (s *seqBuf) addToFront(seq seqNum, data []byte) {
	newest := []seqBufEntry{s.createEntry(seq, data)}
	s.entries = append(newest, s.entries...)
	s.notifyWatcher()
}
// addToBack stores the packet as the oldest entry (last index) and notifies
// the watcher. The caller must hold s.mutex.
func (s *seqBuf) addToBack(seq seqNum, data []byte) {
	late := seqBufEntry{
		seq:  seq,
		data: data,
		// This is a late entry: the zero time makes it leave the seqbuf as
		// soon as possible instead of waiting for the full buffer length.
		addedAt: time.Time{},
	}
	s.entries = append(s.entries, late)
	s.notifyWatcher()
}
// insert stores the packet at index toPos of s.entries and notifies the
// watcher. The caller must hold s.mutex.
//
// toPos == 0 is delegated to addToFront and toPos >= len(s.entries) to
// addToBack. It returns true only when the packet ended up at the front.
func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) (addedToFront bool) {
	if toPos == 0 {
		s.addToFront(seq, data)
		return true
	}
	if toPos >= len(s.entries) {
		s.addToBack(seq, data)
		return false
	}

	late := seqBufEntry{
		seq:  seq,
		data: data,
		// Inserted behind newer packets, so this is a late entry: the zero
		// time releases it from the seqbuf as soon as possible.
		addedAt: time.Time{},
	}

	// Grow by one slot, shift the tail one position to the right and drop the
	// new entry into the gap. This avoids building a temporary slice and
	// copying the tail twice.
	s.entries = append(s.entries, seqBufEntry{})
	copy(s.entries[toPos+1:], s.entries[toPos:])
	s.entries[toPos] = late

	s.notifyWatcher()
	return false
}
// getDiff returns how many steps seq1 is ahead of seq2 when counting upwards
// from seq2 with wrap-around after maxSeqNum. Equal sequence numbers yield a
// full turn, that is maxSeqNum+1.
func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum {
	if seq1 <= seq2 {
		// Count from seq2 up to the wrap-around point, then on to seq1.
		stepsToWrap := s.maxSeqNum + 1 - seq2
		return stepsToWrap + seq1
	}
	return seq1 - seq2
}
// direction tells on which side of a reference sequence number another
// sequence number belongs.
type direction int

// The constants are typed so they cannot be confused with plain integers.
const (
	// left means the sequence number is larger than the reference one.
	left direction = iota
	// right means the sequence number is smaller than the reference one.
	right
	// equal means the sequence number is equally far in both directions.
	equal
)
// leftOrRightCloserToSeq decides on which side of whichSeq the given seq
// belongs, taking the turnover at maxSeqNum into account. The shorter way
// around the sequence number circle wins: left means seq is larger than
// whichSeq, right means it is smaller, equal means both ways are equally long.
//
// If maxSeqNumDiff is set and seq is more than maxSeqNumDiff behind whichSeq,
// left is returned although right would be the closer side.
//
// Examples: seq=2, whichSeq=1 gives left
// seq=0, whichSeq=1 gives right
// seq=39, whichSeq=1 gives right if maxSeqNum is 40
func (s *seqBuf) leftOrRightCloserToSeq(seq, whichSeq seqNum) direction {
	ahead := s.getDiff(seq, whichSeq)
	behind := s.getDiff(whichSeq, seq)

	switch {
	case ahead == behind:
		return equal
	case ahead < behind:
		return left
	case s.maxSeqNumDiff > 0 && behind > s.maxSeqNumDiff:
		// Too far behind: this will cause an insert at the current position.
		return left
	default:
		return right
	}
}
// add puts a packet into the buffer at the position that matches its sequence
// number, taking the turnover at maxSeqNum into account.
//
// addedToFront is true when the packet became the newest entry (index 0).
// An error is returned if seq is larger than maxSeqNum, or if seq equals the
// sequence number of the current newest entry (duplicate). A seq that matches
// an older queued entry is silently skipped: nothing is stored and no error is
// returned.
func (s *seqBuf) add(seq seqNum, data []byte) (addedToFront bool, err error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Validation and the comparison against the newest entry.
	switch {
	case seq > s.maxSeqNum:
		return false, errors.New("seq out of range")
	case len(s.entries) == 0:
		s.addToFront(seq, data)
		return true, nil
	case s.entries[0].seq == seq:
		return false, errors.New("dropping duplicate seq")
	case s.leftOrRightCloserToSeq(seq, s.entries[0].seq) == left:
		s.addToFront(seq, data)
		return true, nil
	}

	// Walk the older entries looking for the first one this packet precedes.
	for pos := 1; pos < len(s.entries); pos++ {
		queued := s.entries[pos].seq
		if queued == seq {
			// Already queued, but not as the newest entry. This is not treated
			// as a duplicate error (it can be the beginning of a new stream
			// for example), the packet is just not stored.
			return false, nil
		}
		if s.leftOrRightCloserToSeq(seq, queued) == left {
			return s.insert(seq, data, pos), nil
		}
	}

	// The packet is older than everything queued.
	s.addToBack(seq, data)
	return false, nil
}
// getNextDataAvailableRemainingTime tells how long the oldest entry (the last
// one in s.entries) still has to stay in the buffer. It returns 0 if that
// entry can be taken out right now, and an error if the buffer is empty.
func (s *seqBuf) getNextDataAvailableRemainingTime() (time.Duration, error) {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	count := len(s.entries)
	if count == 0 {
		return 0, errors.New("seqbuf is empty")
	}

	waited := time.Since(s.entries[count-1].addedAt)
	if waited < s.length {
		return s.length - waited, nil
	}
	return 0, nil
}
// get removes and returns the oldest entry (the last one in s.entries) once it
// has spent at least s.length in the buffer.
//
// An error is returned, and the buffer is left untouched, when the buffer is
// empty or when the oldest entry is not yet due.
func (s *seqBuf) get() (e seqBufEntry, err error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if len(s.entries) == 0 {
		return e, errors.New("seqbuf is empty")
	}

	lastEntryIdx := len(s.entries) - 1
	if time.Since(s.entries[lastEntryIdx].addedAt) < s.length {
		return e, errors.New("no available entries")
	}

	e = s.entries[lastEntryIdx]
	// Clear the vacated slot: truncating alone would keep the packet data
	// reachable through the backing array until the slot gets overwritten.
	s.entries[lastEntryIdx] = seqBufEntry{}
	s.entries = s.entries[:lastEntryIdx]
	return e, nil
}
// watcher is the goroutine body started by init. In a loop it first takes out
// every entry that is already due and sends it to entryChan, then sleeps until
// an entry is added, the availability timer fires, or a close is requested on
// watcherCloseNeededChan. On exit it reports on watcherCloseDoneChan.
func (s *seqBuf) watcher() {
// Always tell the closer that the goroutine has finished.
defer func() {
s.watcherCloseDoneChan <- true
}()
// The timer is created and stopped right away; it is only armed below, once
// an entry is known to become available later.
entryAvailableTimer := time.NewTimer(s.length)
entryAvailableTimer.Stop()
var entryAvailableTimerRunning bool
for {
// Keep draining as long as entries are available right now.
retry := true
for retry {
retry = false
// An error here means the buffer is empty, so there is nothing to do.
t, err := s.getNextDataAvailableRemainingTime()
if err == nil {
if t == 0 { // Do we have an entry available right now?
e, err := s.get()
if err == nil {
// With a nil entryChan the entry taken out above is discarded.
if s.entryChan != nil {
// Block until the entry is taken over, or stop if a close is
// requested meanwhile (the entry is not delivered in that case).
select {
case s.entryChan <- e:
case <-s.watcherCloseNeededChan:
return
}
}
} else {
// NOTE(review): log is not declared in this section, presumably a
// package-level logger - confirm.
log.Error(err)
}
// We may have further available entries.
retry = true
} else if !entryAvailableTimerRunning {
// An entry will be available later, waiting for it.
// The timer is not re-armed while it is already running.
entryAvailableTimer.Reset(t)
entryAvailableTimerRunning = true
}
}
}
// Nothing is due at the moment: wait for a close request, a change
// notification or the timer, then check the buffer again.
select {
case <-s.watcherCloseNeededChan:
return
case <-s.entryAddedChan:
case <-entryAvailableTimer.C:
entryAvailableTimerRunning = false
}
}
}
// init sets up the buffer and starts its watcher goroutine.
//
// length is how long entries are kept before they are released, maxSeqNum is
// the largest valid sequence number. Setting a max. seqnum diff is optional:
// if it's 0 then the diff will be half of the maxSeqNum range.
// Available entries coming out from the seqbuf will be sent to entryChan.
//
// Call deinit to stop the watcher goroutine again.
func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, entryChan chan seqBufEntry) {
	s.length = length
	s.maxSeqNum = maxSeqNum
	s.maxSeqNumDiff = maxSeqNumDiff
	s.entryChan = entryChan

	// One slot of buffering keeps a notification pending while the watcher is
	// busy. With an unbuffered channel the non-blocking send in notifyWatcher
	// would be dropped in that case, and the watcher could go to sleep without
	// noticing the newly added entry.
	s.entryAddedChan = make(chan bool, 1)
	s.watcherCloseNeededChan = make(chan bool)
	s.watcherCloseDoneChan = make(chan bool)

	go s.watcher()
}
// deinit stops the watcher goroutine and waits until it has finished.
//
// It is a no-op if init has not been run, or if the watcher has already been
// stopped by an earlier deinit. It is not safe to call concurrently with
// itself.
func (s *seqBuf) deinit() {
	if s.watcherCloseNeededChan == nil { // Init has not ran, or already closed?
		return
	}

	s.watcherCloseNeededChan <- true
	<-s.watcherCloseDoneChan

	// The watcher is gone, so nothing would receive a further close request.
	// Clearing the channel turns a repeated deinit into a no-op instead of
	// blocking forever on the send above.
	s.watcherCloseNeededChan = nil
}