Send disconnect on startup if stream is still active

This commit is contained in:
Nonoo 2020-10-18 18:34:22 +02:00
parent 8b2f47d13e
commit 5037ce56f2
5 changed files with 76 additions and 37 deletions

View file

@ -50,15 +50,17 @@ func (s *audioStream) handleRead(r []byte) {
}
}
func (s *audioStream) start() {
func (s *audioStream) init() {
s.common.open("audio", 50003)
}
func (s *audioStream) start() {
s.common.sendPkt3()
s.common.waitForPkt4Answer()
s.common.sendPkt6()
s.common.waitForPkt6Answer()
log.Print("stream opened")
log.Print("stream started")
s.timeoutTimer = time.NewTimer(audioTimeoutDuration)

View file

@ -198,8 +198,12 @@ func (s *controlStream) handleRead(r []byte) {
}
}
func (s *controlStream) start() {
func (s *controlStream) init() {
s.common.open("control", 50001)
}
func (s *controlStream) start() {
startTime := time.Now()
s.common.sendPkt3()
s.common.pkt7.sendSeq = 1
@ -253,7 +257,7 @@ func (s *controlStream) start() {
case <-reauthTicker.C:
s.sendPktReauth(false)
case <-statusLogTicker.C:
log.Print("roundtrip latency ", s.common.pkt7.latency)
log.Print("running for ", time.Since(startTime), " roundtrip latency ", s.common.pkt7.latency)
}
}
}

View file

@ -48,5 +48,8 @@ func main() {
parseArgs()
setupCloseHandler()
streams.audio.init()
streams.control.init()
streams.control.start()
}

View file

@ -26,13 +26,19 @@ func (p *pkt7Type) isPkt7(r []byte) bool {
return len(r) == 21 && bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) // Note that the first byte can be 0x15 or 0x00, so we ignore that.
}
func (p *pkt7Type) tryReceive(timeout time.Duration, s *streamCommon) []byte {
return s.tryReceivePacket(timeout, 21, 1, []byte{0x00, 0x00, 0x00, 0x07, 0x00})
}
func (p *pkt7Type) handle(s *streamCommon, r []byte) {
gotSeq := binary.LittleEndian.Uint16(r[6:8])
if r[16] == 0x00 { // This is a pkt7 request from the radio.
// Replying to the radio.
// Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00
// Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00
p.sendReply(s, r[17:21], gotSeq)
if p.timeoutTimer != nil { // Only replying if the auth is already done.
p.sendReply(s, r[17:21], gotSeq)
}
} else { // This is a pkt7 reply to our request.
if p.timeoutTimer != nil {
p.timeoutTimer.Stop()

View file

@ -11,12 +11,15 @@ import (
"github.com/nonoo/kappanhang/log"
)
const expectTimeoutDuration = time.Second
type streamCommon struct {
name string
conn *net.UDPConn
localSID uint32
remoteSID uint32
readChan chan []byte
name string
conn *net.UDPConn
localSID uint32
remoteSID uint32
gotRemoteSID bool
readChan chan []byte
pkt7 pkt7Type
}
@ -28,56 +31,63 @@ func (s *streamCommon) send(d []byte) {
}
}
func (s *streamCommon) read() ([]byte, error) {
err := s.conn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
log.Fatal(err)
}
func (s *streamCommon) read() []byte {
b := make([]byte, 1500)
n, _, err := s.conn.ReadFromUDP(b)
if err != nil {
log.Fatal(err)
// Ignoring timeout errors.
if err, ok := err.(net.Error); ok && !err.Timeout() {
log.Fatal(err)
}
}
return b[:n], err
return b[:n]
}
func (s *streamCommon) reader() {
var errCount int
for {
r, err := s.read()
if err == nil {
if s.pkt7.isPkt7(r) {
s.pkt7.handle(s, r)
}
s.readChan <- r
} else {
errCount++
if errCount > 5 {
log.Fatal(s.name + "/timeout")
}
log.Error(s.name + "/stream break detected")
r := s.read()
if s.pkt7.isPkt7(r) {
s.pkt7.handle(s, r)
}
errCount = 0
s.readChan <- r
}
}
func (s *streamCommon) expect(packetLength int, b []byte) []byte {
func (s *streamCommon) tryReceivePacket(timeout time.Duration, packetLength, matchStartByte int, b []byte) []byte {
var r []byte
expectStart := time.Now()
for {
err := s.conn.SetReadDeadline(time.Now().Add(timeout - time.Since(expectStart)))
if err != nil {
log.Fatal(err)
}
r = <-s.readChan
if len(r) == packetLength && bytes.Equal(r[:len(b)], b) {
err = s.conn.SetReadDeadline(time.Time{})
if err != nil {
log.Fatal(err)
}
if len(r) == packetLength && bytes.Equal(r[matchStartByte:len(b)+matchStartByte], b) {
break
}
if time.Since(expectStart) > time.Second {
log.Fatal(s.name + "/expect timeout")
if time.Since(expectStart) > timeout {
return nil
}
}
return r
}
func (s *streamCommon) expect(packetLength int, b []byte) []byte {
r := s.tryReceivePacket(expectTimeoutDuration, packetLength, 0, b)
if r == nil {
log.Fatal(s.name + "/expect timeout")
}
return r
}
func (s *streamCommon) open(name string, portNumber int) {
s.name = name
hostPort := fmt.Sprint(connectAddress, ":", portNumber)
@ -108,6 +118,15 @@ func (s *streamCommon) open(name string, portNumber int) {
s.readChan = make(chan []byte)
go s.reader()
if r := s.pkt7.tryReceive(300*time.Millisecond, s); s.pkt7.isPkt7(r) {
s.remoteSID = binary.BigEndian.Uint32(r[8:12])
s.gotRemoteSID = true
log.Print(s.name + "/closing running stream")
s.sendDisconnect()
time.Sleep(time.Second)
s.gotRemoteSID = false
}
}
func (s *streamCommon) sendPkt3() {
@ -121,6 +140,7 @@ func (s *streamCommon) waitForPkt4Answer() {
// Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x8c, 0x7d, 0x45, 0x7a, 0x1d, 0xf6, 0xe9, 0x0b
r := s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00})
s.remoteSID = binary.BigEndian.Uint32(r[8:12])
s.gotRemoteSID = true
log.Debugf(s.name+"/got remote session id %.8x", s.remoteSID)
}
@ -138,6 +158,10 @@ func (s *streamCommon) waitForPkt6Answer() {
}
func (s *streamCommon) sendDisconnect() {
if !s.gotRemoteSID {
return
}
s.send([]byte{0x10, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00,
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)})