diff --git a/audio-linux.go b/audio-linux.go index 5af7d3a..a48c3d1 100644 --- a/audio-linux.go +++ b/audio-linux.go @@ -14,6 +14,9 @@ type audioStruct struct { source papipes.Source sink papipes.Sink + deinitNeededChan chan bool + deinitFinishedChan chan bool + // Send to this channel to play audio. play chan []byte // Read from this channel for audio. @@ -24,11 +27,14 @@ type audioStruct struct { canPlay chan bool } -var audio audioStruct - -func (a *audioStruct) playLoop() { +func (a *audioStruct) playLoop(deinitNeededChan, deinitFinishedChan chan bool) { for { - <-a.canPlay + select { + case <-a.canPlay: + case <-deinitNeededChan: + deinitFinishedChan <- true + return + } for { a.mutex.Lock() @@ -53,9 +59,8 @@ func (a *audioStruct) playLoop() { written, err := a.source.Write(d) if err != nil { if _, ok := err.(*os.PathError); !ok { - exit(err) + reportError(err) } - return } bytesToWrite -= written if bytesToWrite == 0 { @@ -67,17 +72,26 @@ func (a *audioStruct) playLoop() { } } -func (a *audioStruct) recLoop() { +func (a *audioStruct) recLoop(deinitNeededChan, deinitFinishedChan chan bool) { + defer func() { + deinitFinishedChan <- true + }() + frameBuf := make([]byte, 1920) buf := bytes.NewBuffer([]byte{}) for { + select { + case <-deinitNeededChan: + return + default: + } + n, err := a.sink.Read(frameBuf) if err != nil { if _, ok := err.(*os.PathError); !ok { - exit(err) + reportError(err) } - return } buf.Write(frameBuf[:n]) @@ -85,26 +99,48 @@ func (a *audioStruct) recLoop() { for buf.Len() >= len(frameBuf) { n, err = buf.Read(frameBuf) if err != nil { - exit(err) + reportError(err) } if n != len(frameBuf) { - exit(errors.New("audio buffer read error")) + reportError(errors.New("audio buffer read error")) + } + + select { + case a.rec <- frameBuf: + case <-deinitNeededChan: + return } - a.rec <- frameBuf } } } func (a *audioStruct) loop() { - go a.playLoop() - go a.recLoop() + playLoopDeinitNeededChan := make(chan bool) + playLoopDeinitFinishedChan := make(chan bool) + go a.playLoop(playLoopDeinitNeededChan, playLoopDeinitFinishedChan) + recLoopDeinitNeededChan := make(chan bool) + recLoopDeinitFinishedChan := make(chan bool) + go a.recLoop(recLoopDeinitNeededChan, recLoopDeinitFinishedChan) + var d []byte for { - d := <-a.play + select { + case d = <-a.play: + case <-a.deinitNeededChan: + recLoopDeinitNeededChan <- true + <-recLoopDeinitFinishedChan + playLoopDeinitNeededChan <- true + <-playLoopDeinitFinishedChan + + a.deinitFinishedChan <- true + return + } + a.mutex.Lock() a.playBuf.Write(d) a.mutex.Unlock() + // Non-blocking notify. select { case a.canPlay <- true: default: @@ -112,17 +148,17 @@ func (a *audioStruct) loop() { } } -func (a *audioStruct) init(devName string) { - a.source.Name = "kappanhang" - a.source.Filename = "/tmp/kappanhang.source" +func (a *audioStruct) init(devName string) error { + a.source.Name = "kappanhang-" + devName + a.source.Filename = "/tmp/kappanhang-" + devName + ".source" a.source.Rate = 48000 a.source.Format = "s16le" a.source.Channels = 1 a.source.SetProperty("device.buffering.buffer_size", (48000*16)/10) // 100 ms a.source.SetProperty("device.description", "kappanhang: "+devName) - a.sink.Name = "kappanhang" - a.sink.Filename = "/tmp/kappanhang.sink" + a.sink.Name = "kappanhang-" + devName + a.sink.Filename = "/tmp/kappanhang-" + devName + ".sink" a.sink.Rate = 48000 a.sink.Format = "s16le" a.sink.Channels = 1 @@ -130,18 +166,21 @@ func (a *audioStruct) init(devName string) { a.sink.SetProperty("device.description", "kappanhang: "+devName) if err := a.source.Open(); err != nil { - exit(err) + return err } if err := a.sink.Open(); err != nil { - exit(err) + return err } a.playBuf = bytes.NewBuffer([]byte{}) a.play = make(chan []byte) a.canPlay = make(chan bool) a.rec = make(chan []byte) + a.deinitNeededChan = make(chan bool) + a.deinitFinishedChan = make(chan bool) go a.loop() + return nil } func (a *audioStruct) deinit() { @@ -160,4 +199,9 @@ func (a *audioStruct) deinit() { } } } + + if a.deinitNeededChan != nil { + a.deinitNeededChan <- true + <-a.deinitFinishedChan + } } diff --git a/audiostream.go b/audiostream.go index 54631d9..91bbac8 100644 --- a/audiostream.go +++ b/audiostream.go @@ -15,6 +15,11 @@ const rxSeqBufLength = 100 * time.Millisecond type audioStream struct { common streamCommon + audio audioStruct + + deinitNeededChan chan bool + deinitFinishedChan chan bool + timeoutTimer *time.Timer receivedAudio bool lastReceivedAudioSeq uint16 @@ -24,41 +29,50 @@ type audioStream struct { audioSendSeq uint16 } -func (s *audioStream) sendDisconnect() { - s.common.sendDisconnect() -} - // sendPart1 expects 1364 bytes of PCM data. -func (s *audioStream) sendPart1(pcmData []byte) { - s.common.send(append([]byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), +func (s *audioStream) sendPart1(pcmData []byte) error { + err := s.common.send(append([]byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), 0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))}, pcmData...)) + if err != nil { + return err + } s.audioSendSeq++ + return nil } // sendPart2 expects 556 bytes of PCM data. -func (s *audioStream) sendPart2(pcmData []byte) { - s.common.send(append([]byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), +func (s *audioStream) sendPart2(pcmData []byte) error { + err := s.common.send(append([]byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), 0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))}, pcmData...)) + if err != nil { + return err + } s.audioSendSeq++ + return nil } -func (s *audioStream) sendRetransmitRequest(seqNum uint16) { +func (s *audioStream) sendRetransmitRequest(seqNum uint16) error { p := []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00, byte(seqNum), byte(seqNum >> 8), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)} - s.common.send(p) - s.common.send(p) + if err := s.common.send(p); err != nil { + return err + } + if err := s.common.send(p); err != nil { + return err + } + return nil } type seqNumRange [2]uint16 -func (s *audioStream) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange) { +func (s *audioStream) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange) error { seqNumBytes := make([]byte, len(seqNumRanges)*4) for i := 0; i < len(seqNumRanges); i++ { seqNumBytes[i*2] = byte(seqNumRanges[i][0]) @@ -70,8 +84,13 @@ func (s *audioStream) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange) byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)}, seqNumBytes...) - s.common.send(p) - s.common.send(p) + if err := s.common.send(p); err != nil { + return err + } + if err := s.common.send(p); err != nil { + return err + } + return nil } func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) { @@ -91,7 +110,7 @@ func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) { s.lastReceivedAudioSeq = gotSeq s.receivedAudio = true - audio.play <- e.data + s.audio.play <- e.data } func (s *audioStream) handleAudioPacket(r []byte) { @@ -113,19 +132,46 @@ func (s *audioStream) handleRead(r []byte) { } } -func (s *audioStream) init() { - s.common.open("audio", 50003) - s.rxSeqBufEntryChan = make(chan seqBufEntry) - s.rxSeqBuf.init(rxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) +func (s *audioStream) loop() { + for { + select { + case r := <-s.common.readChan: + s.handleRead(r) + case <-s.timeoutTimer.C: + reportError(errors.New("audio stream timeout")) + case e := <-s.rxSeqBufEntryChan: + s.handleRxSeqBufEntry(e) + case d := <-s.audio.rec: + if err := s.sendPart1(d[:1364]); err != nil { + reportError(err) + } + if err := s.sendPart2(d[1364:1920]); err != nil { + reportError(err) + } + case <-s.deinitNeededChan: + s.deinitFinishedChan <- true + return + } + } } -func (s *audioStream) start(devName string) { - s.common.sendPkt3() - s.common.waitForPkt4Answer() - s.common.sendPkt6() - s.common.waitForPkt6Answer() +func (s *audioStream) start(devName string) error { + if err := s.audio.init(devName); err != nil { + return err + } - audio.init(devName) + if err := s.common.sendPkt3(); err != nil { + return err + } + if err := s.common.waitForPkt4Answer(); err != nil { + return err + } + if err := s.common.sendPkt6(); err != nil { + return err + } + if err := s.common.waitForPkt6Answer(); err != nil { + return err + } log.Print("stream started") @@ -135,17 +181,30 @@ func (s *audioStream) start(devName string) { s.audioSendSeq = 1 - for { - select { - case r := <-s.common.readChan: - s.handleRead(r) - case <-s.timeoutTimer.C: - exit(errors.New("audio stream timeout")) - case e := <-s.rxSeqBufEntryChan: - s.handleRxSeqBufEntry(e) - case d := <-audio.rec: - s.sendPart1(d[:1364]) - s.sendPart2(d[1364:1920]) - } - } + s.deinitNeededChan = make(chan bool) + s.deinitFinishedChan = make(chan bool) + go s.loop() + return nil +} + +func (s *audioStream) init() error { + if err := s.common.init("audio", 50003); err != nil { + return err + } + s.rxSeqBufEntryChan = make(chan seqBufEntry) + s.rxSeqBuf.init(rxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) + return nil +} + +func (s *audioStream) deinit() { + if s.deinitNeededChan != nil { + s.deinitNeededChan <- true + <-s.deinitFinishedChan + } + if s.timeoutTimer != nil { + s.timeoutTimer.Stop() + } + s.common.deinit() + s.rxSeqBuf.deinit() + s.audio.deinit() } diff --git a/controlstream.go b/controlstream.go index 1d6cd6a..f85b659 100644 --- a/controlstream.go +++ b/controlstream.go @@ -12,21 +12,27 @@ import ( ) type controlStream struct { - common streamCommon + common streamCommon + serial serialStream + audio audioStream + + deinitNeededChan chan bool + deinitFinishedChan chan bool + authSendSeq uint16 authInnerSendSeq uint16 authID [6]byte - serialAndAudioStreamOpened bool + serialAndAudioStreamOpened bool + requestSerialAndAudioTimeout *time.Timer } -func (s *controlStream) sendPktAuth() { +func (s *controlStream) sendPktAuth() error { // The reply to the auth packet will contain a 6 bytes long auth ID with the first 2 bytes set to our randID. var randID [2]byte - _, err := rand.Read(randID[:]) - if err != nil { - exit(err) + if _, err := rand.Read(randID[:]); err != nil { + return err } p := []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.authSendSeq), byte(s.authSendSeq >> 8), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), @@ -45,14 +51,19 @@ func (s *controlStream) sendPktAuth() { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} - s.common.send(p) - s.common.send(p) + if err := s.common.send(p); err != nil { + return err + } + if err := s.common.send(p); err != nil { + return err + } s.authSendSeq++ s.authInnerSendSeq++ + return nil } -func (s *controlStream) sendPktReauth(firstReauthSend bool) { +func (s *controlStream) sendPktReauth(firstReauthSend bool) error { var magic byte if firstReauthSend { @@ -86,11 +97,15 @@ func (s *controlStream) sendPktReauth(firstReauthSend bool) { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} - s.common.send(p) - s.common.send(p) - + if err := s.common.send(p); err != nil { + return err + } + if err := s.common.send(p); err != nil { + return err + } s.authSendSeq++ s.authInnerSendSeq++ + return nil } // func (s *controlStream) sendDisconnect() { TODO @@ -109,17 +124,21 @@ func (s *controlStream) sendPktReauth(firstReauthSend bool) { // s.common.sendDisconnect() // } -func (s *controlStream) sendPkt0() { +func (s *controlStream) sendPkt0() error { p := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.authSendSeq), byte(s.authSendSeq >> 8), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)} - s.common.send(p) - s.common.send(p) - + if err := s.common.send(p); err != nil { + return err + } + if err := s.common.send(p); err != nil { + return err + } s.authSendSeq++ + return nil } -func (s *controlStream) sendRequestSerialAndAudio() { +func (s *controlStream) sendRequestSerialAndAudio() error { log.Print("requesting serial and audio stream") p := []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.authSendSeq), byte(s.authSendSeq >> 8), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), @@ -140,15 +159,20 @@ func (s *controlStream) sendRequestSerialAndAudio() { 0x00, 0x00, 0xbb, 0x80, 0x00, 0x00, 0xc3, 0x52, 0x00, 0x00, 0xc3, 0x53, 0x00, 0x00, 0x00, 0xa0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} - s.common.send(p) - s.common.send(p) + if err := s.common.send(p); err != nil { + return err + } + if err := s.common.send(p); err != nil { + return err + } s.authSendSeq++ s.authInnerSendSeq++ s.requestSerialAndAudioTimeout = time.AfterFunc(3*time.Second, func() { - exit(errors.New("serial and audio request timeout")) + reportError(errors.New("serial and audio request timeout")) }) + return nil } func (s *controlStream) parseNullTerminatedString(d []byte) (res string) { @@ -159,7 +183,7 @@ func (s *controlStream) parseNullTerminatedString(d []byte) (res string) { return } -func (s *controlStream) handleRead(r []byte) { +func (s *controlStream) handleRead(r []byte) error { switch len(r) { case 16: if bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00}) { @@ -170,8 +194,12 @@ func (s *controlStream) handleRead(r []byte) { p := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(gotSeq), byte(gotSeq >> 8), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)} - s.common.send(p) - s.common.send(p) + if err := s.common.send(p); err != nil { + return err + } + if err := s.common.send(p); err != nil { + return err + } } case 80: if bytes.Equal(r[:6], []byte{0x50, 0x00, 0x00, 0x00, 0x00, 0x00}) && bytes.Equal(r[48:51], []byte{0xff, 0xff, 0xff}) { @@ -186,7 +214,7 @@ func (s *controlStream) handleRead(r []byte) { // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 - exit(errors.New("reauth failed, try again after about 1 minute")) + return errors.New("reauth failed") } case 144: if !s.serialAndAudioStreamOpened && bytes.Equal(r[:6], []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00}) && r[96] == 1 { @@ -215,31 +243,82 @@ func (s *controlStream) handleRead(r []byte) { s.requestSerialAndAudioTimeout.Stop() s.requestSerialAndAudioTimeout = nil } - go streams.serial.start() - go streams.audio.start(devName) + + if err := s.serial.start(devName); err != nil { + return err + } + + if err := s.audio.start(devName); err != nil { + return err + } + s.serialAndAudioStreamOpened = true } } + return nil +} + +func (s *controlStream) loop() { + startTime := time.Now() + pkt0SendTicker := time.NewTicker(100 * time.Millisecond) + reauthTicker := time.NewTicker(60 * time.Second) + statusLogTicker := time.NewTicker(3 * time.Second) + + for { + select { + case r := <-s.common.readChan: + if err := s.handleRead(r); err != nil { + reportError(err) + } + case <-pkt0SendTicker.C: + if err := s.sendPkt0(); err != nil { + reportError(err) + } + case <-reauthTicker.C: + if err := s.sendPktReauth(false); err != nil { + reportError(err) + } + case <-statusLogTicker.C: + if s.serialAndAudioStreamOpened { + log.Print("running for ", time.Since(startTime), " roundtrip latency ", s.common.pkt7.latency) + } + case <-s.deinitNeededChan: + s.deinitFinishedChan <- true + return + } + } } -func (s *controlStream) init() { - s.common.open("control", 50001) -} +func (s *controlStream) start() error { + if err := s.common.init("control", 50001); err != nil { + return err + } -func (s *controlStream) start() { - startTime := time.Now() - - s.common.sendPkt3() + if err := s.common.sendPkt3(); err != nil { + return err + } s.common.pkt7.sendSeq = 1 - s.common.pkt7.send(&s.common) - s.common.sendPkt3() - s.common.waitForPkt4Answer() - s.common.sendPkt6() - s.common.waitForPkt6Answer() + if err := s.common.pkt7.send(&s.common); err != nil { + return err + } + if err := s.common.sendPkt3(); err != nil { + return err + } + if err := s.common.waitForPkt4Answer(); err != nil { + return err + } + if err := s.common.sendPkt6(); err != nil { + return err + } + if err := s.common.waitForPkt6Answer(); err != nil { + return err + } s.authSendSeq = 1 s.authInnerSendSeq = 1 - s.sendPktAuth() + if err := s.sendPktAuth(); err != nil { + return err + } log.Debug("expecting auth answer") // Example success auth packet: 0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, @@ -254,44 +333,65 @@ func (s *controlStream) start() { // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 - r := s.common.expect(96, []byte{0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00}) + r, err := s.common.expect(96, []byte{0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00}) + if err != nil { + return err + } if bytes.Equal(r[48:52], []byte{0xff, 0xff, 0xff, 0xfe}) { - exit(errors.New("invalid user/password")) + return errors.New("invalid user/password") } copy(s.authID[:], r[26:32]) log.Print("auth ok, waiting a bit") - time.AfterFunc(1*time.Second, func() { + time.AfterFunc(2*time.Second, func() { log.Print("sending reauth 1/2...") - s.sendPktReauth(true) - time.AfterFunc(1*time.Second, func() { + if err := s.sendPktReauth(true); err != nil { + reportError(err) + } + time.AfterFunc(2*time.Second, func() { log.Print("sending reauth 2/2...") - s.sendPktReauth(false) - time.AfterFunc(time.Second, func() { - s.sendRequestSerialAndAudio() + if err := s.sendPktReauth(false); err != nil { + reportError(err) + } + time.AfterFunc(2*time.Second, func() { + if err := s.sendRequestSerialAndAudio(); err != nil { + reportError(err) + } }) }) }) s.common.pkt7.startPeriodicSend(&s.common, 5, false) - pkt0SendTicker := time.NewTicker(100 * time.Millisecond) - reauthTicker := time.NewTicker(60 * time.Second) - statusLogTicker := time.NewTicker(3 * time.Second) - - for { - select { - case r = <-s.common.readChan: - s.handleRead(r) - case <-pkt0SendTicker.C: - s.sendPkt0() - case <-reauthTicker.C: - s.sendPktReauth(false) - case <-statusLogTicker.C: - if s.serialAndAudioStreamOpened { - log.Print("running for ", time.Since(startTime), " roundtrip latency ", s.common.pkt7.latency) - } - } - } + s.deinitNeededChan = make(chan bool) + s.deinitFinishedChan = make(chan bool) + go s.loop() + return nil +} + +func (s *controlStream) init() error { + log.Print("init") + + if err := s.serial.init(); err != nil { + return err + } + if err := s.audio.init(); err != nil { + return err + } + return nil +} + +func (s *controlStream) deinit() { + if s.deinitNeededChan != nil { + s.deinitNeededChan <- true + <-s.deinitFinishedChan + } + if s.requestSerialAndAudioTimeout != nil { + s.requestSerialAndAudioTimeout.Stop() + s.requestSerialAndAudioTimeout = nil + } + s.audio.deinit() + s.serial.deinit() + s.common.deinit() } diff --git a/log/log.go b/log/log.go index 3caada8..9a419a9 100644 --- a/log/log.go +++ b/log/log.go @@ -13,7 +13,7 @@ import ( var logger *zap.SugaredLogger var filenameTrimChars int -func getCallerFileName(withLine bool) string { +func GetCallerFileName(withLine bool) string { _, filename, line, _ := runtime.Caller(2) extension := filepath.Ext(filename) if withLine { @@ -24,35 +24,39 @@ func getCallerFileName(withLine bool) string { } func Printf(a string, b ...interface{}) { - logger.Infof(getCallerFileName(false)+": "+a, b...) + logger.Infof(GetCallerFileName(false)+": "+a, b...) } func Print(a ...interface{}) { - logger.Info(append([]interface{}{getCallerFileName(false) + ": "}, a...)...) + logger.Info(append([]interface{}{GetCallerFileName(false) + ": "}, a...)...) } func Debugf(a string, b ...interface{}) { - logger.Debugf(getCallerFileName(true)+": "+a, b...) + logger.Debugf(GetCallerFileName(true)+": "+a, b...) } func Debug(a ...interface{}) { - logger.Debug(append([]interface{}{getCallerFileName(true) + ": "}, a...)...) + logger.Debug(append([]interface{}{GetCallerFileName(true) + ": "}, a...)...) } func Errorf(a string, b ...interface{}) { - logger.Errorf(getCallerFileName(true)+": "+a, b...) + logger.Errorf(GetCallerFileName(true)+": "+a, b...) } func Error(a ...interface{}) { - logger.Error(append([]interface{}{getCallerFileName(true) + ": "}, a...)...) + logger.Error(append([]interface{}{GetCallerFileName(true) + ": "}, a...)...) +} + +func ErrorC(a ...interface{}) { + logger.Error(a) } func Fatalf(a string, b ...interface{}) { - logger.Fatalf(getCallerFileName(true)+": "+a, b...) + logger.Fatalf(GetCallerFileName(true)+": "+a, b...) } func Fatal(a ...interface{}) { - logger.Fatal(append([]interface{}{getCallerFileName(true) + ": "}, a...)...) + logger.Fatal(append([]interface{}{GetCallerFileName(true) + ": "}, a...)...) } func Init() { diff --git a/main.go b/main.go index 06e9f12..9e453ce 100644 --- a/main.go +++ b/main.go @@ -3,43 +3,57 @@ package main import ( "os" "os/signal" + "strings" "syscall" + "time" "github.com/nonoo/kappanhang/log" ) -var streams struct { - control controlStream - serial serialStream - audio audioStream -} +var gotErrChan = make(chan bool) -func exit(err error) { - if err != nil { - log.Error(err.Error()) +func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) { + // Depleting gotErrChan. + var finished bool + for !finished { + select { + case <-gotErrChan: + default: + finished = true + } } - streams.audio.common.close() - streams.serial.common.close() - streams.control.common.close() - serialPort.deinit() - audio.deinit() + c := controlStream{} + defer c.deinit() - log.Print("exiting") - if err == nil { - os.Exit(0) - } else { - os.Exit(1) + if err := c.init(); err != nil { + log.Error(err) + return true, 1 + } + if err := c.start(); err != nil { + log.Error(err) + return + } + + select { + case <-gotErrChan: + return + case <-osSignal: + log.Print("sigterm received") + return true, 0 } } -func setupCloseHandler() { - c := make(chan os.Signal) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - exit(nil) - }() +func reportError(err error) { + if !strings.Contains(err.Error(), "use of closed network connection") { + log.ErrorC(log.GetCallerFileName(true), ": ", err) + } + + // Non-blocking notify. + select { + case gotErrChan <- true: + default: + } } func main() { @@ -47,12 +61,23 @@ func main() { log.Print("kappanhang by Norbert Varga HA2NON and Akos Marton ES1AKOS https://github.com/nonoo/kappanhang") parseArgs() - serialPort.init() - streams.audio.init() - streams.serial.init() - streams.control.init() + osSignal := make(chan os.Signal, 1) + signal.Notify(osSignal, os.Interrupt, syscall.SIGTERM) - setupCloseHandler() + var shouldExit bool + var exitCode int + for !shouldExit { + shouldExit, exitCode = runControlStream(osSignal) + if !shouldExit { + log.Print("restarting control stream...") + select { + case <-time.NewTimer(3 * time.Second).C: + case <-osSignal: + shouldExit = true + } + } + } - streams.control.start() + log.Print("exiting") + os.Exit(exitCode) } diff --git a/pkt7.go b/pkt7.go index cb87dd8..01eb994 100644 --- a/pkt7.go +++ b/pkt7.go @@ -21,24 +21,25 @@ type pkt7Type struct { timeoutTimer *time.Timer latency time.Duration lastSendAt time.Time + + periodicStopNeededChan chan bool + periodicStopFinishedChan chan bool } func (p *pkt7Type) isPkt7(r []byte) bool { return len(r) == 21 && bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) // Note that the first byte can be 0x15 or 0x00, so we ignore that. } -// func (p *pkt7Type) tryReceive(timeout time.Duration, s *streamCommon) []byte { -// return s.tryReceivePacket(timeout, 21, 1, []byte{0x00, 0x00, 0x00, 0x07, 0x00}) -// } - -func (p *pkt7Type) handle(s *streamCommon, r []byte) { +func (p *pkt7Type) handle(s *streamCommon, r []byte) error { gotSeq := binary.LittleEndian.Uint16(r[6:8]) if r[16] == 0x00 { // This is a pkt7 request from the radio. // Replying to the radio. // Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00 // Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00 if p.sendTicker != nil { // Only replying if the auth is already done. - p.sendReply(s, r[17:21], gotSeq) + if err := p.sendReply(s, r[17:21], gotSeq); err != nil { + return err + } } } else { // This is a pkt7 reply to our request. if p.sendTicker != nil { // Auth is already done? @@ -64,18 +65,18 @@ func (p *pkt7Type) handle(s *streamCommon, r []byte) { } p.lastConfirmedSeq = gotSeq } + return nil } -func (p *pkt7Type) sendDo(s *streamCommon, replyID []byte, seq uint16) { +func (p *pkt7Type) sendDo(s *streamCommon, replyID []byte, seq uint16) error { // Example request from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x00, 0x78, 0x40, 0xf6, 0x02 // Example reply from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x01, 0x78, 0x40, 0xf6, 0x02 var replyFlag byte if replyID == nil { replyID = make([]byte, 4) var randID [2]byte - _, err := rand.Read(randID[:]) - if err != nil { - exit(err) + if _, err := rand.Read(randID[:]); err != nil { + return err } replyID[0] = randID[0] replyID[1] = randID[1] @@ -89,18 +90,54 @@ func (p *pkt7Type) sendDo(s *streamCommon, replyID []byte, seq uint16) { byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID), replyFlag, replyID[0], replyID[1], replyID[2], replyID[3]} - s.send(d) - s.send(d) + if err := s.send(d); err != nil { + return err + } + if err := s.send(d); err != nil { + return err + } + return nil } -func (p *pkt7Type) send(s *streamCommon) { - p.sendDo(s, nil, p.sendSeq) +func (p *pkt7Type) send(s *streamCommon) error { + if err := p.sendDo(s, nil, p.sendSeq); err != nil { + return err + } p.lastSendAt = time.Now() p.sendSeq++ + return nil } -func (p *pkt7Type) sendReply(s *streamCommon, replyID []byte, seq uint16) { - p.sendDo(s, replyID, seq) +func (p *pkt7Type) sendReply(s *streamCommon, replyID []byte, seq uint16) error { + return p.sendDo(s, replyID, seq) +} + +func (p *pkt7Type) loop(s *streamCommon) { + for { + if p.timeoutTimer != nil { + select { + case <-p.sendTicker.C: + if err := p.send(s); err != nil { + reportError(err) + } + case <-p.timeoutTimer.C: + reportError(errors.New(s.name + "/ping timeout")) + case <-p.periodicStopNeededChan: + p.periodicStopFinishedChan <- true + return + } + } else { + select { + case <-p.sendTicker.C: + if err := p.send(s); err != nil { + reportError(err) + } + case <-p.periodicStopNeededChan: + p.periodicStopFinishedChan <- true + return + } + } + } } func (p *pkt7Type) startPeriodicSend(s *streamCommon, firstSeqNo uint16, checkPingTimeout bool) { @@ -112,19 +149,21 @@ func (p *pkt7Type) startPeriodicSend(s *streamCommon, firstSeqNo uint16, checkPi p.timeoutTimer = time.NewTimer(pkt7TimeoutDuration) } - go func() { - for { - if checkPingTimeout { - select { - case <-p.sendTicker.C: - p.send(s) - case <-p.timeoutTimer.C: - exit(errors.New(s.name + "/ping timeout")) - } - } else { - <-p.sendTicker.C - p.send(s) - } - } - }() + p.periodicStopNeededChan = make(chan bool) + p.periodicStopFinishedChan = make(chan bool) + go p.loop(s) +} + +func (p *pkt7Type) stopPeriodicSend() { + if p.sendTicker == nil { // Periodic send has not started? + return + } + + p.periodicStopNeededChan <- true + <-p.periodicStopFinishedChan + + if p.timeoutTimer != nil { + p.timeoutTimer.Stop() + } + p.sendTicker.Stop() } diff --git a/seqbuf.go b/seqbuf.go index 14e7665..71d0bc7 100644 --- a/seqbuf.go +++ b/seqbuf.go @@ -232,7 +232,11 @@ func (s *seqBuf) watcher() { e, err := s.get() if err == nil { if s.entryChan != nil { - s.entryChan <- e + select { + case s.entryChan <- e: + case <-s.watcherCloseNeededChan: + return + } } } else { log.Error(err) @@ -271,7 +275,10 @@ func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, ent go s.watcher() } -// func (s *seqBuf) deinit() { -// s.watcherCloseNeededChan <- true -// <-s.watcherCloseDoneChan -// } +func (s *seqBuf) deinit() { + if s.watcherCloseNeededChan == nil { // Init has not ran? + return + } + s.watcherCloseNeededChan <- true + <-s.watcherCloseDoneChan +} diff --git a/serial-linux.go b/serial-linux.go index ee6c9da..e611be5 100644 --- a/serial-linux.go +++ b/serial-linux.go @@ -8,7 +8,13 @@ import ( ) type serialPortStruct struct { - pty *term.PTY + pty *term.PTY + symlink string + + writeLoopDeinitNeededChan chan bool + writeLoopDeinitFinishedChan chan bool + readLoopDeinitNeededChan chan bool + readLoopDeinitFinishedChan chan bool // Read from this channel to receive serial data. read chan []byte @@ -16,20 +22,24 @@ type serialPortStruct struct { write chan []byte } -var serialPort serialPortStruct - func (s *serialPortStruct) writeLoop() { s.write = make(chan []byte) var b []byte for { - b = <-s.write + select { + case b = <-s.write: + case <-s.writeLoopDeinitNeededChan: + s.writeLoopDeinitFinishedChan <- true + return + } + bytesToWrite := len(b) for bytesToWrite > 0 { written, err := s.pty.Master.Write(b) if err != nil { if _, ok := err.(*os.PathError); !ok { - exit(err) + reportError(err) } } b = b[written:] @@ -45,31 +55,55 @@ func (s *serialPortStruct) readLoop() { n, err := s.pty.Master.Read(b) if err != nil { if _, ok := err.(*os.PathError); !ok { - exit(err) + reportError(err) } } - s.read <- b[:n] + + select { + case s.read <- b[:n]: + case <-s.readLoopDeinitNeededChan: + s.readLoopDeinitFinishedChan <- true + return + } } } -func (s *serialPortStruct) init() { - var err error +func (s *serialPortStruct) init(devName string) (err error) { s.pty, err = term.OpenPTY() if err != nil { - exit(err) + return err } n, err := s.pty.PTSName() if err != nil { - exit(err) + return err } - log.Print("opened ", n) + s.symlink = "/tmp/kappanhang-" + devName + ".pty" + _ = os.Remove(s.symlink) + if err := os.Symlink(n, s.symlink); err != nil { + return err + } + log.Print("opened ", n, " as ", s.symlink) + s.readLoopDeinitNeededChan = make(chan bool) + s.readLoopDeinitFinishedChan = make(chan bool) go s.readLoop() + s.writeLoopDeinitNeededChan = make(chan bool) + s.writeLoopDeinitFinishedChan = make(chan bool) go s.writeLoop() + return nil } func (s *serialPortStruct) deinit() { if s.pty != nil { s.pty.Close() } + _ = os.Remove(s.symlink) + if s.readLoopDeinitNeededChan != nil { + s.readLoopDeinitNeededChan <- true + <-s.readLoopDeinitFinishedChan + } + if s.writeLoopDeinitNeededChan != nil { + s.writeLoopDeinitNeededChan <- true + <-s.writeLoopDeinitFinishedChan + } } diff --git a/serialstream.go b/serialstream.go index f5e30f4..39336f4 100644 --- a/serialstream.go +++ b/serialstream.go @@ -6,33 +6,68 @@ import ( type serialStream struct { common streamCommon -} -func (s *serialStream) init() { - s.common.open("serial", 50002) + serialPort serialPortStruct + + deinitNeededChan chan bool + deinitFinishedChan chan bool } func (s *serialStream) handleRead(r []byte) { } -func (s *serialStream) start() { - s.common.sendPkt3() - s.common.waitForPkt4Answer() - s.common.sendPkt6() - s.common.waitForPkt6Answer() +func (s *serialStream) loop() { + for { + select { + case r := <-s.common.readChan: + s.handleRead(r) + case <-s.deinitNeededChan: + s.deinitFinishedChan <- true + return + } + } +} + +func (s *serialStream) start(devName string) error { + if err := s.serialPort.init(devName); err != nil { + return err + } + + if err := s.common.sendPkt3(); err != nil { + return err + } + if err := s.common.waitForPkt4Answer(); err != nil { + return err + } + if err := s.common.sendPkt6(); err != nil { + return err + } + if err := s.common.waitForPkt6Answer(); err != nil { + return err + } log.Print("stream started") s.common.pkt7.startPeriodicSend(&s.common, 1, false) - for { - select { - case r := <-s.common.readChan: - s.handleRead(r) - } - } + s.deinitNeededChan = make(chan bool) + s.deinitFinishedChan = make(chan bool) + go s.loop() + return nil } -func (s *serialStream) sendDisconnect() { - s.common.sendDisconnect() +func (s *serialStream) init() error { + if err := s.common.init("serial", 50002); err != nil { + return err + } + return nil +} + +func (s *serialStream) deinit() { + if s.deinitNeededChan != nil { + s.deinitNeededChan <- true + <-s.deinitFinishedChan + } + s.common.deinit() + s.serialPort.deinit() } diff --git a/streamcommon.go b/streamcommon.go index f20c6b4..c0168b0 100644 --- a/streamcommon.go +++ b/streamcommon.go @@ -15,22 +15,23 @@ import ( const expectTimeoutDuration = time.Second type streamCommon struct { - name string - conn *net.UDPConn - localSID uint32 - remoteSID uint32 - gotRemoteSID bool - readChan chan []byte - readerClosedChan chan bool + name string + conn *net.UDPConn + localSID uint32 + remoteSID uint32 + gotRemoteSID bool + readChan chan []byte + readerCloseNeededChan chan bool + readerCloseFinishedChan chan bool pkt7 pkt7Type } -func (s *streamCommon) send(d []byte) { - _, err := s.conn.Write(d) - if err != nil { - exit(err) +func (s *streamCommon) send(d []byte) error { + if _, err := s.conn.Write(d); err != nil { + return err } + return nil } func (s *streamCommon) read() ([]byte, error) { @@ -43,15 +44,21 @@ func (s *streamCommon) reader() { for { r, err := s.read() if err != nil { - break - } - if s.pkt7.isPkt7(r) { - s.pkt7.handle(s, r) + reportError(err) + } else if s.pkt7.isPkt7(r) { + if err := s.pkt7.handle(s, r); err != nil { + reportError(err) + } + continue } - s.readChan <- r + select { + case s.readChan <- r: + case <-s.readerCloseNeededChan: + s.readerCloseFinishedChan <- true + return + } } - s.readerClosedChan <- true } func (s *streamCommon) tryReceivePacket(timeout time.Duration, packetLength, matchStartByte int, b []byte) []byte { @@ -71,26 +78,85 @@ func (s *streamCommon) tryReceivePacket(timeout time.Duration, packetLength, mat return r } -func (s *streamCommon) expect(packetLength int, b []byte) []byte { +func (s *streamCommon) expect(packetLength int, b []byte) ([]byte, error) { r := s.tryReceivePacket(expectTimeoutDuration, packetLength, 0, b) if r == nil { - exit(errors.New(s.name + "/expect timeout")) + return nil, errors.New(s.name + "/expect timeout") } - return r + return r, nil } -func (s *streamCommon) open(name string, portNumber int) { +func (s *streamCommon) sendPkt3() error { + p := []byte{0x10, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, + byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), + byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)} + if err := s.send(p); err != nil { + return err + } + if err := s.send(p); err != nil { + return err + } + return nil +} + +func (s *streamCommon) waitForPkt4Answer() error { + log.Debug(s.name + "/expecting a pkt4 answer") + // Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x8c, 0x7d, 0x45, 0x7a, 0x1d, 0xf6, 0xe9, 0x0b + r, err := s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00}) + if err != nil { + return err + } + s.remoteSID = binary.BigEndian.Uint32(r[8:12]) + s.gotRemoteSID = true + return nil +} + +func (s *streamCommon) sendPkt6() error { + p := []byte{0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, + byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), + byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)} + if err := s.send(p); err != nil { + return err + } + if err := s.send(p); err != nil { + return err + } + return nil +} + +func (s *streamCommon) waitForPkt6Answer() error { + log.Debug(s.name + "/expecting pkt6 answer") + // Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, 0xe8, 0xd0, 0x44, 0x50, 0xa0, 0x61, 0x39, 0xbe + _, err := s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00}) + return err +} + +func (s *streamCommon) sendDisconnect() error { + log.Print(s.name + "/disconnecting") + p := []byte{0x10, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, + byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), + byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)} + if err := s.send(p); err != nil { + return err + } + if err := s.send(p); err != nil { + return err + } + return nil +} + +func (s *streamCommon) init(name string, portNumber int) error { s.name = name hostPort := fmt.Sprint(connectAddress, ":", portNumber) log.Print(s.name+"/connecting to ", hostPort) raddr, err := net.ResolveUDPAddr("udp", hostPort) if err != nil { - exit(err) + return err } s.conn, err = net.DialUDP("udp", nil, raddr) if err != nil { - exit(err) + return err } // Constructing the local session ID by combining the local IP address and port. @@ -99,71 +165,25 @@ func (s *streamCommon) open(name string, portNumber int) { _, err = rand.Read(s.pkt7.randIDBytes[:]) if err != nil { - exit(err) + return err } s.readChan = make(chan []byte) - s.readerClosedChan = make(chan bool) + s.readerCloseNeededChan = make(chan bool) + s.readerCloseFinishedChan = make(chan bool) go s.reader() + return nil } -func (s *streamCommon) close() { - s.sendDisconnect() +func (s *streamCommon) deinit() { + s.pkt7.stopPeriodicSend() + if s.gotRemoteSID && s.conn != nil { + _ = s.sendDisconnect() + } s.conn.Close() - // Depleting the read channel. - var finished bool - for !finished { - select { - case <-s.readChan: - default: - finished = true - } + if s.readerCloseNeededChan != nil { + s.readerCloseNeededChan <- true + <-s.readerCloseFinishedChan } - - // Waiting for the reader to finish. - <-s.readerClosedChan -} - -func (s *streamCommon) sendPkt3() { - p := []byte{0x10, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, - byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), - byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)} - s.send(p) - s.send(p) -} - -func (s *streamCommon) waitForPkt4Answer() { - log.Debug(s.name + "/expecting a pkt4 answer") - // Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x8c, 0x7d, 0x45, 0x7a, 0x1d, 0xf6, 0xe9, 0x0b - r := s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00}) - s.remoteSID = binary.BigEndian.Uint32(r[8:12]) - s.gotRemoteSID = true -} - -func (s *streamCommon) sendPkt6() { - p := []byte{0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, - byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), - byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)} - s.send(p) - s.send(p) -} - -func (s *streamCommon) waitForPkt6Answer() { - log.Debug(s.name + "/expecting pkt6 answer") - // Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, 0xe8, 0xd0, 0x44, 0x50, 0xa0, 0x61, 0x39, 0xbe - s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00}) -} - -func (s *streamCommon) sendDisconnect() { - if !s.gotRemoteSID || s.conn == nil { - return - } - - log.Print(s.name + "/disconnecting") - p := []byte{0x10, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, - byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), - byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)} - s.send(p) - s.send(p) }