From 84d199dc78ba19d11dddfa9ef98d71336520e574 Mon Sep 17 00:00:00 2001 From: Yeicor Date: Fri, 13 Mar 2026 14:22:29 +0100 Subject: [PATCH] Potential fix for pull request remaining findings --- app/src/stream_sink.c | 62 ++++++++++++++++++++++++++++--------------- app/src/stream_sink.h | 3 ++- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index ceced111..58e09f9e 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -63,8 +64,7 @@ sc_url_append_param(const char *url, const char *key, const char *value) { * Build the connect URL for the stream sink. * * For known protocols: - * - srt:// adds ?mode=listener and ?latency=50 (ms) if not already set - * (override with ?latency=200 or higher for WAN links) + * - srt:// adds ?mode=listener if not already set * - tcp:// adds ?listen=1 if not already set * - udp://, rtp:// connectionless; returned as-is * Unknown protocols emit a warning and are returned as-is. @@ -117,14 +117,15 @@ sc_stream_sink_build_connect_url(const char *url) { } /** - * Check if a URL uses a connectionless protocol (UDP). + * Check if a URL uses a connectionless protocol (UDP/RTP). * * For this protocol, only a single output stream is needed, * not multiple client connections. */ static inline bool sc_stream_sink_is_connectionless(const char *url) { - return !strncmp(url, "udp://", 6); + return !strncmp(url, "udp://", 6) + || !strncmp(url, "rtp://", 6); } static AVPacket * @@ -180,7 +181,7 @@ struct sc_stream_sink_client { sc_thread thread; /** Request the write loop to stop (protected by sink->mutex). */ - bool stopped; + atomic_bool stopped; /** The thread has exited; safe to join (protected by sink->mutex). */ bool finished; @@ -275,16 +276,22 @@ static int sc_stream_sink_interrupt_cb(void *data) { struct sc_stream_sink *sink = data; // Read without mutex: this is intentional (same pattern as interrupt - // callbacks in other parts of the codebase) - return sink->stopped ? 1 : 0; + // callbacks in other parts of the codebase), but use atomic load to + // avoid data races with writers from other threads. + return atomic_load_explicit(&sink->stopped, memory_order_relaxed) ? 1 : 0; } /** Interrupt callback for a per-client AVFormatContext. */ static int sc_stream_sink_client_interrupt_cb(void *data) { struct sc_stream_sink_client *client = data; - // Read without mutex: intentional (benign race; same pattern as above) - return (client->stopped || client->sink->stopped) ? 1 : 0; + // Read without mutex: use atomic loads to avoid data races; do not + // take a blocking lock in this interrupt callback. + bool client_stopped = + atomic_load_explicit(&client->stopped, memory_order_relaxed); + bool sink_stopped = + atomic_load_explicit(&client->sink->stopped, memory_order_relaxed); + return (client_stopped || sink_stopped) ? 1 : 0; } static inline bool @@ -574,8 +581,8 @@ run_stream_sink_client(void *data) { if (client->ctx->pb) { if (is_srt) { - // SRT workaround: don't call avio_close(), let avformat_free_context() handle it - client->ctx->pb = NULL; + // SRT workaround: don't call avio_close() here; let avformat_free_context() + // close and free the AVIOContext later, so resources are not leaked. } else { // Safe for TCP, UDP and other protocols avio_close(client->ctx->pb); @@ -719,7 +726,7 @@ run_stream_sink(void *data) { } client->sink = sink; client->ctx = client_ctx; - client->stopped = false; + atomic_store(&client->stopped, false); client->finished = false; sc_vecdeque_init(&client->video_queue); sc_vecdeque_init(&client->audio_queue); @@ -737,7 +744,13 @@ run_stream_sink(void *data) { // Write the MPEG-TS stream header for this client. if (avformat_write_header(client_ctx, NULL) < 0) { LOGE("Stream sink: failed to write stream header to client"); - client_ctx->pb = NULL; // Don't avio_close() - causes SRT epoll issues + if (client_ctx->pb) { + bool is_srt = sink->url && !strncmp(sink->url, "srt://", 6); + if (!is_srt) { + avio_close(client_ctx->pb); + } + client_ctx->pb = NULL; + } avformat_free_context(client_ctx); free(client); continue; @@ -790,10 +803,10 @@ run_stream_sink(void *data) { stop: // Stop and drain all active clients. sc_mutex_lock(&sink->mutex); - sink->stopped = true; + atomic_store(&sink->stopped, true); struct sc_stream_sink_client *c = sink->clients; while (c) { - c->stopped = true; + atomic_store(&c->stopped, true); c = c->next; } sc_cond_broadcast(&sink->cond); @@ -875,10 +888,10 @@ sc_stream_sink_video_packet_sink_close(struct sc_packet_sink *sink) { sc_mutex_lock(&ss->mutex); // EOS also stops the stream sink - ss->stopped = true; + atomic_store(&ss->stopped, true); struct sc_stream_sink_client *c = ss->clients; while (c) { - c->stopped = true; + atomic_store(&c->stopped, true); c = c->next; } sc_cond_broadcast(&ss->cond); @@ -989,10 +1002,10 @@ sc_stream_sink_audio_packet_sink_close(struct sc_packet_sink *sink) { sc_mutex_lock(&ss->mutex); // EOS also stops the stream sink - ss->stopped = true; + atomic_store(&ss->stopped, true); struct sc_stream_sink_client *c = ss->clients; while (c) { - c->stopped = true; + atomic_store(&c->stopped, true); c = c->next; } sc_cond_broadcast(&ss->cond); @@ -1095,7 +1108,7 @@ sc_stream_sink_init(struct sc_stream_sink *sink, const char *url, goto error_mutex_destroy; } - sink->stopped = false; + atomic_store(&sink->stopped, false); sink->template_ready = false; sink->clients = NULL; @@ -1186,11 +1199,11 @@ sc_stream_sink_start(struct sc_stream_sink *sink) { void sc_stream_sink_stop(struct sc_stream_sink *sink) { sc_mutex_lock(&sink->mutex); - sink->stopped = true; + atomic_store(&sink->stopped, true); // Also stop all active clients so their I/O is interrupted promptly. struct sc_stream_sink_client *c = sink->clients; while (c) { - c->stopped = true; + atomic_store(&c->stopped, true); c = c->next; } sc_cond_broadcast(&sink->cond); @@ -1204,6 +1217,11 @@ sc_stream_sink_join(struct sc_stream_sink *sink) { void sc_stream_sink_destroy(struct sc_stream_sink *sink) { + // The sink thread must be joined before destroying the queues. + sc_stream_sink_queue_clear(&sink->video_queue); + sc_vecdeque_destroy(&sink->video_queue); + sc_stream_sink_queue_clear(&sink->audio_queue); + sc_vecdeque_destroy(&sink->audio_queue); sc_cond_destroy(&sink->cond); sc_mutex_destroy(&sink->mutex); avformat_free_context(sink->ctx); diff --git a/app/src/stream_sink.h b/app/src/stream_sink.h index e1aae413..9a8067ec 100644 --- a/app/src/stream_sink.h +++ b/app/src/stream_sink.h @@ -4,6 +4,7 @@ #include "common.h" #include +#include #include #include @@ -46,7 +47,7 @@ struct sc_stream_sink { sc_mutex mutex; sc_cond cond; // set on sc_stream_sink_stop(), packet_sink close or streaming failure - bool stopped; + atomic_bool stopped; // Init-phase queues: used only until template_ready is set. // After that, each sc_stream_sink_client has its own queues.