Add using rx seqbuf

This commit is contained in:
Nonoo 2020-10-20 14:14:05 +02:00
parent 1f601c11c7
commit d6217fe706
2 changed files with 159 additions and 54 deletions

View file

@ -10,6 +10,7 @@ import (
)
const audioTimeoutDuration = 3 * time.Second
const rxSeqBufLength = 100 * time.Millisecond
type audioStream struct {
common streamCommon
@ -17,6 +18,8 @@ type audioStream struct {
timeoutTimer *time.Timer
receivedAudio bool
lastReceivedAudioSeq uint16
rxSeqBuf seqBuf
rxSeqBufEntryChan chan seqBufEntry
audioSendSeq uint16
}
@ -25,6 +28,26 @@ func (s *audioStream) sendDisconnect() {
s.common.sendDisconnect()
}
// sendPart1 expects 1364 bytes of PCM data.
func (s *audioStream) sendPart1(pcmData []byte) {
s.common.send(append([]byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))},
pcmData...))
s.audioSendSeq++
}
// sendPart2 expects 556 bytes of PCM data.
func (s *audioStream) sendPart2(pcmData []byte) {
s.common.send(append([]byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))},
pcmData...))
s.audioSendSeq++
}
func (s *audioStream) sendRetransmitRequest(seqNum uint16) {
p := []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00, byte(seqNum), byte(seqNum >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
@ -51,13 +74,8 @@ func (s *audioStream) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange)
s.common.send(p)
}
func (s *audioStream) handleAudioPacket(r []byte) {
if s.timeoutTimer != nil {
s.timeoutTimer.Stop()
s.timeoutTimer.Reset(audioTimeoutDuration)
}
gotSeq := binary.LittleEndian.Uint16(r[6:8])
func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) {
gotSeq := uint16(e.seq)
if s.receivedAudio {
expectedSeq := s.lastReceivedAudioSeq + 1
if expectedSeq != gotSeq {
@ -73,11 +91,22 @@ func (s *audioStream) handleAudioPacket(r []byte) {
s.lastReceivedAudioSeq = gotSeq
s.receivedAudio = true
// log.Print("got audio packet ", len(r[24:]), " bytes seq ", gotSeq)
// TODO: audioPipes.source.Write()
}
func (s *audioStream) handleAudioPacket(r []byte) {
if s.timeoutTimer != nil {
s.timeoutTimer.Stop()
s.timeoutTimer.Reset(audioTimeoutDuration)
}
gotSeq := binary.LittleEndian.Uint16(r[6:8])
err := s.rxSeqBuf.add(seqNum(gotSeq), r[24:])
if err != nil {
log.Error(err)
}
}
// TODO: audioPipes.sink.Read() + sendPart1(); sendPart2()
func (s *audioStream) handleRead(r []byte) {
@ -86,28 +115,10 @@ func (s *audioStream) handleRead(r []byte) {
}
}
// sendPart1 expects 1364 bytes of PCM data.
func (s *audioStream) sendPart1(pcmData []byte) {
s.common.send(append([]byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))},
pcmData...))
s.audioSendSeq++
}
// sendPart2 expects 556 bytes of PCM data.
func (s *audioStream) sendPart2(pcmData []byte) {
s.common.send(append([]byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))},
pcmData...))
s.audioSendSeq++
}
func (s *audioStream) init() {
s.common.open("audio", 50003)
s.rxSeqBufEntryChan = make(chan seqBufEntry)
s.rxSeqBuf.init(rxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan)
}
func (s *audioStream) start() {
@ -126,13 +137,14 @@ func (s *audioStream) start() {
testSendTicker := time.NewTicker(80 * time.Millisecond) // TODO: remove
var r []byte
for {
select {
case r = <-s.common.readChan:
case r := <-s.common.readChan:
s.handleRead(r)
case <-s.timeoutTimer.C:
exit(errors.New("timeout"))
case e := <-s.rxSeqBufEntryChan:
s.handleRxSeqBufEntry(e)
case <-testSendTicker.C: // TODO: remove
b1 := make([]byte, 1364)
s.sendPart1(b1)

139
seqbuf.go
View file

@ -2,8 +2,10 @@ package main
import (
"errors"
"fmt"
"sync"
"time"
"github.com/nonoo/kappanhang/log"
)
type seqNum int
@ -18,23 +20,29 @@ type seqBuf struct {
length time.Duration
maxSeqNum seqNum
maxSeqNumDiff seqNum
entryChan chan seqBufEntry
// Note that the most recently added entry is stored as the 0th entry.
entries []seqBufEntry
mutex sync.RWMutex
entryAddedChan chan bool
watcherCloseNeededChan chan bool
watcherCloseDoneChan chan bool
}
func (s *seqBuf) String() (out string) {
if len(s.entries) == 0 {
return "empty"
}
for _, e := range s.entries {
if out != "" {
out += " "
}
out += fmt.Sprint(e.seq)
}
return out
}
// func (s *seqBuf) string() (out string) {
// if len(s.entries) == 0 {
// return "empty"
// }
// for _, e := range s.entries {
// if out != "" {
// out += " "
// }
// out += fmt.Sprint(e.seq)
// }
// return out
// }
func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry {
return seqBufEntry{
@ -44,12 +52,25 @@ func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry {
}
}
func (s *seqBuf) notifyWatcher() {
select {
case s.entryAddedChan <- true:
default:
}
}
func (s *seqBuf) addToFront(seq seqNum, data []byte) {
s.entries = append([]seqBufEntry{s.createEntry(seq, data)}, s.entries...)
e := s.createEntry(seq, data)
s.entries = append([]seqBufEntry{e}, s.entries...)
s.notifyWatcher()
}
func (s *seqBuf) addToBack(seq seqNum, data []byte) {
s.entries = append(s.entries, s.createEntry(seq, data))
e := s.createEntry(seq, data)
s.entries = append(s.entries, e)
s.notifyWatcher()
}
func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) {
@ -63,7 +84,10 @@ func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) {
}
sliceBefore := s.entries[:toPos]
sliceAfter := s.entries[toPos:]
s.entries = append(sliceBefore, append([]seqBufEntry{s.createEntry(seq, data)}, sliceAfter...)...)
e := s.createEntry(seq, data)
s.entries = append(sliceBefore, append([]seqBufEntry{e}, sliceAfter...)...)
s.notifyWatcher()
}
func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum {
@ -107,6 +131,9 @@ func (s *seqBuf) leftOrRightCloserToSeq(seq, whichSeq seqNum) direction {
}
func (s *seqBuf) add(seq seqNum, data []byte) error {
s.mutex.Lock()
defer s.mutex.Unlock()
// log.Debug("inserting ", seq)
// defer func() {
// log.Print(s.String())
@ -153,23 +180,89 @@ func (s *seqBuf) add(seq seqNum, data []byte) error {
return nil
}
func (s *seqBuf) get() (data []byte, err error) {
func (s *seqBuf) getNextDataAvailableRemainingTime() (time.Duration, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
if len(s.entries) == 0 {
return nil, errors.New("seqbuf is empty")
return 0, errors.New("seqbuf is empty")
}
lastEntryIdx := len(s.entries) - 1
inBufDuration := time.Since(s.entries[lastEntryIdx].addedAt)
if inBufDuration >= s.length {
return 0, nil
}
return s.length - inBufDuration, nil
}
func (s *seqBuf) get() (e seqBufEntry, err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if len(s.entries) == 0 {
return e, errors.New("seqbuf is empty")
}
lastEntryIdx := len(s.entries) - 1
if time.Since(s.entries[lastEntryIdx].addedAt) < s.length {
return nil, errors.New("no available entries")
return e, errors.New("no available entries")
}
data = make([]byte, len(s.entries[lastEntryIdx].data))
copy(data, s.entries[lastEntryIdx].data)
e = s.entries[lastEntryIdx]
s.entries = s.entries[:lastEntryIdx]
return data, nil
return e, nil
}
func (s *seqBuf) watcher() {
defer func() {
s.watcherCloseDoneChan <- true
}()
entryAvailableTimer := time.NewTimer(s.length)
entryAvailableTimer.Stop()
var entryAvailableTimerRunning bool
for {
t, err := s.getNextDataAvailableRemainingTime()
if err == nil {
if t == 0 { // Do we have an entry available right now?
e, err := s.get()
if err == nil {
if s.entryChan != nil {
s.entryChan <- e
}
} else {
log.Error(err)
}
} else if !entryAvailableTimerRunning {
// An entry will be available later, waiting for it.
entryAvailableTimer.Reset(t)
entryAvailableTimerRunning = true
}
}
select {
case <-s.watcherCloseNeededChan:
return
case <-s.entryAddedChan:
case <-entryAvailableTimer.C:
entryAvailableTimerRunning = false
}
}
}
// Setting a max. seqnum diff is optional. If it's 0 then the diff will be half of the maxSeqNum range.
func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum) {
func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, entryChan chan seqBufEntry) {
s.length = length
s.maxSeqNum = maxSeqNum
s.maxSeqNumDiff = maxSeqNumDiff
s.entryChan = entryChan
s.entryAddedChan = make(chan bool)
s.watcherCloseNeededChan = make(chan bool)
s.watcherCloseDoneChan = make(chan bool)
go s.watcher()
}
// func (s *seqBuf) deinit() {
// s.watcherCloseNeededChan <- true
// <-s.watcherCloseDoneChan
// }