diff --git a/seqbuf.go b/seqbuf.go new file mode 100644 index 0000000..9ecb8be --- /dev/null +++ b/seqbuf.go @@ -0,0 +1,175 @@ +package main + +import ( + "errors" + "fmt" + "time" +) + +type seqNum int + +type seqBufEntry struct { + seq seqNum + data []byte + addedAt time.Time +} + +type seqBuf struct { + length time.Duration + maxSeqNum seqNum + maxSeqNumDiff seqNum + + // Note that the most recently added entry is stored as the 0th entry. + entries []seqBufEntry +} + +func (s *seqBuf) String() (out string) { + if len(s.entries) == 0 { + return "empty" + } + for _, e := range s.entries { + if out != "" { + out += " " + } + out += fmt.Sprint(e.seq) + } + return out +} + +func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry { + return seqBufEntry{ + seq: seq, + data: data, + addedAt: time.Now(), + } +} + +func (s *seqBuf) addToFront(seq seqNum, data []byte) { + s.entries = append([]seqBufEntry{s.createEntry(seq, data)}, s.entries...) +} + +func (s *seqBuf) addToBack(seq seqNum, data []byte) { + s.entries = append(s.entries, s.createEntry(seq, data)) +} + +func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) { + if toPos == 0 { + s.addToFront(seq, data) + return + } + if toPos >= len(s.entries) { + s.addToBack(seq, data) + return + } + sliceBefore := s.entries[:toPos] + sliceAfter := s.entries[toPos:] + s.entries = append(sliceBefore, append([]seqBufEntry{s.createEntry(seq, data)}, sliceAfter...)...) +} + +func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum { + if seq1 > seq2 { + return seq1 - seq2 + } + seq2Overflowed := s.maxSeqNum + 1 - seq2 + return seq2Overflowed + seq1 +} + +const ( + left = iota + right + equal +) + +type direction int + +// Decides the direction of which seq is closer to whichSeq, considering the seq turnover at maxSeqNum. +// Example: returns left for seq=2 whichSeq=1 +// returns right for seq=0 whichSeq=1 +// returns right for seq=39 whichSeq=1 if maxSeqNum is 40 +func (s *seqBuf) leftOrRightCloserToSeq(seq, whichSeq seqNum) direction { + diff1 := s.getDiff(seq, whichSeq) + diff2 := s.getDiff(whichSeq, seq) + + if diff1 == diff2 { + return equal + } + + if diff1 > diff2 { + // This will cause an insert at the current position. + if s.maxSeqNumDiff > 0 && diff2 > s.maxSeqNumDiff { + return left + } + + return right + } + + return left +} + +func (s *seqBuf) add(seq seqNum, data []byte) error { + // log.Debug("inserting ", seq) + // defer func() { + // log.Print(s.String()) + // }() + + if seq > s.maxSeqNum { + return errors.New("seq out of range") + } + + if len(s.entries) == 0 { + s.addToFront(seq, data) + return nil + } + + if s.entries[0].seq == seq { + return errors.New("dropping duplicate seq") + } + + // Checking the first entry. + if s.leftOrRightCloserToSeq(seq, s.entries[0].seq) == left { + s.addToFront(seq, data) + return nil + } + + // Parsing through other entries if there are more than 1. + for i := 1; i < len(s.entries); i++ { + // This seqnum is already in the queue, but not as the first entry? It's not a duplicate packet. + // It can be a beginning of a new stream for example. + if s.entries[i].seq == seq { + s.addToFront(seq, data) + return nil + } + + if s.leftOrRightCloserToSeq(seq, s.entries[i].seq) == left { + // log.Debug("left for ", s.entries[i].seq) + s.insert(seq, data, i) + return nil + } + // log.Debug("right for ", s.entries[i].seq) + } + + // No place found for the item? + s.addToBack(seq, data) + return nil +} + +func (s *seqBuf) get() (data []byte, err error) { + if len(s.entries) == 0 { + return nil, errors.New("seqbuf is empty") + } + lastEntryIdx := len(s.entries) - 1 + if time.Since(s.entries[lastEntryIdx].addedAt) < s.length { + return nil, errors.New("no available entries") + } + data = make([]byte, len(s.entries[lastEntryIdx].data)) + copy(data, s.entries[lastEntryIdx].data) + s.entries = s.entries[:lastEntryIdx] + return data, nil +} + +// Setting a max. seqnum diff is optional. If it's 0 then the diff will be half of the maxSeqNum range. +func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum) { + s.length = length + s.maxSeqNum = maxSeqNum + s.maxSeqNumDiff = maxSeqNumDiff +}