Potential fix for pull request remaining findings

This commit is contained in:
Yeicor 2026-03-13 14:22:29 +01:00
parent cf76a86c93
commit 84d199dc78
2 changed files with 42 additions and 23 deletions

View file

@ -5,6 +5,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdatomic.h>
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/time.h>
@ -63,8 +64,7 @@ sc_url_append_param(const char *url, const char *key, const char *value) {
* Build the connect URL for the stream sink.
*
* For known protocols:
* - srt:// adds ?mode=listener and ?latency=50 (ms) if not already set
* (override with ?latency=200 or higher for WAN links)
* - srt:// adds ?mode=listener if not already set
* - tcp:// adds ?listen=1 if not already set
* - udp://, rtp:// connectionless; returned as-is
* Unknown protocols emit a warning and are returned as-is.
@ -117,14 +117,15 @@ sc_stream_sink_build_connect_url(const char *url) {
}
/**
* Check if a URL uses a connectionless protocol (UDP).
* Check if a URL uses a connectionless protocol (UDP/RTP).
*
* For this protocol, only a single output stream is needed,
* not multiple client connections.
*/
static inline bool
sc_stream_sink_is_connectionless(const char *url) {
return !strncmp(url, "udp://", 6);
return !strncmp(url, "udp://", 6)
|| !strncmp(url, "rtp://", 6);
}
static AVPacket *
@ -180,7 +181,7 @@ struct sc_stream_sink_client {
sc_thread thread;
/** Request the write loop to stop (protected by sink->mutex). */
bool stopped;
atomic_bool stopped;
/** The thread has exited; safe to join (protected by sink->mutex). */
bool finished;
@ -275,16 +276,22 @@ static int
sc_stream_sink_interrupt_cb(void *data) {
struct sc_stream_sink *sink = data;
// Read without mutex: this is intentional (same pattern as interrupt
// callbacks in other parts of the codebase)
return sink->stopped ? 1 : 0;
// callbacks in other parts of the codebase), but use atomic load to
// avoid data races with writers from other threads.
return atomic_load_explicit(&sink->stopped, memory_order_relaxed) ? 1 : 0;
}
/** Interrupt callback for a per-client AVFormatContext. */
static int
sc_stream_sink_client_interrupt_cb(void *data) {
struct sc_stream_sink_client *client = data;
// Read without mutex: intentional (benign race; same pattern as above)
return (client->stopped || client->sink->stopped) ? 1 : 0;
// Read without mutex: use atomic loads to avoid data races; do not
// take a blocking lock in this interrupt callback.
bool client_stopped =
atomic_load_explicit(&client->stopped, memory_order_relaxed);
bool sink_stopped =
atomic_load_explicit(&client->sink->stopped, memory_order_relaxed);
return (client_stopped || sink_stopped) ? 1 : 0;
}
static inline bool
@ -574,8 +581,8 @@ run_stream_sink_client(void *data) {
if (client->ctx->pb) {
if (is_srt) {
// SRT workaround: don't call avio_close(), let avformat_free_context() handle it
client->ctx->pb = NULL;
// SRT workaround: don't call avio_close() here; let avformat_free_context()
// close and free the AVIOContext later, so resources are not leaked.
} else {
// Safe for TCP, UDP and other protocols
avio_close(client->ctx->pb);
@ -719,7 +726,7 @@ run_stream_sink(void *data) {
}
client->sink = sink;
client->ctx = client_ctx;
client->stopped = false;
atomic_store(&client->stopped, false);
client->finished = false;
sc_vecdeque_init(&client->video_queue);
sc_vecdeque_init(&client->audio_queue);
@ -737,7 +744,13 @@ run_stream_sink(void *data) {
// Write the MPEG-TS stream header for this client.
if (avformat_write_header(client_ctx, NULL) < 0) {
LOGE("Stream sink: failed to write stream header to client");
client_ctx->pb = NULL; // Don't avio_close() - causes SRT epoll issues
if (client_ctx->pb) {
bool is_srt = sink->url && !strncmp(sink->url, "srt://", 6);
if (!is_srt) {
avio_close(client_ctx->pb);
}
client_ctx->pb = NULL;
}
avformat_free_context(client_ctx);
free(client);
continue;
@ -790,10 +803,10 @@ run_stream_sink(void *data) {
stop:
// Stop and drain all active clients.
sc_mutex_lock(&sink->mutex);
sink->stopped = true;
atomic_store(&sink->stopped, true);
struct sc_stream_sink_client *c = sink->clients;
while (c) {
c->stopped = true;
atomic_store(&c->stopped, true);
c = c->next;
}
sc_cond_broadcast(&sink->cond);
@ -875,10 +888,10 @@ sc_stream_sink_video_packet_sink_close(struct sc_packet_sink *sink) {
sc_mutex_lock(&ss->mutex);
// EOS also stops the stream sink
ss->stopped = true;
atomic_store(&ss->stopped, true);
struct sc_stream_sink_client *c = ss->clients;
while (c) {
c->stopped = true;
atomic_store(&c->stopped, true);
c = c->next;
}
sc_cond_broadcast(&ss->cond);
@ -989,10 +1002,10 @@ sc_stream_sink_audio_packet_sink_close(struct sc_packet_sink *sink) {
sc_mutex_lock(&ss->mutex);
// EOS also stops the stream sink
ss->stopped = true;
atomic_store(&ss->stopped, true);
struct sc_stream_sink_client *c = ss->clients;
while (c) {
c->stopped = true;
atomic_store(&c->stopped, true);
c = c->next;
}
sc_cond_broadcast(&ss->cond);
@ -1095,7 +1108,7 @@ sc_stream_sink_init(struct sc_stream_sink *sink, const char *url,
goto error_mutex_destroy;
}
sink->stopped = false;
atomic_store(&sink->stopped, false);
sink->template_ready = false;
sink->clients = NULL;
@ -1186,11 +1199,11 @@ sc_stream_sink_start(struct sc_stream_sink *sink) {
void
sc_stream_sink_stop(struct sc_stream_sink *sink) {
sc_mutex_lock(&sink->mutex);
sink->stopped = true;
atomic_store(&sink->stopped, true);
// Also stop all active clients so their I/O is interrupted promptly.
struct sc_stream_sink_client *c = sink->clients;
while (c) {
c->stopped = true;
atomic_store(&c->stopped, true);
c = c->next;
}
sc_cond_broadcast(&sink->cond);
@ -1204,6 +1217,11 @@ sc_stream_sink_join(struct sc_stream_sink *sink) {
void
sc_stream_sink_destroy(struct sc_stream_sink *sink) {
// The sink thread must be joined before destroying the queues.
sc_stream_sink_queue_clear(&sink->video_queue);
sc_vecdeque_destroy(&sink->video_queue);
sc_stream_sink_queue_clear(&sink->audio_queue);
sc_vecdeque_destroy(&sink->audio_queue);
sc_cond_destroy(&sink->cond);
sc_mutex_destroy(&sink->mutex);
avformat_free_context(sink->ctx);

View file

@ -4,6 +4,7 @@
#include "common.h"
#include <stdbool.h>
#include <stdatomic.h>
#include <libavcodec/packet.h>
#include <libavformat/avformat.h>
@ -46,7 +47,7 @@ struct sc_stream_sink {
sc_mutex mutex;
sc_cond cond;
// set on sc_stream_sink_stop(), packet_sink close or streaming failure
bool stopped;
atomic_bool stopped;
// Init-phase queues: used only until template_ready is set.
// After that, each sc_stream_sink_client has its own queues.