diff --git a/audiostream.go b/audiostream.go index 4a44519..cbfa236 100644 --- a/audiostream.go +++ b/audiostream.go @@ -10,6 +10,7 @@ import ( ) const audioTimeoutDuration = 3 * time.Second +const rxSeqBufLength = 100 * time.Millisecond type audioStream struct { common streamCommon @@ -17,6 +18,8 @@ type audioStream struct { timeoutTimer *time.Timer receivedAudio bool lastReceivedAudioSeq uint16 + rxSeqBuf seqBuf + rxSeqBufEntryChan chan seqBufEntry audioSendSeq uint16 } @@ -25,6 +28,26 @@ func (s *audioStream) sendDisconnect() { s.common.sendDisconnect() } +// sendPart1 expects 1364 bytes of PCM data. +func (s *audioStream) sendPart1(pcmData []byte) { + s.common.send(append([]byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), + byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), + byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), + 0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))}, + pcmData...)) + s.audioSendSeq++ +} + +// sendPart2 expects 556 bytes of PCM data. +func (s *audioStream) sendPart2(pcmData []byte) { + s.common.send(append([]byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), + byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), + byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), + 0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))}, + pcmData...)) + s.audioSendSeq++ +} + func (s *audioStream) sendRetransmitRequest(seqNum uint16) { p := []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00, byte(seqNum), byte(seqNum >> 8), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), @@ -51,13 +74,8 @@ func (s *audioStream) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange) s.common.send(p) } -func (s *audioStream) handleAudioPacket(r []byte) { - if s.timeoutTimer != nil { - s.timeoutTimer.Stop() - s.timeoutTimer.Reset(audioTimeoutDuration) - } - - gotSeq := binary.LittleEndian.Uint16(r[6:8]) +func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) { + gotSeq := uint16(e.seq) if s.receivedAudio { expectedSeq := s.lastReceivedAudioSeq + 1 if expectedSeq != gotSeq { @@ -73,11 +91,22 @@ func (s *audioStream) handleAudioPacket(r []byte) { s.lastReceivedAudioSeq = gotSeq s.receivedAudio = true - // log.Print("got audio packet ", len(r[24:]), " bytes seq ", gotSeq) - // TODO: audioPipes.source.Write() } +func (s *audioStream) handleAudioPacket(r []byte) { + if s.timeoutTimer != nil { + s.timeoutTimer.Stop() + s.timeoutTimer.Reset(audioTimeoutDuration) + } + + gotSeq := binary.LittleEndian.Uint16(r[6:8]) + err := s.rxSeqBuf.add(seqNum(gotSeq), r[24:]) + if err != nil { + log.Error(err) + } +} + // TODO: audioPipes.sink.Read() + sendPart1(); sendPart2() func (s *audioStream) handleRead(r []byte) { @@ -86,28 +115,10 @@ func (s *audioStream) handleRead(r []byte) { } } -// sendPart1 expects 1364 bytes of PCM data. -func (s *audioStream) sendPart1(pcmData []byte) { - s.common.send(append([]byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), - byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), - byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), - 0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))}, - pcmData...)) - s.audioSendSeq++ -} - -// sendPart2 expects 556 bytes of PCM data. -func (s *audioStream) sendPart2(pcmData []byte) { - s.common.send(append([]byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), - byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), - byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), - 0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))}, - pcmData...)) - s.audioSendSeq++ -} - func (s *audioStream) init() { s.common.open("audio", 50003) + s.rxSeqBufEntryChan = make(chan seqBufEntry) + s.rxSeqBuf.init(rxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) } func (s *audioStream) start() { @@ -126,13 +137,14 @@ func (s *audioStream) start() { testSendTicker := time.NewTicker(80 * time.Millisecond) // TODO: remove - var r []byte for { select { - case r = <-s.common.readChan: + case r := <-s.common.readChan: s.handleRead(r) case <-s.timeoutTimer.C: exit(errors.New("timeout")) + case e := <-s.rxSeqBufEntryChan: + s.handleRxSeqBufEntry(e) case <-testSendTicker.C: // TODO: remove b1 := make([]byte, 1364) s.sendPart1(b1) diff --git a/seqbuf.go b/seqbuf.go index 9ecb8be..3b5b7bf 100644 --- a/seqbuf.go +++ b/seqbuf.go @@ -2,8 +2,10 @@ package main import ( "errors" - "fmt" + "sync" "time" + + "github.com/nonoo/kappanhang/log" ) type seqNum int @@ -18,23 +20,29 @@ type seqBuf struct { length time.Duration maxSeqNum seqNum maxSeqNumDiff seqNum + entryChan chan seqBufEntry // Note that the most recently added entry is stored as the 0th entry. entries []seqBufEntry + mutex sync.RWMutex + + entryAddedChan chan bool + watcherCloseNeededChan chan bool + watcherCloseDoneChan chan bool } -func (s *seqBuf) String() (out string) { - if len(s.entries) == 0 { - return "empty" - } - for _, e := range s.entries { - if out != "" { - out += " " - } - out += fmt.Sprint(e.seq) - } - return out -} +// func (s *seqBuf) string() (out string) { +// if len(s.entries) == 0 { +// return "empty" +// } +// for _, e := range s.entries { +// if out != "" { +// out += " " +// } +// out += fmt.Sprint(e.seq) +// } +// return out +// } func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry { return seqBufEntry{ @@ -44,12 +52,25 @@ func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry { } } +func (s *seqBuf) notifyWatcher() { + select { + case s.entryAddedChan <- true: + default: + } +} + func (s *seqBuf) addToFront(seq seqNum, data []byte) { - s.entries = append([]seqBufEntry{s.createEntry(seq, data)}, s.entries...) + e := s.createEntry(seq, data) + s.entries = append([]seqBufEntry{e}, s.entries...) + + s.notifyWatcher() } func (s *seqBuf) addToBack(seq seqNum, data []byte) { - s.entries = append(s.entries, s.createEntry(seq, data)) + e := s.createEntry(seq, data) + s.entries = append(s.entries, e) + + s.notifyWatcher() } func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) { @@ -63,7 +84,10 @@ func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) { } sliceBefore := s.entries[:toPos] sliceAfter := s.entries[toPos:] - s.entries = append(sliceBefore, append([]seqBufEntry{s.createEntry(seq, data)}, sliceAfter...)...) + e := s.createEntry(seq, data) + s.entries = append(sliceBefore, append([]seqBufEntry{e}, sliceAfter...)...) + + s.notifyWatcher() } func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum { @@ -107,6 +131,9 @@ func (s *seqBuf) leftOrRightCloserToSeq(seq, whichSeq seqNum) direction { } func (s *seqBuf) add(seq seqNum, data []byte) error { + s.mutex.Lock() + defer s.mutex.Unlock() + // log.Debug("inserting ", seq) // defer func() { // log.Print(s.String()) @@ -153,23 +180,89 @@ func (s *seqBuf) add(seq seqNum, data []byte) error { return nil } -func (s *seqBuf) get() (data []byte, err error) { +func (s *seqBuf) getNextDataAvailableRemainingTime() (time.Duration, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + if len(s.entries) == 0 { - return nil, errors.New("seqbuf is empty") + return 0, errors.New("seqbuf is empty") + } + lastEntryIdx := len(s.entries) - 1 + inBufDuration := time.Since(s.entries[lastEntryIdx].addedAt) + if inBufDuration >= s.length { + return 0, nil + } + return s.length - inBufDuration, nil +} + +func (s *seqBuf) get() (e seqBufEntry, err error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if len(s.entries) == 0 { + return e, errors.New("seqbuf is empty") } lastEntryIdx := len(s.entries) - 1 if time.Since(s.entries[lastEntryIdx].addedAt) < s.length { - return nil, errors.New("no available entries") + return e, errors.New("no available entries") } - data = make([]byte, len(s.entries[lastEntryIdx].data)) - copy(data, s.entries[lastEntryIdx].data) + e = s.entries[lastEntryIdx] s.entries = s.entries[:lastEntryIdx] - return data, nil + return e, nil +} + +func (s *seqBuf) watcher() { + defer func() { + s.watcherCloseDoneChan <- true + }() + + entryAvailableTimer := time.NewTimer(s.length) + entryAvailableTimer.Stop() + var entryAvailableTimerRunning bool + + for { + t, err := s.getNextDataAvailableRemainingTime() + if err == nil { + if t == 0 { // Do we have an entry available right now? + e, err := s.get() + if err == nil { + if s.entryChan != nil { + s.entryChan <- e + } + } else { + log.Error(err) + } + } else if !entryAvailableTimerRunning { + // An entry will be available later, waiting for it. + entryAvailableTimer.Reset(t) + entryAvailableTimerRunning = true + } + } + + select { + case <-s.watcherCloseNeededChan: + return + case <-s.entryAddedChan: + case <-entryAvailableTimer.C: + entryAvailableTimerRunning = false + } + } } // Setting a max. seqnum diff is optional. If it's 0 then the diff will be half of the maxSeqNum range. -func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum) { +func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, entryChan chan seqBufEntry) { s.length = length s.maxSeqNum = maxSeqNum s.maxSeqNumDiff = maxSeqNumDiff + s.entryChan = entryChan + + s.entryAddedChan = make(chan bool) + s.watcherCloseNeededChan = make(chan bool) + s.watcherCloseDoneChan = make(chan bool) + go s.watcher() } + +// func (s *seqBuf) deinit() { +// s.watcherCloseNeededChan <- true +// <-s.watcherCloseDoneChan +// }