From 1494c37eb5d31361b6957292fa32121641ee525c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 13 Mar 2026 07:59:46 +0000 Subject: [PATCH 01/11] Initial plan From d4e458370d671f4abc4e3c938b676feafc8d48a9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 13 Mar 2026 08:18:36 +0000 Subject: [PATCH 02/11] Add built-in MPEGTS-over-TCP stream server (--stream-port) Co-authored-by: yeicor <4929005+yeicor@users.noreply.github.com> --- app/meson.build | 1 + app/src/cli.c | 15 + app/src/options.c | 1 + app/src/options.h | 1 + app/src/scrcpy.c | 36 ++ app/src/stream_sink.c | 749 ++++++++++++++++++++++++++++++++++ app/src/stream_sink.h | 75 ++++ app/src/trait/packet_source.h | 2 +- 8 files changed, 879 insertions(+), 1 deletion(-) create mode 100644 app/src/stream_sink.c create mode 100644 app/src/stream_sink.h diff --git a/app/meson.build b/app/meson.build index f7df69eb..bb52c2f3 100644 --- a/app/meson.build +++ b/app/meson.build @@ -31,6 +31,7 @@ src = [ 'src/receiver.c', 'src/recorder.c', 'src/scrcpy.c', + 'src/stream_sink.c', 'src/screen.c', 'src/server.c', 'src/version.c', diff --git a/app/src/cli.c b/app/src/cli.c index b2e3e30a..b7ececbe 100644 --- a/app/src/cli.c +++ b/app/src/cli.c @@ -114,6 +114,7 @@ enum { OPT_NO_VD_SYSTEM_DECORATIONS, OPT_NO_VD_DESTROY_CONTENT, OPT_DISPLAY_IME_POLICY, + OPT_STREAM_PORT, }; struct sc_option { @@ -956,6 +957,15 @@ static const struct sc_option options[] = { "Default is info.", #endif }, + { + .longopt_id = OPT_STREAM_PORT, + .longopt = "stream-port", + .argdesc = "port", + .text = "Start a TCP server that streams the device video (and audio, " + "if enabled) as MPEG-TS on the given port. " + "Once started, connect with any compatible player using " + "tcp://127.0.0.1: (e.g. in OBS Media Source or VLC).", + }, { .longopt_id = OPT_V4L2_SINK, .longopt = "v4l2-sink", @@ -2686,6 +2696,11 @@ parse_args_with_getopt(struct scrcpy_cli_args *args, int argc, char *argv[], LOGE("OTG mode (--otg) is disabled."); return false; #endif + case OPT_STREAM_PORT: + if (!parse_port(optarg, &opts->stream_port)) { + return false; + } + break; case OPT_V4L2_SINK: #ifdef HAVE_V4L2 opts->v4l2_device = optarg; diff --git a/app/src/options.c b/app/src/options.c index 0fe82d29..a1ce74d9 100644 --- a/app/src/options.c +++ b/app/src/options.c @@ -71,6 +71,7 @@ const struct scrcpy_options scrcpy_options_default = { .v4l2_device = NULL, .v4l2_buffer = 0, #endif + .stream_port = 0, #ifdef HAVE_USB .otg = false, #endif diff --git a/app/src/options.h b/app/src/options.h index 03b42913..88ab7cfd 100644 --- a/app/src/options.h +++ b/app/src/options.h @@ -281,6 +281,7 @@ struct scrcpy_options { const char *v4l2_device; sc_tick v4l2_buffer; #endif + uint16_t stream_port; // 0 means disabled #ifdef HAVE_USB bool otg; #endif diff --git a/app/src/scrcpy.c b/app/src/scrcpy.c index aedfdf9c..5674eae2 100644 --- a/app/src/scrcpy.c +++ b/app/src/scrcpy.c @@ -26,6 +26,7 @@ #include "recorder.h" #include "screen.h" #include "server.h" +#include "stream_sink.h" #include "uhid/gamepad_uhid.h" #include "uhid/keyboard_uhid.h" #include "uhid/mouse_uhid.h" @@ -54,6 +55,7 @@ struct scrcpy { struct sc_decoder video_decoder; struct sc_decoder audio_decoder; struct sc_recorder recorder; + struct sc_stream_sink stream_sink; struct sc_delay_buffer video_buffer; #ifdef HAVE_V4L2 struct sc_v4l2_sink v4l2_sink; @@ -400,6 +402,8 @@ scrcpy(struct scrcpy_options *options) { bool file_pusher_initialized = false; bool recorder_initialized = false; bool recorder_started = false; + bool stream_sink_initialized = false; + bool stream_sink_started = false; #ifdef HAVE_V4L2 bool v4l2_sink_initialized = false; #endif @@ -632,6 +636,28 @@ scrcpy(struct scrcpy_options *options) { } } + if (options->stream_port) { + if (!sc_stream_sink_init(&s->stream_sink, options->stream_port, + options->video, options->audio)) { + goto end; + } + stream_sink_initialized = true; + + if (!sc_stream_sink_start(&s->stream_sink)) { + goto end; + } + stream_sink_started = true; + + if (options->video) { + sc_packet_source_add_sink(&s->video_demuxer.packet_source, + &s->stream_sink.video_packet_sink); + } + if (options->audio) { + sc_packet_source_add_sink(&s->audio_demuxer.packet_source, + &s->stream_sink.audio_packet_sink); + } + } + struct sc_controller *controller = NULL; struct sc_key_processor *kp = NULL; struct sc_mouse_processor *mp = NULL; @@ -989,6 +1015,9 @@ end: if (recorder_initialized) { sc_recorder_stop(&s->recorder); } + if (stream_sink_initialized) { + sc_stream_sink_stop(&s->stream_sink); + } if (screen_initialized) { sc_screen_interrupt(&s->screen); } @@ -1053,6 +1082,13 @@ end: sc_recorder_destroy(&s->recorder); } + if (stream_sink_started) { + sc_stream_sink_join(&s->stream_sink); + } + if (stream_sink_initialized) { + sc_stream_sink_destroy(&s->stream_sink); + } + if (file_pusher_initialized) { sc_file_pusher_join(&s->file_pusher); sc_file_pusher_destroy(&s->file_pusher); diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c new file mode 100644 index 00000000..58d02669 --- /dev/null +++ b/app/src/stream_sink.c @@ -0,0 +1,749 @@ +#include "stream_sink.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "util/log.h" + +/** Downcast packet sinks to stream sink */ +#define DOWNCAST_VIDEO(SINK) \ + container_of(SINK, struct sc_stream_sink, video_packet_sink) +#define DOWNCAST_AUDIO(SINK) \ + container_of(SINK, struct sc_stream_sink, audio_packet_sink) + +static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us + +static AVPacket * +sc_stream_sink_packet_ref(const AVPacket *packet) { + AVPacket *p = av_packet_alloc(); + if (!p) { + LOG_OOM(); + return NULL; + } + + if (av_packet_ref(p, packet)) { + av_packet_free(&p); + return NULL; + } + + return p; +} + +static void +sc_stream_sink_queue_clear(struct sc_stream_sink_queue *queue) { + while (!sc_vecdeque_is_empty(queue)) { + AVPacket *p = sc_vecdeque_pop(queue); + av_packet_free(&p); + } +} + +static bool +sc_stream_sink_set_extradata(AVStream *ostream, const AVPacket *packet) { + uint8_t *extradata = av_malloc(packet->size * sizeof(uint8_t)); + if (!extradata) { + LOG_OOM(); + return false; + } + + // copy the first packet to the extra data + memcpy(extradata, packet->data, packet->size); + + ostream->codecpar->extradata = extradata; + ostream->codecpar->extradata_size = packet->size; + return true; +} + +static inline void +sc_stream_sink_rescale_packet(AVStream *stream, AVPacket *packet) { + av_packet_rescale_ts(packet, SCRCPY_TIME_BASE, stream->time_base); +} + +static bool +sc_stream_sink_write_stream(struct sc_stream_sink *sink, + struct sc_stream_sink_stream *st, + AVPacket *packet) { + AVStream *stream = sink->ctx->streams[st->index]; + sc_stream_sink_rescale_packet(stream, packet); + if (st->last_pts != AV_NOPTS_VALUE && packet->pts <= st->last_pts) { + LOGD("Fixing PTS non monotonically increasing in stream %d " + "(%" PRIi64 " >= %" PRIi64 ")", + st->index, st->last_pts, packet->pts); + packet->pts = ++st->last_pts; + packet->dts = packet->pts; + } else { + st->last_pts = packet->pts; + } + return av_interleaved_write_frame(sink->ctx, packet) >= 0; +} + +static inline bool +sc_stream_sink_write_video(struct sc_stream_sink *sink, AVPacket *packet) { + return sc_stream_sink_write_stream(sink, &sink->video_stream, packet); +} + +static inline bool +sc_stream_sink_write_audio(struct sc_stream_sink *sink, AVPacket *packet) { + return sc_stream_sink_write_stream(sink, &sink->audio_stream, packet); +} + +static int +sc_stream_sink_interrupt_cb(void *data) { + struct sc_stream_sink *sink = data; + // Read without mutex: this is intentional (same pattern as interrupt + // callbacks in other parts of the codebase) + return sink->stopped ? 1 : 0; +} + +static inline bool +sc_stream_sink_must_wait_for_config_packets(struct sc_stream_sink *sink) { + if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) { + // The video queue is empty + return true; + } + + if (sink->audio && sink->audio_expects_config_packet + && sc_vecdeque_is_empty(&sink->audio_queue)) { + // The audio queue is empty (when audio is enabled) + return true; + } + + // No queue is empty + return false; +} + +static bool +sc_stream_sink_process_header(struct sc_stream_sink *sink) { + sc_mutex_lock(&sink->mutex); + + while (!sink->stopped && + ((sink->video && !sink->video_init) + || (sink->audio && !sink->audio_init) + || sc_stream_sink_must_wait_for_config_packets(sink))) { + sc_cond_wait(&sink->cond, &sink->mutex); + } + + if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) { + assert(sink->stopped); + // If the stream sink is stopped, don't process anything if there are + // not at least video packets + sc_mutex_unlock(&sink->mutex); + return false; + } + + AVPacket *video_pkt = NULL; + if (!sc_vecdeque_is_empty(&sink->video_queue)) { + assert(sink->video); + video_pkt = sc_vecdeque_pop(&sink->video_queue); + } + + AVPacket *audio_pkt = NULL; + if (sink->audio_expects_config_packet && + !sc_vecdeque_is_empty(&sink->audio_queue)) { + assert(sink->audio); + audio_pkt = sc_vecdeque_pop(&sink->audio_queue); + } + + sc_mutex_unlock(&sink->mutex); + + bool ret = false; + + if (video_pkt) { + if (video_pkt->pts != AV_NOPTS_VALUE) { + LOGE("The first video packet is not a config packet"); + goto end; + } + + assert(sink->video_stream.index >= 0); + AVStream *video_stream = sink->ctx->streams[sink->video_stream.index]; + bool ok = sc_stream_sink_set_extradata(video_stream, video_pkt); + if (!ok) { + goto end; + } + } + + if (audio_pkt) { + if (audio_pkt->pts != AV_NOPTS_VALUE) { + LOGE("The first audio packet is not a config packet"); + goto end; + } + + assert(sink->audio_stream.index >= 0); + AVStream *audio_stream = sink->ctx->streams[sink->audio_stream.index]; + bool ok = sc_stream_sink_set_extradata(audio_stream, audio_pkt); + if (!ok) { + goto end; + } + } + + { + // Open the TCP server: this blocks until a client connects (or + // sink->stopped is set, via the interrupt callback) + char url[64]; + snprintf(url, sizeof(url), + "tcp://0.0.0.0:%" PRIu16 "?listen=1", sink->port); + + AVIOInterruptCB int_cb = { + .callback = sc_stream_sink_interrupt_cb, + .opaque = sink, + }; + + LOGI("Stream sink: waiting for client on tcp://127.0.0.1:%" PRIu16, + sink->port); + + int r = avio_open2(&sink->ctx->pb, url, AVIO_FLAG_WRITE, + &int_cb, NULL); + if (r < 0) { + if (!sink->stopped) { + LOGE("Failed to open stream server on port %" PRIu16, + sink->port); + } + goto end; + } + } + + { + bool ok = avformat_write_header(sink->ctx, NULL) >= 0; + if (!ok) { + LOGE("Failed to write stream header"); + avio_close(sink->ctx->pb); + sink->ctx->pb = NULL; + goto end; + } + } + + ret = true; + +end: + if (video_pkt) { + av_packet_free(&video_pkt); + } + if (audio_pkt) { + av_packet_free(&audio_pkt); + } + + return ret; +} + +static bool +sc_stream_sink_process_packets(struct sc_stream_sink *sink) { + int64_t pts_origin = AV_NOPTS_VALUE; + + bool header_written = sc_stream_sink_process_header(sink); + if (!header_written) { + return false; + } + + LOGI("Stream sink: streaming started on tcp://127.0.0.1:%" PRIu16, + sink->port); + + AVPacket *video_pkt = NULL; + AVPacket *audio_pkt = NULL; + + // We can write a video packet only once we received the next one so that + // we can set its duration (next_pts - current_pts) + AVPacket *video_pkt_previous = NULL; + + bool error = false; + + for (;;) { + sc_mutex_lock(&sink->mutex); + + while (!sink->stopped) { + if (sink->video && !video_pkt && + !sc_vecdeque_is_empty(&sink->video_queue)) { + // A new packet may be assigned to video_pkt and be processed + break; + } + if (sink->audio && !audio_pkt + && !sc_vecdeque_is_empty(&sink->audio_queue)) { + // A new packet may be assigned to audio_pkt and be processed + break; + } + sc_cond_wait(&sink->cond, &sink->mutex); + } + + // If stopped is set, continue to process the remaining events (to + // finish the streaming) before actually stopping. + + // If there is no video, then the video_queue will remain empty forever + // and video_pkt will always be NULL. + assert(sink->video || (!video_pkt + && sc_vecdeque_is_empty(&sink->video_queue))); + + // If there is no audio, then the audio_queue will remain empty forever + // and audio_pkt will always be NULL. + assert(sink->audio || (!audio_pkt + && sc_vecdeque_is_empty(&sink->audio_queue))); + + if (!video_pkt && !sc_vecdeque_is_empty(&sink->video_queue)) { + video_pkt = sc_vecdeque_pop(&sink->video_queue); + } + + if (!audio_pkt && !sc_vecdeque_is_empty(&sink->audio_queue)) { + audio_pkt = sc_vecdeque_pop(&sink->audio_queue); + } + + if (sink->stopped && !video_pkt && !audio_pkt) { + assert(sc_vecdeque_is_empty(&sink->video_queue)); + assert(sc_vecdeque_is_empty(&sink->audio_queue)); + sc_mutex_unlock(&sink->mutex); + break; + } + + assert(video_pkt || audio_pkt); // at least one + + sc_mutex_unlock(&sink->mutex); + + // Ignore further config packets (e.g. on device orientation + // change). The next non-config packet will have the config packet + // data prepended. + if (video_pkt && video_pkt->pts == AV_NOPTS_VALUE) { + av_packet_free(&video_pkt); + video_pkt = NULL; + } + + if (audio_pkt && audio_pkt->pts == AV_NOPTS_VALUE) { + av_packet_free(&audio_pkt); + audio_pkt = NULL; + } + + if (pts_origin == AV_NOPTS_VALUE) { + if (!sink->audio) { + assert(video_pkt); + pts_origin = video_pkt->pts; + } else if (!sink->video) { + assert(audio_pkt); + pts_origin = audio_pkt->pts; + } else if (video_pkt && audio_pkt) { + pts_origin = MIN(video_pkt->pts, audio_pkt->pts); + } else if (sink->stopped) { + if (video_pkt) { + // The sink is stopped without audio, stream the video + // packets + pts_origin = video_pkt->pts; + } else { + // Fail if there is no video + error = true; + goto end; + } + } else { + // We need both video and audio packets to initialize pts_origin + continue; + } + } + + assert(pts_origin != AV_NOPTS_VALUE); + + if (video_pkt) { + video_pkt->pts -= pts_origin; + video_pkt->dts = video_pkt->pts; + + if (video_pkt_previous) { + // we now know the duration of the previous packet + video_pkt_previous->duration = video_pkt->pts + - video_pkt_previous->pts; + + bool ok = sc_stream_sink_write_video(sink, video_pkt_previous); + av_packet_free(&video_pkt_previous); + if (!ok) { + LOGE("Could not write video packet to stream"); + error = true; + goto end; + } + } + + video_pkt_previous = video_pkt; + video_pkt = NULL; + } + + if (audio_pkt) { + audio_pkt->pts -= pts_origin; + audio_pkt->dts = audio_pkt->pts; + + bool ok = sc_stream_sink_write_audio(sink, audio_pkt); + if (!ok) { + LOGE("Could not write audio packet to stream"); + error = true; + goto end; + } + + av_packet_free(&audio_pkt); + audio_pkt = NULL; + } + } + + // Write the last video packet + AVPacket *last = video_pkt_previous; + if (last) { + // assign an arbitrary duration to the last packet: 0.1s in us + last->duration = 100000; + bool ok = sc_stream_sink_write_video(sink, last); + if (!ok) { + // failing to write the last frame is not very serious, no + // future frame may depend on it, so the resulting stream + // will still be valid + LOGW("Could not write last packet to stream"); + } + av_packet_free(&last); + last = NULL; + } + + av_write_trailer(sink->ctx); + +end: + if (video_pkt) { + av_packet_free(&video_pkt); + } + if (audio_pkt) { + av_packet_free(&audio_pkt); + } + if (video_pkt_previous) { + av_packet_free(&video_pkt_previous); + } + + return !error; +} + +static int +run_stream_sink(void *data) { + struct sc_stream_sink *sink = data; + + bool ok = sc_stream_sink_process_packets(sink); + + sc_mutex_lock(&sink->mutex); + // Prevent the producer from pushing any new packet + sink->stopped = true; + // Discard pending packets + sc_stream_sink_queue_clear(&sink->video_queue); + sc_stream_sink_queue_clear(&sink->audio_queue); + sc_mutex_unlock(&sink->mutex); + + if (sink->ctx->pb) { + avio_close(sink->ctx->pb); + sink->ctx->pb = NULL; + } + + if (ok) { + LOGI("Stream sink complete"); + } else { + LOGE("Stream sink failed"); + } + + LOGD("Stream sink thread ended"); + + return 0; +} + +static void +sc_stream_sink_stream_init(struct sc_stream_sink_stream *stream) { + stream->index = -1; + stream->last_pts = AV_NOPTS_VALUE; +} + +static bool +sc_stream_sink_video_packet_sink_open(struct sc_packet_sink *sink, + AVCodecContext *ctx) { + struct sc_stream_sink *ss = DOWNCAST_VIDEO(sink); + // only written from this thread, no need to lock + assert(!ss->video_init); + + sc_mutex_lock(&ss->mutex); + if (ss->stopped) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + AVStream *stream = avformat_new_stream(ss->ctx, ctx->codec); + if (!stream) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + int r = avcodec_parameters_from_context(stream->codecpar, ctx); + if (r < 0) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + ss->video_stream.index = stream->index; + + ss->video_init = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); + + return true; +} + +static void +sc_stream_sink_video_packet_sink_close(struct sc_packet_sink *sink) { + struct sc_stream_sink *ss = DOWNCAST_VIDEO(sink); + // only written from this thread, no need to lock + assert(ss->video_init); + + sc_mutex_lock(&ss->mutex); + // EOS also stops the stream sink + ss->stopped = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); +} + +static bool +sc_stream_sink_video_packet_sink_push(struct sc_packet_sink *sink, + const AVPacket *packet) { + struct sc_stream_sink *ss = DOWNCAST_VIDEO(sink); + // only written from this thread, no need to lock + assert(ss->video_init); + + sc_mutex_lock(&ss->mutex); + + if (ss->stopped) { + // reject any new packet + sc_mutex_unlock(&ss->mutex); + return false; + } + + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (!p) { + LOG_OOM(); + sc_mutex_unlock(&ss->mutex); + return false; + } + + p->stream_index = ss->video_stream.index; + + bool ok = sc_vecdeque_push(&ss->video_queue, p); + if (!ok) { + LOG_OOM(); + av_packet_free(&p); + sc_mutex_unlock(&ss->mutex); + return false; + } + + sc_cond_signal(&ss->cond); + + sc_mutex_unlock(&ss->mutex); + return true; +} + +static bool +sc_stream_sink_audio_packet_sink_open(struct sc_packet_sink *sink, + AVCodecContext *ctx) { + struct sc_stream_sink *ss = DOWNCAST_AUDIO(sink); + assert(ss->audio); + // only written from this thread, no need to lock + assert(!ss->audio_init); + + sc_mutex_lock(&ss->mutex); + + AVStream *stream = avformat_new_stream(ss->ctx, ctx->codec); + if (!stream) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + int r = avcodec_parameters_from_context(stream->codecpar, ctx); + if (r < 0) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + ss->audio_stream.index = stream->index; + + // A config packet is provided for all supported formats except raw audio + ss->audio_expects_config_packet = + ctx->codec_id != AV_CODEC_ID_PCM_S16LE; + + ss->audio_init = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); + + return true; +} + +static void +sc_stream_sink_audio_packet_sink_close(struct sc_packet_sink *sink) { + struct sc_stream_sink *ss = DOWNCAST_AUDIO(sink); + assert(ss->audio); + // only written from this thread, no need to lock + assert(ss->audio_init); + + sc_mutex_lock(&ss->mutex); + // EOS also stops the stream sink + ss->stopped = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); +} + +static bool +sc_stream_sink_audio_packet_sink_push(struct sc_packet_sink *sink, + const AVPacket *packet) { + struct sc_stream_sink *ss = DOWNCAST_AUDIO(sink); + assert(ss->audio); + // only written from this thread, no need to lock + assert(ss->audio_init); + + sc_mutex_lock(&ss->mutex); + + if (ss->stopped) { + // reject any new packet + sc_mutex_unlock(&ss->mutex); + return false; + } + + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (!p) { + LOG_OOM(); + sc_mutex_unlock(&ss->mutex); + return false; + } + + p->stream_index = ss->audio_stream.index; + + bool ok = sc_vecdeque_push(&ss->audio_queue, p); + if (!ok) { + LOG_OOM(); + av_packet_free(&p); + sc_mutex_unlock(&ss->mutex); + return false; + } + + sc_cond_signal(&ss->cond); + + sc_mutex_unlock(&ss->mutex); + return true; +} + +static void +sc_stream_sink_audio_packet_sink_disable(struct sc_packet_sink *sink) { + struct sc_stream_sink *ss = DOWNCAST_AUDIO(sink); + assert(ss->audio); + // only written from this thread, no need to lock + assert(!ss->audio_init); + + LOGW("Audio stream disabled for stream sink"); + + sc_mutex_lock(&ss->mutex); + ss->audio = false; + ss->audio_init = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); +} + +bool +sc_stream_sink_init(struct sc_stream_sink *sink, uint16_t port, + bool video, bool audio) { + assert(video || audio); + + sink->port = port; + sink->video = video; + sink->audio = audio; + + bool ok = sc_mutex_init(&sink->mutex); + if (!ok) { + return false; + } + + ok = sc_cond_init(&sink->cond); + if (!ok) { + goto error_mutex_destroy; + } + + sink->stopped = false; + + sc_vecdeque_init(&sink->video_queue); + sc_vecdeque_init(&sink->audio_queue); + + sink->video_init = false; + sink->audio_init = false; + + sink->audio_expects_config_packet = false; + + sc_stream_sink_stream_init(&sink->video_stream); + sc_stream_sink_stream_init(&sink->audio_stream); + + // Allocate the output format context with mpegts (ideal for TCP streaming) + const AVOutputFormat *oformat = av_guess_format("mpegts", NULL, NULL); + if (!oformat) { + LOGE("Could not find mpegts muxer"); + goto error_cond_destroy; + } + + sink->ctx = avformat_alloc_context(); + if (!sink->ctx) { + LOG_OOM(); + goto error_cond_destroy; + } + + // contrary to the deprecated API (av_oformat_next()), av_muxer_iterate() + // returns (on purpose) a pointer-to-const, but AVFormatContext.oformat + // still expects a pointer-to-non-const (it has not been updated accordingly) + // + sink->ctx->oformat = (AVOutputFormat *) oformat; + + if (video) { + static const struct sc_packet_sink_ops video_ops = { + .open = sc_stream_sink_video_packet_sink_open, + .close = sc_stream_sink_video_packet_sink_close, + .push = sc_stream_sink_video_packet_sink_push, + }; + + sink->video_packet_sink.ops = &video_ops; + } + + if (audio) { + static const struct sc_packet_sink_ops audio_ops = { + .open = sc_stream_sink_audio_packet_sink_open, + .close = sc_stream_sink_audio_packet_sink_close, + .push = sc_stream_sink_audio_packet_sink_push, + .disable = sc_stream_sink_audio_packet_sink_disable, + }; + + sink->audio_packet_sink.ops = &audio_ops; + } + + return true; + +error_cond_destroy: + sc_cond_destroy(&sink->cond); +error_mutex_destroy: + sc_mutex_destroy(&sink->mutex); + + return false; +} + +bool +sc_stream_sink_start(struct sc_stream_sink *sink) { + bool ok = sc_thread_create(&sink->thread, run_stream_sink, + "scrcpy-stream-sink", sink); + if (!ok) { + LOGE("Could not start stream sink thread"); + return false; + } + + return true; +} + +void +sc_stream_sink_stop(struct sc_stream_sink *sink) { + sc_mutex_lock(&sink->mutex); + sink->stopped = true; + sc_cond_signal(&sink->cond); + sc_mutex_unlock(&sink->mutex); +} + +void +sc_stream_sink_join(struct sc_stream_sink *sink) { + sc_thread_join(&sink->thread, NULL); +} + +void +sc_stream_sink_destroy(struct sc_stream_sink *sink) { + sc_cond_destroy(&sink->cond); + sc_mutex_destroy(&sink->mutex); + avformat_free_context(sink->ctx); +} diff --git a/app/src/stream_sink.h b/app/src/stream_sink.h new file mode 100644 index 00000000..f07f0ea4 --- /dev/null +++ b/app/src/stream_sink.h @@ -0,0 +1,75 @@ +#ifndef SC_STREAM_SINK_H +#define SC_STREAM_SINK_H + +#include "common.h" + +#include +#include +#include +#include + +#include "trait/packet_sink.h" +#include "util/thread.h" +#include "util/vecdeque.h" + +struct sc_stream_sink_queue SC_VECDEQUE(AVPacket *); + +struct sc_stream_sink_stream { + int index; + int64_t last_pts; +}; + +struct sc_stream_sink { + struct sc_packet_sink video_packet_sink; + struct sc_packet_sink audio_packet_sink; + + /* The audio flag is unprotected: + * - it is initialized from sc_stream_sink_init() from the main thread; + * - it may be reset once from the stream sink thread if the audio is + * disabled dynamically. + * + * Therefore, once the stream sink thread is started, only the stream sink + * thread may access it without data races. + */ + bool audio; + bool video; + + uint16_t port; + + AVFormatContext *ctx; + + sc_thread thread; + sc_mutex mutex; + sc_cond cond; + // set on sc_stream_sink_stop(), packet_sink close or streaming failure + bool stopped; + struct sc_stream_sink_queue video_queue; + struct sc_stream_sink_queue audio_queue; + + // wake up the stream sink thread once the video or audio codec is known + bool video_init; + bool audio_init; + + bool audio_expects_config_packet; + + struct sc_stream_sink_stream video_stream; + struct sc_stream_sink_stream audio_stream; +}; + +bool +sc_stream_sink_init(struct sc_stream_sink *sink, uint16_t port, + bool video, bool audio); + +bool +sc_stream_sink_start(struct sc_stream_sink *sink); + +void +sc_stream_sink_stop(struct sc_stream_sink *sink); + +void +sc_stream_sink_join(struct sc_stream_sink *sink); + +void +sc_stream_sink_destroy(struct sc_stream_sink *sink); + +#endif diff --git a/app/src/trait/packet_source.h b/app/src/trait/packet_source.h index 8788021a..71fb769e 100644 --- a/app/src/trait/packet_source.h +++ b/app/src/trait/packet_source.h @@ -7,7 +7,7 @@ #include "trait/packet_sink.h" -#define SC_PACKET_SOURCE_MAX_SINKS 2 +#define SC_PACKET_SOURCE_MAX_SINKS 3 /** * Packet source trait From 2fcba7e5b3f5435fa76cd4d4359bf09c042bf767 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 13 Mar 2026 08:43:31 +0000 Subject: [PATCH 03/11] Fix: shorten stream sink thread name to satisfy 15-char limit Co-authored-by: yeicor <4929005+yeicor@users.noreply.github.com> --- app/src/stream_sink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index 58d02669..fc72c7ab 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -719,7 +719,7 @@ error_mutex_destroy: bool sc_stream_sink_start(struct sc_stream_sink *sink) { bool ok = sc_thread_create(&sink->thread, run_stream_sink, - "scrcpy-stream-sink", sink); + "scrcpy-stream", sink); if (!ok) { LOGE("Could not start stream sink thread"); return false; From eb585c2c4f6fc9dd4a1ae4bf9af4f700e26ee0b9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 13 Mar 2026 09:43:53 +0000 Subject: [PATCH 04/11] Rename --stream-port to --srt-sink (SRT protocol), fix video/audio assertion with --no-playback Co-authored-by: yeicor <4929005+yeicor@users.noreply.github.com> --- app/src/cli.c | 36 +++++++++++++++++-------------- app/src/options.c | 2 +- app/src/options.h | 2 +- app/src/scrcpy.c | 4 ++-- app/src/stream_sink.c | 49 +++++++++++++++++++++++++++++-------------- app/src/stream_sink.h | 5 ++--- 6 files changed, 59 insertions(+), 39 deletions(-) diff --git a/app/src/cli.c b/app/src/cli.c index b7ececbe..06c6db0b 100644 --- a/app/src/cli.c +++ b/app/src/cli.c @@ -114,7 +114,7 @@ enum { OPT_NO_VD_SYSTEM_DECORATIONS, OPT_NO_VD_DESTROY_CONTENT, OPT_DISPLAY_IME_POLICY, - OPT_STREAM_PORT, + OPT_SRT_SINK, }; struct sc_option { @@ -958,13 +958,17 @@ static const struct sc_option options[] = { #endif }, { - .longopt_id = OPT_STREAM_PORT, - .longopt = "stream-port", - .argdesc = "port", - .text = "Start a TCP server that streams the device video (and audio, " - "if enabled) as MPEG-TS on the given port. " - "Once started, connect with any compatible player using " - "tcp://127.0.0.1: (e.g. in OBS Media Source or VLC).", + .longopt_id = OPT_SRT_SINK, + .longopt = "srt-sink", + .argdesc = "url", + .text = "Stream the device video (and audio, if enabled) as MPEG-TS " + "over SRT to the given URL.\n" + "Example: srt://0.0.0.0:8080\n" + "scrcpy acts as the SRT listener (server) by default; " + "?mode=listener is appended automatically if not present.\n" + "Connect with any SRT-compatible player, e.g.:\n" + " VLC: srt://127.0.0.1:8080\n" + " ffplay: -i srt://127.0.0.1:8080", }, { .longopt_id = OPT_V4L2_SINK, @@ -2696,10 +2700,8 @@ parse_args_with_getopt(struct scrcpy_cli_args *args, int argc, char *argv[], LOGE("OTG mode (--otg) is disabled."); return false; #endif - case OPT_STREAM_PORT: - if (!parse_port(optarg, &opts->stream_port)) { - return false; - } + case OPT_SRT_SINK: + opts->srt_sink = optarg; break; case OPT_V4L2_SINK: #ifdef HAVE_V4L2 @@ -2891,13 +2893,15 @@ parse_args_with_getopt(struct scrcpy_cli_args *args, int argc, char *argv[], } if (opts->video && !opts->video_playback && !opts->record_filename - && !v4l2) { - LOGI("No video playback, no recording, no V4L2 sink: video disabled"); + && !v4l2 && !opts->srt_sink) { + LOGI("No video playback, no recording, no V4L2 sink, no SRT sink: " + "video disabled"); opts->video = false; } - if (opts->audio && !opts->audio_playback && !opts->record_filename) { - LOGI("No audio playback, no recording: audio disabled"); + if (opts->audio && !opts->audio_playback && !opts->record_filename + && !opts->srt_sink) { + LOGI("No audio playback, no recording, no SRT sink: audio disabled"); opts->audio = false; } diff --git a/app/src/options.c b/app/src/options.c index a1ce74d9..77768c0e 100644 --- a/app/src/options.c +++ b/app/src/options.c @@ -71,7 +71,7 @@ const struct scrcpy_options scrcpy_options_default = { .v4l2_device = NULL, .v4l2_buffer = 0, #endif - .stream_port = 0, + .srt_sink = NULL, #ifdef HAVE_USB .otg = false, #endif diff --git a/app/src/options.h b/app/src/options.h index 88ab7cfd..138992de 100644 --- a/app/src/options.h +++ b/app/src/options.h @@ -281,7 +281,7 @@ struct scrcpy_options { const char *v4l2_device; sc_tick v4l2_buffer; #endif - uint16_t stream_port; // 0 means disabled + const char *srt_sink; #ifdef HAVE_USB bool otg; #endif diff --git a/app/src/scrcpy.c b/app/src/scrcpy.c index 5674eae2..20e52e9a 100644 --- a/app/src/scrcpy.c +++ b/app/src/scrcpy.c @@ -636,8 +636,8 @@ scrcpy(struct scrcpy_options *options) { } } - if (options->stream_port) { - if (!sc_stream_sink_init(&s->stream_sink, options->stream_port, + if (options->srt_sink) { + if (!sc_stream_sink_init(&s->stream_sink, options->srt_sink, options->video, options->audio)) { goto end; } diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index fc72c7ab..38ec8a9c 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -181,26 +181,37 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) { } { - // Open the TCP server: this blocks until a client connects (or - // sink->stopped is set, via the interrupt callback) - char url[64]; - snprintf(url, sizeof(url), - "tcp://0.0.0.0:%" PRIu16 "?listen=1", sink->port); + // Build the SRT listener URL. If the user already specified + // mode=, use the URL as-is; otherwise append ?mode=listener + // so that scrcpy acts as the SRT server waiting for a player. + const char *connect_url = sink->url; + char *alloc_url = NULL; + if (!strstr(sink->url, "mode=")) { + const char *sep = strchr(sink->url, '?') ? "&" : "?"; + const char *suffix = "mode=listener"; + size_t len = strlen(sink->url) + strlen(sep) + strlen(suffix) + 1; + alloc_url = malloc(len); + if (!alloc_url) { + LOG_OOM(); + goto end; + } + snprintf(alloc_url, len, "%s%s%s", sink->url, sep, suffix); + connect_url = alloc_url; + } AVIOInterruptCB int_cb = { .callback = sc_stream_sink_interrupt_cb, .opaque = sink, }; - LOGI("Stream sink: waiting for client on tcp://127.0.0.1:%" PRIu16, - sink->port); + LOGI("SRT sink: waiting for client on %s", sink->url); - int r = avio_open2(&sink->ctx->pb, url, AVIO_FLAG_WRITE, + int r = avio_open2(&sink->ctx->pb, connect_url, AVIO_FLAG_WRITE, &int_cb, NULL); + free(alloc_url); if (r < 0) { if (!sink->stopped) { - LOGE("Failed to open stream server on port %" PRIu16, - sink->port); + LOGE("Failed to open SRT server on %s", sink->url); } goto end; } @@ -238,8 +249,7 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { return false; } - LOGI("Stream sink: streaming started on tcp://127.0.0.1:%" PRIu16, - sink->port); + LOGI("SRT sink: streaming started on %s", sink->url); AVPacket *video_pkt = NULL; AVPacket *audio_pkt = NULL; @@ -635,17 +645,21 @@ sc_stream_sink_audio_packet_sink_disable(struct sc_packet_sink *sink) { } bool -sc_stream_sink_init(struct sc_stream_sink *sink, uint16_t port, +sc_stream_sink_init(struct sc_stream_sink *sink, const char *url, bool video, bool audio) { assert(video || audio); - sink->port = port; + sink->url = strdup(url); + if (!sink->url) { + LOG_OOM(); + return false; + } sink->video = video; sink->audio = audio; bool ok = sc_mutex_init(&sink->mutex); if (!ok) { - return false; + goto error_url_free; } ok = sc_cond_init(&sink->cond); @@ -666,7 +680,7 @@ sc_stream_sink_init(struct sc_stream_sink *sink, uint16_t port, sc_stream_sink_stream_init(&sink->video_stream); sc_stream_sink_stream_init(&sink->audio_stream); - // Allocate the output format context with mpegts (ideal for TCP streaming) + // Allocate the output format context with mpegts (for network streaming) const AVOutputFormat *oformat = av_guess_format("mpegts", NULL, NULL); if (!oformat) { LOGE("Could not find mpegts muxer"); @@ -712,6 +726,8 @@ error_cond_destroy: sc_cond_destroy(&sink->cond); error_mutex_destroy: sc_mutex_destroy(&sink->mutex); +error_url_free: + free(sink->url); return false; } @@ -746,4 +762,5 @@ sc_stream_sink_destroy(struct sc_stream_sink *sink) { sc_cond_destroy(&sink->cond); sc_mutex_destroy(&sink->mutex); avformat_free_context(sink->ctx); + free(sink->url); } diff --git a/app/src/stream_sink.h b/app/src/stream_sink.h index f07f0ea4..7ac3c5ec 100644 --- a/app/src/stream_sink.h +++ b/app/src/stream_sink.h @@ -4,7 +4,6 @@ #include "common.h" #include -#include #include #include @@ -34,7 +33,7 @@ struct sc_stream_sink { bool audio; bool video; - uint16_t port; + char *url; AVFormatContext *ctx; @@ -57,7 +56,7 @@ struct sc_stream_sink { }; bool -sc_stream_sink_init(struct sc_stream_sink *sink, uint16_t port, +sc_stream_sink_init(struct sc_stream_sink *sink, const char *url, bool video, bool audio); bool From 98ad4c0ef902132977475c3474359155d7ec7900 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 13 Mar 2026 09:59:58 +0000 Subject: [PATCH 05/11] Rename --srt-sink to --stream-sink Co-authored-by: yeicor <4929005+yeicor@users.noreply.github.com> --- app/src/cli.c | 18 +++++++++--------- app/src/options.c | 2 +- app/src/options.h | 2 +- app/src/scrcpy.c | 4 ++-- app/src/stream_sink.c | 4 ++-- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/app/src/cli.c b/app/src/cli.c index 06c6db0b..2929c987 100644 --- a/app/src/cli.c +++ b/app/src/cli.c @@ -114,7 +114,7 @@ enum { OPT_NO_VD_SYSTEM_DECORATIONS, OPT_NO_VD_DESTROY_CONTENT, OPT_DISPLAY_IME_POLICY, - OPT_SRT_SINK, + OPT_STREAM_SINK, }; struct sc_option { @@ -958,8 +958,8 @@ static const struct sc_option options[] = { #endif }, { - .longopt_id = OPT_SRT_SINK, - .longopt = "srt-sink", + .longopt_id = OPT_STREAM_SINK, + .longopt = "stream-sink", .argdesc = "url", .text = "Stream the device video (and audio, if enabled) as MPEG-TS " "over SRT to the given URL.\n" @@ -2700,8 +2700,8 @@ parse_args_with_getopt(struct scrcpy_cli_args *args, int argc, char *argv[], LOGE("OTG mode (--otg) is disabled."); return false; #endif - case OPT_SRT_SINK: - opts->srt_sink = optarg; + case OPT_STREAM_SINK: + opts->stream_sink = optarg; break; case OPT_V4L2_SINK: #ifdef HAVE_V4L2 @@ -2893,15 +2893,15 @@ parse_args_with_getopt(struct scrcpy_cli_args *args, int argc, char *argv[], } if (opts->video && !opts->video_playback && !opts->record_filename - && !v4l2 && !opts->srt_sink) { - LOGI("No video playback, no recording, no V4L2 sink, no SRT sink: " + && !v4l2 && !opts->stream_sink) { + LOGI("No video playback, no recording, no V4L2 sink, no stream sink: " "video disabled"); opts->video = false; } if (opts->audio && !opts->audio_playback && !opts->record_filename - && !opts->srt_sink) { - LOGI("No audio playback, no recording, no SRT sink: audio disabled"); + && !opts->stream_sink) { + LOGI("No audio playback, no recording, no stream sink: audio disabled"); opts->audio = false; } diff --git a/app/src/options.c b/app/src/options.c index 77768c0e..cb51c3c6 100644 --- a/app/src/options.c +++ b/app/src/options.c @@ -71,7 +71,7 @@ const struct scrcpy_options scrcpy_options_default = { .v4l2_device = NULL, .v4l2_buffer = 0, #endif - .srt_sink = NULL, + .stream_sink = NULL, #ifdef HAVE_USB .otg = false, #endif diff --git a/app/src/options.h b/app/src/options.h index 138992de..182c05bb 100644 --- a/app/src/options.h +++ b/app/src/options.h @@ -281,7 +281,7 @@ struct scrcpy_options { const char *v4l2_device; sc_tick v4l2_buffer; #endif - const char *srt_sink; + const char *stream_sink; #ifdef HAVE_USB bool otg; #endif diff --git a/app/src/scrcpy.c b/app/src/scrcpy.c index 20e52e9a..eb804ca6 100644 --- a/app/src/scrcpy.c +++ b/app/src/scrcpy.c @@ -636,8 +636,8 @@ scrcpy(struct scrcpy_options *options) { } } - if (options->srt_sink) { - if (!sc_stream_sink_init(&s->stream_sink, options->srt_sink, + if (options->stream_sink) { + if (!sc_stream_sink_init(&s->stream_sink, options->stream_sink, options->video, options->audio)) { goto end; } diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index 38ec8a9c..8a483a91 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -204,7 +204,7 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) { .opaque = sink, }; - LOGI("SRT sink: waiting for client on %s", sink->url); + LOGI("Stream sink: waiting for client on %s", sink->url); int r = avio_open2(&sink->ctx->pb, connect_url, AVIO_FLAG_WRITE, &int_cb, NULL); @@ -249,7 +249,7 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { return false; } - LOGI("SRT sink: streaming started on %s", sink->url); + LOGI("Stream sink: streaming started on %s", sink->url); AVPacket *video_pkt = NULL; AVPacket *audio_pkt = NULL; From 930b7c5a5f71ecc30cb458bb3380509360c68ddd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 13 Mar 2026 10:11:25 +0000 Subject: [PATCH 06/11] Tune stream-sink for low-latency live streaming; support srt/tcp/udp/rtp Co-authored-by: yeicor <4929005+yeicor@users.noreply.github.com> --- app/src/cli.c | 25 +++++--- app/src/stream_sink.c | 137 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 137 insertions(+), 25 deletions(-) diff --git a/app/src/cli.c b/app/src/cli.c index 2929c987..9521a789 100644 --- a/app/src/cli.c +++ b/app/src/cli.c @@ -962,13 +962,24 @@ static const struct sc_option options[] = { .longopt = "stream-sink", .argdesc = "url", .text = "Stream the device video (and audio, if enabled) as MPEG-TS " - "over SRT to the given URL.\n" - "Example: srt://0.0.0.0:8080\n" - "scrcpy acts as the SRT listener (server) by default; " - "?mode=listener is appended automatically if not present.\n" - "Connect with any SRT-compatible player, e.g.:\n" - " VLC: srt://127.0.0.1:8080\n" - " ffplay: -i srt://127.0.0.1:8080", + "to the given URL. Tuned for low-latency live streaming.\n" + "\n" + "Supported protocols and auto-applied server settings:\n" + " srt://HOST:PORT SRT (recommended); adds ?mode=listener " + "and ?latency=50 automatically\n" + " tcp://HOST:PORT raw TCP; adds ?listen=1 automatically\n" + " udp://HOST:PORT UDP (lowest latency, unreliable)\n" + " rtp://HOST:PORT RTP over UDP\n" + "Unknown protocols are used as-is (with a warning).\n" + "\n" + "Low-latency client examples (connect after starting scrcpy):\n" + " ffplay -fflags nobuffer -flags low_delay -framedrop " + "-i srt://127.0.0.1:8080\n" + " ffplay -fflags nobuffer -flags low_delay -framedrop " + "-i tcp://127.0.0.1:8080\n" + " ffplay -fflags nobuffer -flags low_delay -framedrop " + "-i udp://127.0.0.1:8080\n" + " VLC: Media > Open Network Stream > srt://127.0.0.1:8080", }, { .longopt_id = OPT_V4L2_SINK, diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index 8a483a91..098d155a 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -18,6 +18,111 @@ static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us +/** Return true if `key=` appears in the URL's query string. */ +static bool +sc_url_has_param(const char *url, const char *key) { + const char *q = strchr(url, '?'); + if (!q) { + return false; + } + size_t klen = strlen(key); + const char *p = q + 1; + while (*p) { + if (!strncmp(p, key, klen) && p[klen] == '=') { + return true; + } + const char *amp = strchr(p, '&'); + if (!amp) { + break; + } + p = amp + 1; + } + return false; +} + +/** Append "key=value" to url. Returns a newly allocated string. */ +static char * +sc_url_append_param(const char *url, const char *key, const char *value) { + const char *sep = strchr(url, '?') ? "&" : "?"; + size_t len = strlen(url) + strlen(sep) + strlen(key) + 1 /* '=' */ + + strlen(value) + 1 /* '\0' */; + char *result = malloc(len); + if (!result) { + LOG_OOM(); + return NULL; + } + snprintf(result, len, "%s%s%s=%s", url, sep, key, value); + return result; +} + +/** + * Build the connect URL for the stream sink. + * + * For known protocols: + * - srt:// adds ?mode=listener and ?latency=50 (ms) if not already set + * (override with ?latency=200 or higher for WAN links) + * - tcp:// adds ?listen=1 if not already set + * - udp://, rtp:// connectionless; returned as-is + * Unknown protocols emit a warning and are returned as-is. + * + * Returns a newly allocated string; the caller must free it. + */ +static char * +sc_stream_sink_build_connect_url(const char *url) { + bool is_srt = !strncmp(url, "srt://", 6); + bool is_tcp = !strncmp(url, "tcp://", 6); + bool is_udp = !strncmp(url, "udp://", 6); + bool is_rtp = !strncmp(url, "rtp://", 6); + + if (!is_srt && !is_tcp && !is_udp && !is_rtp) { + LOGW("Stream sink: unrecognized protocol in \"%s\"; " + "no listener mode or latency tuning applied", url); + return strdup(url); + } + + char *result = strdup(url); + if (!result) { + LOG_OOM(); + return NULL; + } + + if (is_srt) { + // scrcpy acts as the SRT listener (server) by default + if (!sc_url_has_param(result, "mode")) { + char *tmp = sc_url_append_param(result, "mode", "listener"); + free(result); + if (!tmp) { + return NULL; + } + result = tmp; + } + // Keep SRT protocol latency low (default 120 ms is too high for live + // screen mirroring; 50 ms is comfortable for LAN). + // Users on high-latency WAN links can override with e.g. ?latency=200. + if (!sc_url_has_param(result, "latency")) { + char *tmp = sc_url_append_param(result, "latency", "50"); + free(result); + if (!tmp) { + return NULL; + } + result = tmp; + } + } else if (is_tcp) { + // scrcpy acts as the TCP server, waiting for a player to connect + if (!sc_url_has_param(result, "listen")) { + char *tmp = sc_url_append_param(result, "listen", "1"); + free(result); + if (!tmp) { + return NULL; + } + result = tmp; + } + } + // udp:// and rtp:// are connectionless; no listener mode needed + + return result; +} + static AVPacket * sc_stream_sink_packet_ref(const AVPacket *packet) { AVPacket *p = av_packet_alloc(); @@ -181,22 +286,9 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) { } { - // Build the SRT listener URL. If the user already specified - // mode=, use the URL as-is; otherwise append ?mode=listener - // so that scrcpy acts as the SRT server waiting for a player. - const char *connect_url = sink->url; - char *alloc_url = NULL; - if (!strstr(sink->url, "mode=")) { - const char *sep = strchr(sink->url, '?') ? "&" : "?"; - const char *suffix = "mode=listener"; - size_t len = strlen(sink->url) + strlen(sep) + strlen(suffix) + 1; - alloc_url = malloc(len); - if (!alloc_url) { - LOG_OOM(); - goto end; - } - snprintf(alloc_url, len, "%s%s%s", sink->url, sep, suffix); - connect_url = alloc_url; + char *connect_url = sc_stream_sink_build_connect_url(sink->url); + if (!connect_url) { + goto end; } AVIOInterruptCB int_cb = { @@ -208,10 +300,10 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) { int r = avio_open2(&sink->ctx->pb, connect_url, AVIO_FLAG_WRITE, &int_cb, NULL); - free(alloc_url); + free(connect_url); if (r < 0) { if (!sink->stopped) { - LOGE("Failed to open SRT server on %s", sink->url); + LOGE("Failed to open stream server on %s", sink->url); } goto end; } @@ -693,6 +785,15 @@ sc_stream_sink_init(struct sc_stream_sink *sink, const char *url, goto error_cond_destroy; } + // Flush every packet immediately to the network: essential for live + // streaming where any output buffering adds perceivable latency. + // Trade-off: slightly higher CPU/network overhead per packet. + sink->ctx->flags |= AVFMT_FLAG_FLUSH_PACKETS; + // Limit interleave buffering so that audio and video are not held + // waiting for each other longer than 100 ms (AV_TIME_BASE / 10). + // Default (0) means "no limit", which causes unbounded buffering. + sink->ctx->max_interleave_delta = AV_TIME_BASE / 10; // 100 ms + // contrary to the deprecated API (av_oformat_next()), av_muxer_iterate() // returns (on purpose) a pointer-to-const, but AVFormatContext.oformat // still expects a pointer-to-non-const (it has not been updated accordingly) From 243f0fd568fbdcc7031fa1e78192d3286545035f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 13 Mar 2026 10:35:50 +0000 Subject: [PATCH 07/11] stream-sink: reconnect + concurrent multi-client support Co-authored-by: yeicor <4929005+yeicor@users.noreply.github.com> --- app/src/stream_sink.c | 636 ++++++++++++++++++++++++++++++------------ app/src/stream_sink.h | 18 ++ 2 files changed, 483 insertions(+), 171 deletions(-) diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index 098d155a..e9cd4b76 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -18,6 +18,9 @@ static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us +/** Arbitrary duration assigned to the last video packet (0.1 s in µs). */ +#define LAST_VIDEO_PACKET_DURATION_US 100000 + /** Return true if `key=` appears in the URL's query string. */ static bool sc_url_has_param(const char *url, const char *key) { @@ -163,16 +166,85 @@ sc_stream_sink_set_extradata(AVStream *ostream, const AVPacket *packet) { return true; } +/** + * Per-connection state for one active streaming client. + * + * All fields that require synchronisation are protected by the parent + * sc_stream_sink::mutex / sc_stream_sink::cond. Fields only accessed from + * the client's own thread (video/audio stream PTS tracking) need no locking. + */ +struct sc_stream_sink_client { + struct sc_stream_sink *sink; /**< back-pointer to parent */ + AVFormatContext *ctx; /**< own output context (with pb) */ + sc_thread thread; + + /** Request the write loop to stop (protected by sink->mutex). */ + bool stopped; + /** The thread has exited; safe to join (protected by sink->mutex). */ + bool finished; + + /** Per-client packet queues (protected by sink->mutex). */ + struct sc_stream_sink_queue video_queue; + struct sc_stream_sink_queue audio_queue; + + /** PTS tracking – only accessed from the client thread, no mutex needed. */ + struct sc_stream_sink_stream video_stream; + struct sc_stream_sink_stream audio_stream; + + /** Intrusive linked list, protected by sink->mutex. */ + struct sc_stream_sink_client *next; +}; + +/** + * Allocate a new AVFormatContext initialised from the template context. + * + * Copies all streams and their codec parameters (including extradata) so that + * the returned context is ready to have a pb attached and a stream header + * written. Returns NULL on allocation failure. + */ +static AVFormatContext * +sc_stream_sink_create_client_ctx(const struct sc_stream_sink *sink) { + const AVOutputFormat *oformat = av_guess_format("mpegts", NULL, NULL); + assert(oformat); // already verified in sc_stream_sink_init + + AVFormatContext *ctx = avformat_alloc_context(); + if (!ctx) { + LOG_OOM(); + return NULL; + } + + // See sc_stream_sink_init for the rationale behind these flags. + ctx->oformat = (AVOutputFormat *) oformat; + ctx->flags |= AVFMT_FLAG_FLUSH_PACKETS; + ctx->max_interleave_delta = AV_TIME_BASE / 10; // 100 ms + + for (unsigned i = 0; i < sink->ctx->nb_streams; i++) { + AVStream *src = sink->ctx->streams[i]; + AVStream *dst = avformat_new_stream(ctx, NULL); + if (!dst) { + avformat_free_context(ctx); + return NULL; + } + if (avcodec_parameters_copy(dst->codecpar, src->codecpar) < 0) { + avformat_free_context(ctx); + return NULL; + } + dst->time_base = src->time_base; + } + + return ctx; +} + static inline void sc_stream_sink_rescale_packet(AVStream *stream, AVPacket *packet) { av_packet_rescale_ts(packet, SCRCPY_TIME_BASE, stream->time_base); } static bool -sc_stream_sink_write_stream(struct sc_stream_sink *sink, +sc_stream_sink_write_stream(struct sc_stream_sink_client *client, struct sc_stream_sink_stream *st, AVPacket *packet) { - AVStream *stream = sink->ctx->streams[st->index]; + AVStream *stream = client->ctx->streams[st->index]; sc_stream_sink_rescale_packet(stream, packet); if (st->last_pts != AV_NOPTS_VALUE && packet->pts <= st->last_pts) { LOGD("Fixing PTS non monotonically increasing in stream %d " @@ -183,17 +255,19 @@ sc_stream_sink_write_stream(struct sc_stream_sink *sink, } else { st->last_pts = packet->pts; } - return av_interleaved_write_frame(sink->ctx, packet) >= 0; + return av_interleaved_write_frame(client->ctx, packet) >= 0; } static inline bool -sc_stream_sink_write_video(struct sc_stream_sink *sink, AVPacket *packet) { - return sc_stream_sink_write_stream(sink, &sink->video_stream, packet); +sc_stream_sink_write_video(struct sc_stream_sink_client *client, + AVPacket *packet) { + return sc_stream_sink_write_stream(client, &client->video_stream, packet); } static inline bool -sc_stream_sink_write_audio(struct sc_stream_sink *sink, AVPacket *packet) { - return sc_stream_sink_write_stream(sink, &sink->audio_stream, packet); +sc_stream_sink_write_audio(struct sc_stream_sink_client *client, + AVPacket *packet) { + return sc_stream_sink_write_stream(client, &client->audio_stream, packet); } static int @@ -204,6 +278,14 @@ sc_stream_sink_interrupt_cb(void *data) { return sink->stopped ? 1 : 0; } +/** Interrupt callback for a per-client AVFormatContext. */ +static int +sc_stream_sink_client_interrupt_cb(void *data) { + struct sc_stream_sink_client *client = data; + // Read without mutex: intentional (benign race; same pattern as above) + return (client->stopped || client->sink->stopped) ? 1 : 0; +} + static inline bool sc_stream_sink_must_wait_for_config_packets(struct sc_stream_sink *sink) { if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) { @@ -221,8 +303,17 @@ sc_stream_sink_must_wait_for_config_packets(struct sc_stream_sink *sink) { return false; } +/** + * Wait for codec initialisation and the first config packets, then apply the + * codec parameters and extradata to the template AVFormatContext + * (sink->ctx). Does NOT open any network connection. + * + * On success, sink->template_ready is set to true and the init-phase queues + * are cleared. Returns false only when the sink is stopped before init + * completes. + */ static bool -sc_stream_sink_process_header(struct sc_stream_sink *sink) { +sc_stream_sink_init_template(struct sc_stream_sink *sink) { sc_mutex_lock(&sink->mutex); while (!sink->stopped && @@ -234,8 +325,6 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) { if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) { assert(sink->stopped); - // If the stream sink is stopped, don't process anything if there are - // not at least video packets sc_mutex_unlock(&sink->mutex); return false; } @@ -285,69 +374,45 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) { } } - { - char *connect_url = sc_stream_sink_build_connect_url(sink->url); - if (!connect_url) { - goto end; - } - - AVIOInterruptCB int_cb = { - .callback = sc_stream_sink_interrupt_cb, - .opaque = sink, - }; - - LOGI("Stream sink: waiting for client on %s", sink->url); - - int r = avio_open2(&sink->ctx->pb, connect_url, AVIO_FLAG_WRITE, - &int_cb, NULL); - free(connect_url); - if (r < 0) { - if (!sink->stopped) { - LOGE("Failed to open stream server on %s", sink->url); - } - goto end; - } - } - - { - bool ok = avformat_write_header(sink->ctx, NULL) >= 0; - if (!ok) { - LOGE("Failed to write stream header"); - avio_close(sink->ctx->pb); - sink->ctx->pb = NULL; - goto end; - } - } - ret = true; end: - if (video_pkt) { - av_packet_free(&video_pkt); - } - if (audio_pkt) { - av_packet_free(&audio_pkt); + av_packet_free(&video_pkt); + av_packet_free(&audio_pkt); + + if (ret) { + // Atomically mark the template as ready and discard any packets that + // accumulated in the init-phase queues during the wait above. + // From this point push() fans out directly to per-client queues. + sc_mutex_lock(&sink->mutex); + sink->template_ready = true; + sc_stream_sink_queue_clear(&sink->video_queue); + sc_stream_sink_queue_clear(&sink->audio_queue); + sc_mutex_unlock(&sink->mutex); } return ret; } +/** + * Per-client packet write loop. + * + * Dequeues packets from the client's own video/audio queues (which receive + * fan-out copies from the push callbacks) and writes them to client->ctx->pb. + * Returns false only when a write error occurs (i.e. the client disconnected); + * returns true when stopped cleanly. + */ static bool -sc_stream_sink_process_packets(struct sc_stream_sink *sink) { +sc_stream_sink_client_run_stream(struct sc_stream_sink_client *client) { + struct sc_stream_sink *sink = client->sink; + int64_t pts_origin = AV_NOPTS_VALUE; - bool header_written = sc_stream_sink_process_header(sink); - if (!header_written) { - return false; - } - - LOGI("Stream sink: streaming started on %s", sink->url); - AVPacket *video_pkt = NULL; AVPacket *audio_pkt = NULL; - // We can write a video packet only once we received the next one so that - // we can set its duration (next_pts - current_pts) + // Buffer the previous video packet until the next one arrives so we can + // compute its duration. AVPacket *video_pkt_previous = NULL; bool error = false; @@ -355,55 +420,43 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { for (;;) { sc_mutex_lock(&sink->mutex); - while (!sink->stopped) { + while (!client->stopped) { if (sink->video && !video_pkt && - !sc_vecdeque_is_empty(&sink->video_queue)) { - // A new packet may be assigned to video_pkt and be processed + !sc_vecdeque_is_empty(&client->video_queue)) { break; } - if (sink->audio && !audio_pkt - && !sc_vecdeque_is_empty(&sink->audio_queue)) { - // A new packet may be assigned to audio_pkt and be processed + if (sink->audio && !audio_pkt && + !sc_vecdeque_is_empty(&client->audio_queue)) { break; } sc_cond_wait(&sink->cond, &sink->mutex); } - // If stopped is set, continue to process the remaining events (to - // finish the streaming) before actually stopping. + // client->stopped may now be set; drain remaining packets before exit. - // If there is no video, then the video_queue will remain empty forever - // and video_pkt will always be NULL. assert(sink->video || (!video_pkt - && sc_vecdeque_is_empty(&sink->video_queue))); - - // If there is no audio, then the audio_queue will remain empty forever - // and audio_pkt will always be NULL. + && sc_vecdeque_is_empty(&client->video_queue))); assert(sink->audio || (!audio_pkt - && sc_vecdeque_is_empty(&sink->audio_queue))); + && sc_vecdeque_is_empty(&client->audio_queue))); - if (!video_pkt && !sc_vecdeque_is_empty(&sink->video_queue)) { - video_pkt = sc_vecdeque_pop(&sink->video_queue); + if (!video_pkt && !sc_vecdeque_is_empty(&client->video_queue)) { + video_pkt = sc_vecdeque_pop(&client->video_queue); } - if (!audio_pkt && !sc_vecdeque_is_empty(&sink->audio_queue)) { - audio_pkt = sc_vecdeque_pop(&sink->audio_queue); + if (!audio_pkt && !sc_vecdeque_is_empty(&client->audio_queue)) { + audio_pkt = sc_vecdeque_pop(&client->audio_queue); } - if (sink->stopped && !video_pkt && !audio_pkt) { - assert(sc_vecdeque_is_empty(&sink->video_queue)); - assert(sc_vecdeque_is_empty(&sink->audio_queue)); + if (client->stopped && !video_pkt && !audio_pkt) { sc_mutex_unlock(&sink->mutex); break; } - assert(video_pkt || audio_pkt); // at least one + assert(video_pkt || audio_pkt); sc_mutex_unlock(&sink->mutex); - // Ignore further config packets (e.g. on device orientation - // change). The next non-config packet will have the config packet - // data prepended. + // Discard further config packets (e.g. on device orientation change). if (video_pkt && video_pkt->pts == AV_NOPTS_VALUE) { av_packet_free(&video_pkt); video_pkt = NULL; @@ -423,18 +476,15 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { pts_origin = audio_pkt->pts; } else if (video_pkt && audio_pkt) { pts_origin = MIN(video_pkt->pts, audio_pkt->pts); - } else if (sink->stopped) { + } else if (client->stopped) { if (video_pkt) { - // The sink is stopped without audio, stream the video - // packets pts_origin = video_pkt->pts; } else { - // Fail if there is no video - error = true; + // Stopped without any usable video: nothing to stream. goto end; } } else { - // We need both video and audio packets to initialize pts_origin + // Need both video and audio to initialise pts_origin. continue; } } @@ -446,14 +496,13 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { video_pkt->dts = video_pkt->pts; if (video_pkt_previous) { - // we now know the duration of the previous packet - video_pkt_previous->duration = video_pkt->pts - - video_pkt_previous->pts; + video_pkt_previous->duration = + video_pkt->pts - video_pkt_previous->pts; - bool ok = sc_stream_sink_write_video(sink, video_pkt_previous); + bool ok = sc_stream_sink_write_video(client, video_pkt_previous); av_packet_free(&video_pkt_previous); if (!ok) { - LOGE("Could not write video packet to stream"); + LOGD("Stream sink: client disconnected (video write error)"); error = true; goto end; } @@ -467,73 +516,260 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { audio_pkt->pts -= pts_origin; audio_pkt->dts = audio_pkt->pts; - bool ok = sc_stream_sink_write_audio(sink, audio_pkt); + bool ok = sc_stream_sink_write_audio(client, audio_pkt); + av_packet_free(&audio_pkt); + audio_pkt = NULL; if (!ok) { - LOGE("Could not write audio packet to stream"); + LOGD("Stream sink: client disconnected (audio write error)"); error = true; goto end; } - - av_packet_free(&audio_pkt); - audio_pkt = NULL; } } - // Write the last video packet - AVPacket *last = video_pkt_previous; - if (last) { - // assign an arbitrary duration to the last packet: 0.1s in us - last->duration = 100000; - bool ok = sc_stream_sink_write_video(sink, last); + // Write the last video packet. + if (video_pkt_previous) { + video_pkt_previous->duration = LAST_VIDEO_PACKET_DURATION_US; + bool ok = sc_stream_sink_write_video(client, video_pkt_previous); if (!ok) { - // failing to write the last frame is not very serious, no - // future frame may depend on it, so the resulting stream - // will still be valid - LOGW("Could not write last packet to stream"); + LOGW("Stream sink: could not write last video packet"); } - av_packet_free(&last); - last = NULL; + av_packet_free(&video_pkt_previous); + video_pkt_previous = NULL; } - av_write_trailer(sink->ctx); + av_write_trailer(client->ctx); end: - if (video_pkt) { - av_packet_free(&video_pkt); - } - if (audio_pkt) { - av_packet_free(&audio_pkt); - } - if (video_pkt_previous) { - av_packet_free(&video_pkt_previous); - } + av_packet_free(&video_pkt); + av_packet_free(&audio_pkt); + av_packet_free(&video_pkt_previous); return !error; } +/** + * Thread function for a per-client connection. + * + * Writes packets until the client disconnects or the sink is stopped, then + * closes the connection and marks itself as finished so the accept loop can + * reap it. + */ +static int +run_stream_sink_client(void *data) { + struct sc_stream_sink_client *client = data; + struct sc_stream_sink *sink = client->sink; + + sc_stream_sink_client_run_stream(client); + + // Close this client's network connection. + if (client->ctx->pb) { + avio_close(client->ctx->pb); + client->ctx->pb = NULL; + } + + // Mark as finished so the accept loop can join and free us. + sc_mutex_lock(&sink->mutex); + client->finished = true; + // Drain any packets that arrived between the write error and now. + sc_stream_sink_queue_clear(&client->video_queue); + sc_stream_sink_queue_clear(&client->audio_queue); + sc_mutex_unlock(&sink->mutex); + + LOGD("Stream sink: client thread ended"); + + return 0; +} + +/** + * Join and free all client threads that have set finished=true. + * Must be called from the accept loop (main sink thread). + */ +static void +sc_stream_sink_reap_dead_clients(struct sc_stream_sink *sink) { + struct sc_stream_sink_client *dead = NULL; + + sc_mutex_lock(&sink->mutex); + struct sc_stream_sink_client **pp = &sink->clients; + while (*pp) { + struct sc_stream_sink_client *c = *pp; + if (c->finished) { + *pp = c->next; // remove from list + c->next = dead; + dead = c; + } else { + pp = &c->next; + } + } + sc_mutex_unlock(&sink->mutex); + + while (dead) { + struct sc_stream_sink_client *c = dead; + dead = c->next; + sc_thread_join(&c->thread, NULL); + avformat_free_context(c->ctx); + sc_vecdeque_destroy(&c->video_queue); + sc_vecdeque_destroy(&c->audio_queue); + free(c); + } +} + +/** + * Accept loop: initialises the template context once, then repeatedly accepts + * incoming connections, spawning a per-client thread for each. Runs until + * sink->stopped is set (by sc_stream_sink_stop() or device EOS). + */ + +// Forward declaration: defined below alongside the other packet-sink callbacks. +static void sc_stream_sink_stream_init(struct sc_stream_sink_stream *stream); + static int run_stream_sink(void *data) { struct sc_stream_sink *sink = data; - bool ok = sc_stream_sink_process_packets(sink); - - sc_mutex_lock(&sink->mutex); - // Prevent the producer from pushing any new packet - sink->stopped = true; - // Discard pending packets - sc_stream_sink_queue_clear(&sink->video_queue); - sc_stream_sink_queue_clear(&sink->audio_queue); - sc_mutex_unlock(&sink->mutex); - - if (sink->ctx->pb) { - avio_close(sink->ctx->pb); - sink->ctx->pb = NULL; + bool ok = sc_stream_sink_init_template(sink); + if (!ok) { + LOGE("Stream sink: initialisation failed"); + goto stop; } - if (ok) { - LOGI("Stream sink complete"); - } else { - LOGE("Stream sink failed"); + char *connect_url = sc_stream_sink_build_connect_url(sink->url); + if (!connect_url) { + goto stop; + } + + LOGI("Stream sink: listening for clients on %s", sink->url); + + while (!sink->stopped) { + // Reap any client threads that finished since the last iteration. + sc_stream_sink_reap_dead_clients(sink); + + AVIOInterruptCB int_cb = { + .callback = sc_stream_sink_interrupt_cb, + .opaque = sink, + }; + + // Block here until one client connects (or sink is stopped). + AVIOContext *pb = NULL; + int r = avio_open2(&pb, connect_url, AVIO_FLAG_WRITE, &int_cb, NULL); + if (r < 0) { + if (!sink->stopped) { + LOGE("Stream sink: failed to accept connection on %s", + sink->url); + } + break; + } + + // Build a fresh output context for this client from the template. + AVFormatContext *client_ctx = sc_stream_sink_create_client_ctx(sink); + if (!client_ctx) { + avio_close(pb); + continue; + } + client_ctx->pb = pb; + + // Allocate and initialise the client struct. + struct sc_stream_sink_client *client = + calloc(1, sizeof(struct sc_stream_sink_client)); + if (!client) { + LOG_OOM(); + avio_close(client_ctx->pb); + client_ctx->pb = NULL; + avformat_free_context(client_ctx); + continue; + } + client->sink = sink; + client->ctx = client_ctx; + client->stopped = false; + client->finished = false; + sc_vecdeque_init(&client->video_queue); + sc_vecdeque_init(&client->audio_queue); + sc_stream_sink_stream_init(&client->video_stream); + sc_stream_sink_stream_init(&client->audio_stream); + // Copy shared stream indices so the client can look up its streams. + client->video_stream.index = sink->video_stream.index; + client->audio_stream.index = sink->audio_stream.index; + + // Switch the client context's interrupt callback to check client->stopped too, + // so blocking I/O in the client thread can be interrupted on demand. + client_ctx->interrupt_callback.callback = sc_stream_sink_client_interrupt_cb; + client_ctx->interrupt_callback.opaque = client; + + // Write the MPEG-TS stream header for this client. + if (avformat_write_header(client_ctx, NULL) < 0) { + LOGE("Stream sink: failed to write stream header to client"); + avio_close(client_ctx->pb); + client_ctx->pb = NULL; + avformat_free_context(client_ctx); + free(client); + continue; + } + + // Add the client to the active list before spawning its thread so + // that packets pushed between now and the thread start are queued. + sc_mutex_lock(&sink->mutex); + client->next = sink->clients; + sink->clients = client; + sc_mutex_unlock(&sink->mutex); + + bool thread_ok = sc_thread_create(&client->thread, + run_stream_sink_client, + "scrcpy-stream-client", client); + if (!thread_ok) { + LOGE("Stream sink: could not create client thread"); + sc_mutex_lock(&sink->mutex); + // Remove from list (it was just prepended) + sink->clients = client->next; + sc_stream_sink_queue_clear(&client->video_queue); + sc_stream_sink_queue_clear(&client->audio_queue); + sc_mutex_unlock(&sink->mutex); + // avformat_write_header already moved pb ownership; close it. + if (client_ctx->pb) { + avio_close(client_ctx->pb); + client_ctx->pb = NULL; + } + avformat_free_context(client_ctx); + sc_vecdeque_destroy(&client->video_queue); + sc_vecdeque_destroy(&client->audio_queue); + free(client); + continue; + } + + LOGI("Stream sink: client connected on %s", sink->url); + } + + free(connect_url); + +stop: + // Stop and drain all active clients. + sc_mutex_lock(&sink->mutex); + sink->stopped = true; + struct sc_stream_sink_client *c = sink->clients; + while (c) { + c->stopped = true; + c = c->next; + } + sc_cond_broadcast(&sink->cond); + sc_mutex_unlock(&sink->mutex); + + // Join every remaining client thread, then free the client structs. + sc_mutex_lock(&sink->mutex); + struct sc_stream_sink_client *head = sink->clients; + sink->clients = NULL; + sc_mutex_unlock(&sink->mutex); + + while (head) { + struct sc_stream_sink_client *next = head->next; + sc_thread_join(&head->thread, NULL); + if (head->ctx->pb) { + avio_close(head->ctx->pb); + head->ctx->pb = NULL; + } + avformat_free_context(head->ctx); + sc_vecdeque_destroy(&head->video_queue); + sc_vecdeque_destroy(&head->audio_queue); + free(head); + head = next; } LOGD("Stream sink thread ended"); @@ -590,7 +826,12 @@ sc_stream_sink_video_packet_sink_close(struct sc_packet_sink *sink) { sc_mutex_lock(&ss->mutex); // EOS also stops the stream sink ss->stopped = true; - sc_cond_signal(&ss->cond); + struct sc_stream_sink_client *c = ss->clients; + while (c) { + c->stopped = true; + c = c->next; + } + sc_cond_broadcast(&ss->cond); sc_mutex_unlock(&ss->mutex); } @@ -609,24 +850,46 @@ sc_stream_sink_video_packet_sink_push(struct sc_packet_sink *sink, return false; } - AVPacket *p = sc_stream_sink_packet_ref(packet); - if (!p) { - LOG_OOM(); + if (!ss->template_ready) { + // Init phase: buffer in the sink-level queue for sc_stream_sink_init_template. + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (!p) { + LOG_OOM(); + sc_mutex_unlock(&ss->mutex); + return false; + } + p->stream_index = ss->video_stream.index; + bool ok = sc_vecdeque_push(&ss->video_queue, p); + if (!ok) { + LOG_OOM(); + av_packet_free(&p); + sc_mutex_unlock(&ss->mutex); + return false; + } + sc_cond_signal(&ss->cond); sc_mutex_unlock(&ss->mutex); - return false; + return true; } - p->stream_index = ss->video_stream.index; - - bool ok = sc_vecdeque_push(&ss->video_queue, p); - if (!ok) { - LOG_OOM(); - av_packet_free(&p); - sc_mutex_unlock(&ss->mutex); - return false; + // Live phase: fan out a ref-counted copy to every active client queue. + bool any_ok = false; + struct sc_stream_sink_client *c = ss->clients; + while (c) { + if (!c->stopped) { + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (p) { + p->stream_index = ss->video_stream.index; + if (sc_vecdeque_push(&c->video_queue, p)) { + any_ok = true; + } else { + av_packet_free(&p); + } + } + } + c = c->next; } - - sc_cond_signal(&ss->cond); + (void) any_ok; // dropping when no clients is intentional for live streams + sc_cond_broadcast(&ss->cond); sc_mutex_unlock(&ss->mutex); return true; @@ -677,7 +940,12 @@ sc_stream_sink_audio_packet_sink_close(struct sc_packet_sink *sink) { sc_mutex_lock(&ss->mutex); // EOS also stops the stream sink ss->stopped = true; - sc_cond_signal(&ss->cond); + struct sc_stream_sink_client *c = ss->clients; + while (c) { + c->stopped = true; + c = c->next; + } + sc_cond_broadcast(&ss->cond); sc_mutex_unlock(&ss->mutex); } @@ -697,24 +965,42 @@ sc_stream_sink_audio_packet_sink_push(struct sc_packet_sink *sink, return false; } - AVPacket *p = sc_stream_sink_packet_ref(packet); - if (!p) { - LOG_OOM(); + if (!ss->template_ready) { + // Init phase: buffer in the sink-level queue. + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (!p) { + LOG_OOM(); + sc_mutex_unlock(&ss->mutex); + return false; + } + p->stream_index = ss->audio_stream.index; + bool ok = sc_vecdeque_push(&ss->audio_queue, p); + if (!ok) { + LOG_OOM(); + av_packet_free(&p); + sc_mutex_unlock(&ss->mutex); + return false; + } + sc_cond_signal(&ss->cond); sc_mutex_unlock(&ss->mutex); - return false; + return true; } - p->stream_index = ss->audio_stream.index; - - bool ok = sc_vecdeque_push(&ss->audio_queue, p); - if (!ok) { - LOG_OOM(); - av_packet_free(&p); - sc_mutex_unlock(&ss->mutex); - return false; + // Live phase: fan out to every active client. + struct sc_stream_sink_client *c = ss->clients; + while (c) { + if (!c->stopped) { + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (p) { + p->stream_index = ss->audio_stream.index; + if (!sc_vecdeque_push(&c->audio_queue, p)) { + av_packet_free(&p); + } + } + } + c = c->next; } - - sc_cond_signal(&ss->cond); + sc_cond_broadcast(&ss->cond); sc_mutex_unlock(&ss->mutex); return true; @@ -760,6 +1046,8 @@ sc_stream_sink_init(struct sc_stream_sink *sink, const char *url, } sink->stopped = false; + sink->template_ready = false; + sink->clients = NULL; sc_vecdeque_init(&sink->video_queue); sc_vecdeque_init(&sink->audio_queue); @@ -849,7 +1137,13 @@ void sc_stream_sink_stop(struct sc_stream_sink *sink) { sc_mutex_lock(&sink->mutex); sink->stopped = true; - sc_cond_signal(&sink->cond); + // Also stop all active clients so their I/O is interrupted promptly. + struct sc_stream_sink_client *c = sink->clients; + while (c) { + c->stopped = true; + c = c->next; + } + sc_cond_broadcast(&sink->cond); sc_mutex_unlock(&sink->mutex); } diff --git a/app/src/stream_sink.h b/app/src/stream_sink.h index 7ac3c5ec..e1aae413 100644 --- a/app/src/stream_sink.h +++ b/app/src/stream_sink.h @@ -18,6 +18,9 @@ struct sc_stream_sink_stream { int64_t last_pts; }; +/* Per-connection client state (defined in stream_sink.c). */ +struct sc_stream_sink_client; + struct sc_stream_sink { struct sc_packet_sink video_packet_sink; struct sc_packet_sink audio_packet_sink; @@ -35,6 +38,8 @@ struct sc_stream_sink { char *url; + // Template format context (no pb): holds stream definitions and codec + // parameters used to initialise a fresh context for each new connection. AVFormatContext *ctx; sc_thread thread; @@ -42,6 +47,9 @@ struct sc_stream_sink { sc_cond cond; // set on sc_stream_sink_stop(), packet_sink close or streaming failure bool stopped; + + // Init-phase queues: used only until template_ready is set. + // After that, each sc_stream_sink_client has its own queues. struct sc_stream_sink_queue video_queue; struct sc_stream_sink_queue audio_queue; @@ -51,8 +59,18 @@ struct sc_stream_sink { bool audio_expects_config_packet; + // Stream indices shared by every per-client AVFormatContext (all clients + // copy the template streams in the same order). struct sc_stream_sink_stream video_stream; struct sc_stream_sink_stream audio_stream; + + // Set to true once codec params + extradata are applied to the template + // context. Before this point packets are buffered in the init-phase queues + // above; after this point they are fanned out to active client queues. + bool template_ready; + + // Linked list of currently active client connections (protected by mutex). + struct sc_stream_sink_client *clients; }; bool From b231df842c17041124a5fbc4b0f47b1342ba5aba Mon Sep 17 00:00:00 2001 From: Yeicor Date: Fri, 13 Mar 2026 11:45:28 +0100 Subject: [PATCH 08/11] Progress --- app/src/stream_sink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index e9cd4b76..61479db4 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -714,7 +714,7 @@ run_stream_sink(void *data) { bool thread_ok = sc_thread_create(&client->thread, run_stream_sink_client, - "scrcpy-stream-client", client); + "scrcpy-sclient", client); if (!thread_ok) { LOGE("Stream sink: could not create client thread"); sc_mutex_lock(&sink->mutex); From 3cb20e2da4137ba92c9bbb7fb4e86eb56fb75c62 Mon Sep 17 00:00:00 2001 From: Yeicor Date: Fri, 13 Mar 2026 13:23:23 +0100 Subject: [PATCH 09/11] Refine stream-sink: SRT workaround and connectionless mode Remove automatic SRT latency=50 injection and simplify CLI help. Avoid calling avio_close() for srt:// to work around SRT/FFmpeg epoll deadlocks; other protocols are closed normally. Treat udp:// as connectionless and use a single output stream instead of accepting per-client threads --- app/src/cli.c | 25 +++-------- app/src/stream_sink.c | 101 +++++++++++++++++++++++++++++++----------- 2 files changed, 81 insertions(+), 45 deletions(-) diff --git a/app/src/cli.c b/app/src/cli.c index 9521a789..8f3c03d9 100644 --- a/app/src/cli.c +++ b/app/src/cli.c @@ -961,25 +961,12 @@ static const struct sc_option options[] = { .longopt_id = OPT_STREAM_SINK, .longopt = "stream-sink", .argdesc = "url", - .text = "Stream the device video (and audio, if enabled) as MPEG-TS " - "to the given URL. Tuned for low-latency live streaming.\n" - "\n" - "Supported protocols and auto-applied server settings:\n" - " srt://HOST:PORT SRT (recommended); adds ?mode=listener " - "and ?latency=50 automatically\n" - " tcp://HOST:PORT raw TCP; adds ?listen=1 automatically\n" - " udp://HOST:PORT UDP (lowest latency, unreliable)\n" - " rtp://HOST:PORT RTP over UDP\n" - "Unknown protocols are used as-is (with a warning).\n" - "\n" - "Low-latency client examples (connect after starting scrcpy):\n" - " ffplay -fflags nobuffer -flags low_delay -framedrop " - "-i srt://127.0.0.1:8080\n" - " ffplay -fflags nobuffer -flags low_delay -framedrop " - "-i tcp://127.0.0.1:8080\n" - " ffplay -fflags nobuffer -flags low_delay -framedrop " - "-i udp://127.0.0.1:8080\n" - " VLC: Media > Open Network Stream > srt://127.0.0.1:8080", + .text = "Stream the device video and audio as MPEG-TS to the given URL.\n" + "Supported protocols are srt, udp and tcp.\n" + "The URL is passed to the FFmpeg muxer, so it may contain " + "additional options (e.g. srt://HOST:PORT?latency=200).\n" + "For faster startup of clients, you may want to set " + "--video-codec-options=i-frame-interval:float=1.0." }, { .longopt_id = OPT_V4L2_SINK, diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index 61479db4..c9033078 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -99,17 +99,6 @@ sc_stream_sink_build_connect_url(const char *url) { } result = tmp; } - // Keep SRT protocol latency low (default 120 ms is too high for live - // screen mirroring; 50 ms is comfortable for LAN). - // Users on high-latency WAN links can override with e.g. ?latency=200. - if (!sc_url_has_param(result, "latency")) { - char *tmp = sc_url_append_param(result, "latency", "50"); - free(result); - if (!tmp) { - return NULL; - } - result = tmp; - } } else if (is_tcp) { // scrcpy acts as the TCP server, waiting for a player to connect if (!sc_url_has_param(result, "listen")) { @@ -121,11 +110,22 @@ sc_stream_sink_build_connect_url(const char *url) { result = tmp; } } - // udp:// and rtp:// are connectionless; no listener mode needed + // udp:// is connectionless; no listener mode needed return result; } +/** + * Check if a URL uses a connectionless protocol (UDP). + * + * For this protocol, only a single output stream is needed, + * not multiple client connections. + */ +static inline bool +sc_stream_sink_is_connectionless(const char *url) { + return !strncmp(url, "udp://", 6); +} + static AVPacket * sc_stream_sink_packet_ref(const AVPacket *packet) { AVPacket *p = av_packet_alloc(); @@ -562,10 +562,24 @@ run_stream_sink_client(void *data) { sc_stream_sink_client_run_stream(client); - // Close this client's network connection. + // WORKAROUND: SRT epoll deadlock on disconnect + // When closing SRT sockets, FFmpeg's interrupt callback and SRT's internal + // epoll management conflict, causing "no sockets to check, this would deadlock". + // Root cause: FFmpeg may call interrupt_callback during avio_close(), but SRT + // has already removed the socket from epoll, causing state inconsistency. + // TODO: Remove this workaround once SRT/FFmpeg fix the socket lifecycle interaction. + // For now, only skip avio_close() for SRT; other protocols are safe. + bool is_srt = sink->url && !strncmp(sink->url, "srt://", 6); + if (client->ctx->pb) { - avio_close(client->ctx->pb); - client->ctx->pb = NULL; + if (is_srt) { + // SRT workaround: don't call avio_close(), let avformat_free_context() handle it + client->ctx->pb = NULL; + } else { + // Safe for TCP, UDP and other protocols + avio_close(client->ctx->pb); + client->ctx->pb = NULL; + } } // Mark as finished so the accept loop can join and free us. @@ -615,9 +629,12 @@ sc_stream_sink_reap_dead_clients(struct sc_stream_sink *sink) { } /** - * Accept loop: initialises the template context once, then repeatedly accepts - * incoming connections, spawning a per-client thread for each. Runs until - * sink->stopped is set (by sc_stream_sink_stop() or device EOS). + * Main streaming loop: initialises the template context once, then either: + * - For connection-oriented protocols (TCP, SRT): repeatedly accepts incoming + * connections, spawning a per-client thread for each. + * - For connectionless protocols (UDP, RTP): creates a single output stream + * and writes all packets to it directly. + * Runs until sink->stopped is set (by sc_stream_sink_stop() or device EOS). */ // Forward declaration: defined below alongside the other packet-sink callbacks. @@ -633,17 +650,35 @@ run_stream_sink(void *data) { goto stop; } + bool is_connectionless = sc_stream_sink_is_connectionless(sink->url); + char *connect_url = sc_stream_sink_build_connect_url(sink->url); if (!connect_url) { goto stop; } - LOGI("Stream sink: listening for clients on %s", sink->url); + if (is_connectionless) { + LOGI("Stream sink: streaming to %s", connect_url); + } else { + LOGI("Stream sink: listening for clients on %s", connect_url); + } + + bool connectionless_done = false; while (!sink->stopped) { + // For connectionless protocols, only attempt one connection + if (is_connectionless && connectionless_done) { + // Keep the single client thread running; just wait until stopped + sc_mutex_lock(&sink->mutex); + while (!sink->stopped) { + sc_cond_wait(&sink->cond, &sink->mutex); + } + sc_mutex_unlock(&sink->mutex); + break; + } + // Reap any client threads that finished since the last iteration. sc_stream_sink_reap_dead_clients(sink); - AVIOInterruptCB int_cb = { .callback = sc_stream_sink_interrupt_cb, .opaque = sink, @@ -673,7 +708,10 @@ run_stream_sink(void *data) { calloc(1, sizeof(struct sc_stream_sink_client)); if (!client) { LOG_OOM(); - avio_close(client_ctx->pb); + bool is_srt = sink->url && !strncmp(sink->url, "srt://", 6); + if (!is_srt) { + avio_close(client_ctx->pb); + } client_ctx->pb = NULL; avformat_free_context(client_ctx); continue; @@ -698,8 +736,7 @@ run_stream_sink(void *data) { // Write the MPEG-TS stream header for this client. if (avformat_write_header(client_ctx, NULL) < 0) { LOGE("Stream sink: failed to write stream header to client"); - avio_close(client_ctx->pb); - client_ctx->pb = NULL; + client_ctx->pb = NULL; // Don't avio_close() - causes SRT epoll issues avformat_free_context(client_ctx); free(client); continue; @@ -723,9 +760,11 @@ run_stream_sink(void *data) { sc_stream_sink_queue_clear(&client->video_queue); sc_stream_sink_queue_clear(&client->audio_queue); sc_mutex_unlock(&sink->mutex); - // avformat_write_header already moved pb ownership; close it. if (client_ctx->pb) { - avio_close(client_ctx->pb); + bool is_srt = sink->url && !strncmp(sink->url, "srt://", 6); + if (!is_srt) { + avio_close(client_ctx->pb); + } client_ctx->pb = NULL; } avformat_free_context(client_ctx); @@ -736,6 +775,13 @@ run_stream_sink(void *data) { } LOGI("Stream sink: client connected on %s", sink->url); + + if (is_connectionless) { + // For connectionless protocols (UDP, RTP), we only need a single + // stream. Mark it as done and the loop will now wait instead of + // trying to accept new connections. + connectionless_done = true; + } } free(connect_url); @@ -762,7 +808,10 @@ stop: struct sc_stream_sink_client *next = head->next; sc_thread_join(&head->thread, NULL); if (head->ctx->pb) { - avio_close(head->ctx->pb); + bool is_srt = sink->url && !strncmp(sink->url, "srt://", 6); + if (!is_srt) { + avio_close(head->ctx->pb); + } head->ctx->pb = NULL; } avformat_free_context(head->ctx); From cf76a86c937c04cb041c6d9a848567903c1abe8b Mon Sep 17 00:00:00 2001 From: Yeicor <4929005+yeicor@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:13:42 +0100 Subject: [PATCH 10/11] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- app/src/stream_sink.c | 1 + 1 file changed, 1 insertion(+) diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index c9033078..ceced111 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -2,6 +2,7 @@ #include #include +#include #include #include #include From 84d199dc78ba19d11dddfa9ef98d71336520e574 Mon Sep 17 00:00:00 2001 From: Yeicor Date: Fri, 13 Mar 2026 14:22:29 +0100 Subject: [PATCH 11/11] Potential fix for pull request remaining findings --- app/src/stream_sink.c | 62 ++++++++++++++++++++++++++++--------------- app/src/stream_sink.h | 3 ++- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index ceced111..58e09f9e 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -63,8 +64,7 @@ sc_url_append_param(const char *url, const char *key, const char *value) { * Build the connect URL for the stream sink. * * For known protocols: - * - srt:// adds ?mode=listener and ?latency=50 (ms) if not already set - * (override with ?latency=200 or higher for WAN links) + * - srt:// adds ?mode=listener if not already set * - tcp:// adds ?listen=1 if not already set * - udp://, rtp:// connectionless; returned as-is * Unknown protocols emit a warning and are returned as-is. @@ -117,14 +117,15 @@ sc_stream_sink_build_connect_url(const char *url) { } /** - * Check if a URL uses a connectionless protocol (UDP). + * Check if a URL uses a connectionless protocol (UDP/RTP). * * For this protocol, only a single output stream is needed, * not multiple client connections. */ static inline bool sc_stream_sink_is_connectionless(const char *url) { - return !strncmp(url, "udp://", 6); + return !strncmp(url, "udp://", 6) + || !strncmp(url, "rtp://", 6); } static AVPacket * @@ -180,7 +181,7 @@ struct sc_stream_sink_client { sc_thread thread; /** Request the write loop to stop (protected by sink->mutex). */ - bool stopped; + atomic_bool stopped; /** The thread has exited; safe to join (protected by sink->mutex). */ bool finished; @@ -275,16 +276,22 @@ static int sc_stream_sink_interrupt_cb(void *data) { struct sc_stream_sink *sink = data; // Read without mutex: this is intentional (same pattern as interrupt - // callbacks in other parts of the codebase) - return sink->stopped ? 1 : 0; + // callbacks in other parts of the codebase), but use atomic load to + // avoid data races with writers from other threads. + return atomic_load_explicit(&sink->stopped, memory_order_relaxed) ? 1 : 0; } /** Interrupt callback for a per-client AVFormatContext. */ static int sc_stream_sink_client_interrupt_cb(void *data) { struct sc_stream_sink_client *client = data; - // Read without mutex: intentional (benign race; same pattern as above) - return (client->stopped || client->sink->stopped) ? 1 : 0; + // Read without mutex: use atomic loads to avoid data races; do not + // take a blocking lock in this interrupt callback. + bool client_stopped = + atomic_load_explicit(&client->stopped, memory_order_relaxed); + bool sink_stopped = + atomic_load_explicit(&client->sink->stopped, memory_order_relaxed); + return (client_stopped || sink_stopped) ? 1 : 0; } static inline bool @@ -574,8 +581,8 @@ run_stream_sink_client(void *data) { if (client->ctx->pb) { if (is_srt) { - // SRT workaround: don't call avio_close(), let avformat_free_context() handle it - client->ctx->pb = NULL; + // SRT workaround: don't call avio_close() here; let avformat_free_context() + // close and free the AVIOContext later, so resources are not leaked. } else { // Safe for TCP, UDP and other protocols avio_close(client->ctx->pb); @@ -719,7 +726,7 @@ run_stream_sink(void *data) { } client->sink = sink; client->ctx = client_ctx; - client->stopped = false; + atomic_store(&client->stopped, false); client->finished = false; sc_vecdeque_init(&client->video_queue); sc_vecdeque_init(&client->audio_queue); @@ -737,7 +744,13 @@ run_stream_sink(void *data) { // Write the MPEG-TS stream header for this client. if (avformat_write_header(client_ctx, NULL) < 0) { LOGE("Stream sink: failed to write stream header to client"); - client_ctx->pb = NULL; // Don't avio_close() - causes SRT epoll issues + if (client_ctx->pb) { + bool is_srt = sink->url && !strncmp(sink->url, "srt://", 6); + if (!is_srt) { + avio_close(client_ctx->pb); + } + client_ctx->pb = NULL; + } avformat_free_context(client_ctx); free(client); continue; @@ -790,10 +803,10 @@ run_stream_sink(void *data) { stop: // Stop and drain all active clients. sc_mutex_lock(&sink->mutex); - sink->stopped = true; + atomic_store(&sink->stopped, true); struct sc_stream_sink_client *c = sink->clients; while (c) { - c->stopped = true; + atomic_store(&c->stopped, true); c = c->next; } sc_cond_broadcast(&sink->cond); @@ -875,10 +888,10 @@ sc_stream_sink_video_packet_sink_close(struct sc_packet_sink *sink) { sc_mutex_lock(&ss->mutex); // EOS also stops the stream sink - ss->stopped = true; + atomic_store(&ss->stopped, true); struct sc_stream_sink_client *c = ss->clients; while (c) { - c->stopped = true; + atomic_store(&c->stopped, true); c = c->next; } sc_cond_broadcast(&ss->cond); @@ -989,10 +1002,10 @@ sc_stream_sink_audio_packet_sink_close(struct sc_packet_sink *sink) { sc_mutex_lock(&ss->mutex); // EOS also stops the stream sink - ss->stopped = true; + atomic_store(&ss->stopped, true); struct sc_stream_sink_client *c = ss->clients; while (c) { - c->stopped = true; + atomic_store(&c->stopped, true); c = c->next; } sc_cond_broadcast(&ss->cond); @@ -1095,7 +1108,7 @@ sc_stream_sink_init(struct sc_stream_sink *sink, const char *url, goto error_mutex_destroy; } - sink->stopped = false; + atomic_store(&sink->stopped, false); sink->template_ready = false; sink->clients = NULL; @@ -1186,11 +1199,11 @@ sc_stream_sink_start(struct sc_stream_sink *sink) { void sc_stream_sink_stop(struct sc_stream_sink *sink) { sc_mutex_lock(&sink->mutex); - sink->stopped = true; + atomic_store(&sink->stopped, true); // Also stop all active clients so their I/O is interrupted promptly. struct sc_stream_sink_client *c = sink->clients; while (c) { - c->stopped = true; + atomic_store(&c->stopped, true); c = c->next; } sc_cond_broadcast(&sink->cond); @@ -1204,6 +1217,11 @@ sc_stream_sink_join(struct sc_stream_sink *sink) { void sc_stream_sink_destroy(struct sc_stream_sink *sink) { + // The sink thread must be joined before destroying the queues. + sc_stream_sink_queue_clear(&sink->video_queue); + sc_vecdeque_destroy(&sink->video_queue); + sc_stream_sink_queue_clear(&sink->audio_queue); + sc_vecdeque_destroy(&sink->audio_queue); sc_cond_destroy(&sink->cond); sc_mutex_destroy(&sink->mutex); avformat_free_context(sink->ctx); diff --git a/app/src/stream_sink.h b/app/src/stream_sink.h index e1aae413..9a8067ec 100644 --- a/app/src/stream_sink.h +++ b/app/src/stream_sink.h @@ -4,6 +4,7 @@ #include "common.h" #include +#include #include #include @@ -46,7 +47,7 @@ struct sc_stream_sink { sc_mutex mutex; sc_cond cond; // set on sc_stream_sink_stop(), packet_sink close or streaming failure - bool stopped; + atomic_bool stopped; // Init-phase queues: used only until template_ready is set. // After that, each sc_stream_sink_client has its own queues.