From 5037ce56f279b5e14b7b3da38b66b8f2cb19ca4e Mon Sep 17 00:00:00 2001 From: Nonoo Date: Sun, 18 Oct 2020 18:34:22 +0200 Subject: [PATCH] Send disconnect on startup if stream is still active --- audiostream.go | 6 ++-- controlstream.go | 8 +++-- main.go | 3 ++ pkt7.go | 8 ++++- streamcommon.go | 88 ++++++++++++++++++++++++++++++------------------ 5 files changed, 76 insertions(+), 37 deletions(-) diff --git a/audiostream.go b/audiostream.go index 0e9894c..df641b0 100644 --- a/audiostream.go +++ b/audiostream.go @@ -50,15 +50,17 @@ func (s *audioStream) handleRead(r []byte) { } } -func (s *audioStream) start() { +func (s *audioStream) init() { s.common.open("audio", 50003) +} +func (s *audioStream) start() { s.common.sendPkt3() s.common.waitForPkt4Answer() s.common.sendPkt6() s.common.waitForPkt6Answer() - log.Print("stream opened") + log.Print("stream started") s.timeoutTimer = time.NewTimer(audioTimeoutDuration) diff --git a/controlstream.go b/controlstream.go index dcb1fc9..ef259ee 100644 --- a/controlstream.go +++ b/controlstream.go @@ -198,8 +198,12 @@ func (s *controlStream) handleRead(r []byte) { } } -func (s *controlStream) start() { +func (s *controlStream) init() { s.common.open("control", 50001) +} + +func (s *controlStream) start() { + startTime := time.Now() s.common.sendPkt3() s.common.pkt7.sendSeq = 1 @@ -253,7 +257,7 @@ func (s *controlStream) start() { case <-reauthTicker.C: s.sendPktReauth(false) case <-statusLogTicker.C: - log.Print("roundtrip latency ", s.common.pkt7.latency) + log.Print("running for ", time.Since(startTime), " roundtrip latency ", s.common.pkt7.latency) } } } diff --git a/main.go b/main.go index b71fc9b..a80f973 100644 --- a/main.go +++ b/main.go @@ -48,5 +48,8 @@ func main() { parseArgs() setupCloseHandler() + streams.audio.init() + streams.control.init() + streams.control.start() } diff --git a/pkt7.go b/pkt7.go index 078850f..2637ac8 100644 --- a/pkt7.go +++ b/pkt7.go @@ -26,13 +26,19 @@ func (p *pkt7Type) isPkt7(r []byte) bool { return len(r) == 21 && bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) // Note that the first byte can be 0x15 or 0x00, so we ignore that. } +func (p *pkt7Type) tryReceive(timeout time.Duration, s *streamCommon) []byte { + return s.tryReceivePacket(timeout, 21, 1, []byte{0x00, 0x00, 0x00, 0x07, 0x00}) +} + func (p *pkt7Type) handle(s *streamCommon, r []byte) { gotSeq := binary.LittleEndian.Uint16(r[6:8]) if r[16] == 0x00 { // This is a pkt7 request from the radio. // Replying to the radio. // Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00 // Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00 - p.sendReply(s, r[17:21], gotSeq) + if p.timeoutTimer != nil { // Only replying if the auth is already done. + p.sendReply(s, r[17:21], gotSeq) + } } else { // This is a pkt7 reply to our request. if p.timeoutTimer != nil { p.timeoutTimer.Stop() diff --git a/streamcommon.go b/streamcommon.go index 220546c..b3f7fcc 100644 --- a/streamcommon.go +++ b/streamcommon.go @@ -11,12 +11,15 @@ import ( "github.com/nonoo/kappanhang/log" ) +const expectTimeoutDuration = time.Second + type streamCommon struct { - name string - conn *net.UDPConn - localSID uint32 - remoteSID uint32 - readChan chan []byte + name string + conn *net.UDPConn + localSID uint32 + remoteSID uint32 + gotRemoteSID bool + readChan chan []byte pkt7 pkt7Type } @@ -28,56 +31,63 @@ func (s *streamCommon) send(d []byte) { } } -func (s *streamCommon) read() ([]byte, error) { - err := s.conn.SetReadDeadline(time.Now().Add(time.Second)) - if err != nil { - log.Fatal(err) - } - +func (s *streamCommon) read() []byte { b := make([]byte, 1500) n, _, err := s.conn.ReadFromUDP(b) if err != nil { - log.Fatal(err) + // Ignoring timeout errors. + if err, ok := err.(net.Error); ok && !err.Timeout() { + log.Fatal(err) + } } - return b[:n], err + return b[:n] } func (s *streamCommon) reader() { - var errCount int for { - r, err := s.read() - if err == nil { - if s.pkt7.isPkt7(r) { - s.pkt7.handle(s, r) - } - - s.readChan <- r - } else { - errCount++ - if errCount > 5 { - log.Fatal(s.name + "/timeout") - } - log.Error(s.name + "/stream break detected") + r := s.read() + if s.pkt7.isPkt7(r) { + s.pkt7.handle(s, r) } - errCount = 0 + + s.readChan <- r } } -func (s *streamCommon) expect(packetLength int, b []byte) []byte { +func (s *streamCommon) tryReceivePacket(timeout time.Duration, packetLength, matchStartByte int, b []byte) []byte { var r []byte expectStart := time.Now() for { + err := s.conn.SetReadDeadline(time.Now().Add(timeout - time.Since(expectStart))) + if err != nil { + log.Fatal(err) + } + r = <-s.readChan - if len(r) == packetLength && bytes.Equal(r[:len(b)], b) { + + err = s.conn.SetReadDeadline(time.Time{}) + if err != nil { + log.Fatal(err) + } + + if len(r) == packetLength && bytes.Equal(r[matchStartByte:len(b)+matchStartByte], b) { break } - if time.Since(expectStart) > time.Second { - log.Fatal(s.name + "/expect timeout") + if time.Since(expectStart) > timeout { + return nil } } return r } +func (s *streamCommon) expect(packetLength int, b []byte) []byte { + r := s.tryReceivePacket(expectTimeoutDuration, packetLength, 0, b) + if r == nil { + log.Fatal(s.name + "/expect timeout") + } + return r +} + func (s *streamCommon) open(name string, portNumber int) { s.name = name hostPort := fmt.Sprint(connectAddress, ":", portNumber) @@ -108,6 +118,15 @@ func (s *streamCommon) open(name string, portNumber int) { s.readChan = make(chan []byte) go s.reader() + + if r := s.pkt7.tryReceive(300*time.Millisecond, s); s.pkt7.isPkt7(r) { + s.remoteSID = binary.BigEndian.Uint32(r[8:12]) + s.gotRemoteSID = true + log.Print(s.name + "/closing running stream") + s.sendDisconnect() + time.Sleep(time.Second) + s.gotRemoteSID = false + } } func (s *streamCommon) sendPkt3() { @@ -121,6 +140,7 @@ func (s *streamCommon) waitForPkt4Answer() { // Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x8c, 0x7d, 0x45, 0x7a, 0x1d, 0xf6, 0xe9, 0x0b r := s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00}) s.remoteSID = binary.BigEndian.Uint32(r[8:12]) + s.gotRemoteSID = true log.Debugf(s.name+"/got remote session id %.8x", s.remoteSID) } @@ -138,6 +158,10 @@ func (s *streamCommon) waitForPkt6Answer() { } func (s *streamCommon) sendDisconnect() { + if !s.gotRemoteSID { + return + } + s.send([]byte{0x10, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)})