stream-sink: reconnect + concurrent multi-client support

Co-authored-by: yeicor <4929005+yeicor@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot] 2026-03-13 10:35:50 +00:00
parent 930b7c5a5f
commit 243f0fd568
2 changed files with 483 additions and 171 deletions

View file

@ -18,6 +18,9 @@
static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us
/** Arbitrary duration assigned to the last video packet (0.1 s in µs). */
#define LAST_VIDEO_PACKET_DURATION_US 100000
/** Return true if `key=` appears in the URL's query string. */
static bool
sc_url_has_param(const char *url, const char *key) {
@ -163,16 +166,85 @@ sc_stream_sink_set_extradata(AVStream *ostream, const AVPacket *packet) {
return true;
}
/**
* Per-connection state for one active streaming client.
*
* All fields that require synchronisation are protected by the parent
* sc_stream_sink::mutex / sc_stream_sink::cond. Fields only accessed from
* the client's own thread (video/audio stream PTS tracking) need no locking.
*/
struct sc_stream_sink_client {
struct sc_stream_sink *sink; /**< back-pointer to parent */
AVFormatContext *ctx; /**< own output context (with pb) */
sc_thread thread;
/** Request the write loop to stop (protected by sink->mutex). */
bool stopped;
/** The thread has exited; safe to join (protected by sink->mutex). */
bool finished;
/** Per-client packet queues (protected by sink->mutex). */
struct sc_stream_sink_queue video_queue;
struct sc_stream_sink_queue audio_queue;
/** PTS tracking only accessed from the client thread, no mutex needed. */
struct sc_stream_sink_stream video_stream;
struct sc_stream_sink_stream audio_stream;
/** Intrusive linked list, protected by sink->mutex. */
struct sc_stream_sink_client *next;
};
/**
* Allocate a new AVFormatContext initialised from the template context.
*
* Copies all streams and their codec parameters (including extradata) so that
* the returned context is ready to have a pb attached and a stream header
* written. Returns NULL on allocation failure.
*/
static AVFormatContext *
sc_stream_sink_create_client_ctx(const struct sc_stream_sink *sink) {
const AVOutputFormat *oformat = av_guess_format("mpegts", NULL, NULL);
assert(oformat); // already verified in sc_stream_sink_init
AVFormatContext *ctx = avformat_alloc_context();
if (!ctx) {
LOG_OOM();
return NULL;
}
// See sc_stream_sink_init for the rationale behind these flags.
ctx->oformat = (AVOutputFormat *) oformat;
ctx->flags |= AVFMT_FLAG_FLUSH_PACKETS;
ctx->max_interleave_delta = AV_TIME_BASE / 10; // 100 ms
for (unsigned i = 0; i < sink->ctx->nb_streams; i++) {
AVStream *src = sink->ctx->streams[i];
AVStream *dst = avformat_new_stream(ctx, NULL);
if (!dst) {
avformat_free_context(ctx);
return NULL;
}
if (avcodec_parameters_copy(dst->codecpar, src->codecpar) < 0) {
avformat_free_context(ctx);
return NULL;
}
dst->time_base = src->time_base;
}
return ctx;
}
static inline void
sc_stream_sink_rescale_packet(AVStream *stream, AVPacket *packet) {
av_packet_rescale_ts(packet, SCRCPY_TIME_BASE, stream->time_base);
}
static bool
sc_stream_sink_write_stream(struct sc_stream_sink *sink,
sc_stream_sink_write_stream(struct sc_stream_sink_client *client,
struct sc_stream_sink_stream *st,
AVPacket *packet) {
AVStream *stream = sink->ctx->streams[st->index];
AVStream *stream = client->ctx->streams[st->index];
sc_stream_sink_rescale_packet(stream, packet);
if (st->last_pts != AV_NOPTS_VALUE && packet->pts <= st->last_pts) {
LOGD("Fixing PTS non monotonically increasing in stream %d "
@ -183,17 +255,19 @@ sc_stream_sink_write_stream(struct sc_stream_sink *sink,
} else {
st->last_pts = packet->pts;
}
return av_interleaved_write_frame(sink->ctx, packet) >= 0;
return av_interleaved_write_frame(client->ctx, packet) >= 0;
}
static inline bool
sc_stream_sink_write_video(struct sc_stream_sink *sink, AVPacket *packet) {
return sc_stream_sink_write_stream(sink, &sink->video_stream, packet);
sc_stream_sink_write_video(struct sc_stream_sink_client *client,
AVPacket *packet) {
return sc_stream_sink_write_stream(client, &client->video_stream, packet);
}
static inline bool
sc_stream_sink_write_audio(struct sc_stream_sink *sink, AVPacket *packet) {
return sc_stream_sink_write_stream(sink, &sink->audio_stream, packet);
sc_stream_sink_write_audio(struct sc_stream_sink_client *client,
AVPacket *packet) {
return sc_stream_sink_write_stream(client, &client->audio_stream, packet);
}
static int
@ -204,6 +278,14 @@ sc_stream_sink_interrupt_cb(void *data) {
return sink->stopped ? 1 : 0;
}
/** Interrupt callback for a per-client AVFormatContext. */
static int
sc_stream_sink_client_interrupt_cb(void *data) {
struct sc_stream_sink_client *client = data;
// Read without mutex: intentional (benign race; same pattern as above)
return (client->stopped || client->sink->stopped) ? 1 : 0;
}
static inline bool
sc_stream_sink_must_wait_for_config_packets(struct sc_stream_sink *sink) {
if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) {
@ -221,8 +303,17 @@ sc_stream_sink_must_wait_for_config_packets(struct sc_stream_sink *sink) {
return false;
}
/**
* Wait for codec initialisation and the first config packets, then apply the
* codec parameters and extradata to the template AVFormatContext
* (sink->ctx). Does NOT open any network connection.
*
* On success, sink->template_ready is set to true and the init-phase queues
* are cleared. Returns false only when the sink is stopped before init
* completes.
*/
static bool
sc_stream_sink_process_header(struct sc_stream_sink *sink) {
sc_stream_sink_init_template(struct sc_stream_sink *sink) {
sc_mutex_lock(&sink->mutex);
while (!sink->stopped &&
@ -234,8 +325,6 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) {
if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) {
assert(sink->stopped);
// If the stream sink is stopped, don't process anything if there are
// not at least video packets
sc_mutex_unlock(&sink->mutex);
return false;
}
@ -285,69 +374,45 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) {
}
}
{
char *connect_url = sc_stream_sink_build_connect_url(sink->url);
if (!connect_url) {
goto end;
}
AVIOInterruptCB int_cb = {
.callback = sc_stream_sink_interrupt_cb,
.opaque = sink,
};
LOGI("Stream sink: waiting for client on %s", sink->url);
int r = avio_open2(&sink->ctx->pb, connect_url, AVIO_FLAG_WRITE,
&int_cb, NULL);
free(connect_url);
if (r < 0) {
if (!sink->stopped) {
LOGE("Failed to open stream server on %s", sink->url);
}
goto end;
}
}
{
bool ok = avformat_write_header(sink->ctx, NULL) >= 0;
if (!ok) {
LOGE("Failed to write stream header");
avio_close(sink->ctx->pb);
sink->ctx->pb = NULL;
goto end;
}
}
ret = true;
end:
if (video_pkt) {
av_packet_free(&video_pkt);
}
if (audio_pkt) {
av_packet_free(&audio_pkt);
av_packet_free(&video_pkt);
av_packet_free(&audio_pkt);
if (ret) {
// Atomically mark the template as ready and discard any packets that
// accumulated in the init-phase queues during the wait above.
// From this point push() fans out directly to per-client queues.
sc_mutex_lock(&sink->mutex);
sink->template_ready = true;
sc_stream_sink_queue_clear(&sink->video_queue);
sc_stream_sink_queue_clear(&sink->audio_queue);
sc_mutex_unlock(&sink->mutex);
}
return ret;
}
/**
* Per-client packet write loop.
*
* Dequeues packets from the client's own video/audio queues (which receive
* fan-out copies from the push callbacks) and writes them to client->ctx->pb.
* Returns false only when a write error occurs (i.e. the client disconnected);
* returns true when stopped cleanly.
*/
static bool
sc_stream_sink_process_packets(struct sc_stream_sink *sink) {
sc_stream_sink_client_run_stream(struct sc_stream_sink_client *client) {
struct sc_stream_sink *sink = client->sink;
int64_t pts_origin = AV_NOPTS_VALUE;
bool header_written = sc_stream_sink_process_header(sink);
if (!header_written) {
return false;
}
LOGI("Stream sink: streaming started on %s", sink->url);
AVPacket *video_pkt = NULL;
AVPacket *audio_pkt = NULL;
// We can write a video packet only once we received the next one so that
// we can set its duration (next_pts - current_pts)
// Buffer the previous video packet until the next one arrives so we can
// compute its duration.
AVPacket *video_pkt_previous = NULL;
bool error = false;
@ -355,55 +420,43 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) {
for (;;) {
sc_mutex_lock(&sink->mutex);
while (!sink->stopped) {
while (!client->stopped) {
if (sink->video && !video_pkt &&
!sc_vecdeque_is_empty(&sink->video_queue)) {
// A new packet may be assigned to video_pkt and be processed
!sc_vecdeque_is_empty(&client->video_queue)) {
break;
}
if (sink->audio && !audio_pkt
&& !sc_vecdeque_is_empty(&sink->audio_queue)) {
// A new packet may be assigned to audio_pkt and be processed
if (sink->audio && !audio_pkt &&
!sc_vecdeque_is_empty(&client->audio_queue)) {
break;
}
sc_cond_wait(&sink->cond, &sink->mutex);
}
// If stopped is set, continue to process the remaining events (to
// finish the streaming) before actually stopping.
// client->stopped may now be set; drain remaining packets before exit.
// If there is no video, then the video_queue will remain empty forever
// and video_pkt will always be NULL.
assert(sink->video || (!video_pkt
&& sc_vecdeque_is_empty(&sink->video_queue)));
// If there is no audio, then the audio_queue will remain empty forever
// and audio_pkt will always be NULL.
&& sc_vecdeque_is_empty(&client->video_queue)));
assert(sink->audio || (!audio_pkt
&& sc_vecdeque_is_empty(&sink->audio_queue)));
&& sc_vecdeque_is_empty(&client->audio_queue)));
if (!video_pkt && !sc_vecdeque_is_empty(&sink->video_queue)) {
video_pkt = sc_vecdeque_pop(&sink->video_queue);
if (!video_pkt && !sc_vecdeque_is_empty(&client->video_queue)) {
video_pkt = sc_vecdeque_pop(&client->video_queue);
}
if (!audio_pkt && !sc_vecdeque_is_empty(&sink->audio_queue)) {
audio_pkt = sc_vecdeque_pop(&sink->audio_queue);
if (!audio_pkt && !sc_vecdeque_is_empty(&client->audio_queue)) {
audio_pkt = sc_vecdeque_pop(&client->audio_queue);
}
if (sink->stopped && !video_pkt && !audio_pkt) {
assert(sc_vecdeque_is_empty(&sink->video_queue));
assert(sc_vecdeque_is_empty(&sink->audio_queue));
if (client->stopped && !video_pkt && !audio_pkt) {
sc_mutex_unlock(&sink->mutex);
break;
}
assert(video_pkt || audio_pkt); // at least one
assert(video_pkt || audio_pkt);
sc_mutex_unlock(&sink->mutex);
// Ignore further config packets (e.g. on device orientation
// change). The next non-config packet will have the config packet
// data prepended.
// Discard further config packets (e.g. on device orientation change).
if (video_pkt && video_pkt->pts == AV_NOPTS_VALUE) {
av_packet_free(&video_pkt);
video_pkt = NULL;
@ -423,18 +476,15 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) {
pts_origin = audio_pkt->pts;
} else if (video_pkt && audio_pkt) {
pts_origin = MIN(video_pkt->pts, audio_pkt->pts);
} else if (sink->stopped) {
} else if (client->stopped) {
if (video_pkt) {
// The sink is stopped without audio, stream the video
// packets
pts_origin = video_pkt->pts;
} else {
// Fail if there is no video
error = true;
// Stopped without any usable video: nothing to stream.
goto end;
}
} else {
// We need both video and audio packets to initialize pts_origin
// Need both video and audio to initialise pts_origin.
continue;
}
}
@ -446,14 +496,13 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) {
video_pkt->dts = video_pkt->pts;
if (video_pkt_previous) {
// we now know the duration of the previous packet
video_pkt_previous->duration = video_pkt->pts
- video_pkt_previous->pts;
video_pkt_previous->duration =
video_pkt->pts - video_pkt_previous->pts;
bool ok = sc_stream_sink_write_video(sink, video_pkt_previous);
bool ok = sc_stream_sink_write_video(client, video_pkt_previous);
av_packet_free(&video_pkt_previous);
if (!ok) {
LOGE("Could not write video packet to stream");
LOGD("Stream sink: client disconnected (video write error)");
error = true;
goto end;
}
@ -467,73 +516,260 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) {
audio_pkt->pts -= pts_origin;
audio_pkt->dts = audio_pkt->pts;
bool ok = sc_stream_sink_write_audio(sink, audio_pkt);
bool ok = sc_stream_sink_write_audio(client, audio_pkt);
av_packet_free(&audio_pkt);
audio_pkt = NULL;
if (!ok) {
LOGE("Could not write audio packet to stream");
LOGD("Stream sink: client disconnected (audio write error)");
error = true;
goto end;
}
av_packet_free(&audio_pkt);
audio_pkt = NULL;
}
}
// Write the last video packet
AVPacket *last = video_pkt_previous;
if (last) {
// assign an arbitrary duration to the last packet: 0.1s in us
last->duration = 100000;
bool ok = sc_stream_sink_write_video(sink, last);
// Write the last video packet.
if (video_pkt_previous) {
video_pkt_previous->duration = LAST_VIDEO_PACKET_DURATION_US;
bool ok = sc_stream_sink_write_video(client, video_pkt_previous);
if (!ok) {
// failing to write the last frame is not very serious, no
// future frame may depend on it, so the resulting stream
// will still be valid
LOGW("Could not write last packet to stream");
LOGW("Stream sink: could not write last video packet");
}
av_packet_free(&last);
last = NULL;
av_packet_free(&video_pkt_previous);
video_pkt_previous = NULL;
}
av_write_trailer(sink->ctx);
av_write_trailer(client->ctx);
end:
if (video_pkt) {
av_packet_free(&video_pkt);
}
if (audio_pkt) {
av_packet_free(&audio_pkt);
}
if (video_pkt_previous) {
av_packet_free(&video_pkt_previous);
}
av_packet_free(&video_pkt);
av_packet_free(&audio_pkt);
av_packet_free(&video_pkt_previous);
return !error;
}
/**
* Thread function for a per-client connection.
*
* Writes packets until the client disconnects or the sink is stopped, then
* closes the connection and marks itself as finished so the accept loop can
* reap it.
*/
static int
run_stream_sink_client(void *data) {
struct sc_stream_sink_client *client = data;
struct sc_stream_sink *sink = client->sink;
sc_stream_sink_client_run_stream(client);
// Close this client's network connection.
if (client->ctx->pb) {
avio_close(client->ctx->pb);
client->ctx->pb = NULL;
}
// Mark as finished so the accept loop can join and free us.
sc_mutex_lock(&sink->mutex);
client->finished = true;
// Drain any packets that arrived between the write error and now.
sc_stream_sink_queue_clear(&client->video_queue);
sc_stream_sink_queue_clear(&client->audio_queue);
sc_mutex_unlock(&sink->mutex);
LOGD("Stream sink: client thread ended");
return 0;
}
/**
* Join and free all client threads that have set finished=true.
* Must be called from the accept loop (main sink thread).
*/
static void
sc_stream_sink_reap_dead_clients(struct sc_stream_sink *sink) {
struct sc_stream_sink_client *dead = NULL;
sc_mutex_lock(&sink->mutex);
struct sc_stream_sink_client **pp = &sink->clients;
while (*pp) {
struct sc_stream_sink_client *c = *pp;
if (c->finished) {
*pp = c->next; // remove from list
c->next = dead;
dead = c;
} else {
pp = &c->next;
}
}
sc_mutex_unlock(&sink->mutex);
while (dead) {
struct sc_stream_sink_client *c = dead;
dead = c->next;
sc_thread_join(&c->thread, NULL);
avformat_free_context(c->ctx);
sc_vecdeque_destroy(&c->video_queue);
sc_vecdeque_destroy(&c->audio_queue);
free(c);
}
}
/**
* Accept loop: initialises the template context once, then repeatedly accepts
* incoming connections, spawning a per-client thread for each. Runs until
* sink->stopped is set (by sc_stream_sink_stop() or device EOS).
*/
// Forward declaration: defined below alongside the other packet-sink callbacks.
static void sc_stream_sink_stream_init(struct sc_stream_sink_stream *stream);
static int
run_stream_sink(void *data) {
struct sc_stream_sink *sink = data;
bool ok = sc_stream_sink_process_packets(sink);
sc_mutex_lock(&sink->mutex);
// Prevent the producer from pushing any new packet
sink->stopped = true;
// Discard pending packets
sc_stream_sink_queue_clear(&sink->video_queue);
sc_stream_sink_queue_clear(&sink->audio_queue);
sc_mutex_unlock(&sink->mutex);
if (sink->ctx->pb) {
avio_close(sink->ctx->pb);
sink->ctx->pb = NULL;
bool ok = sc_stream_sink_init_template(sink);
if (!ok) {
LOGE("Stream sink: initialisation failed");
goto stop;
}
if (ok) {
LOGI("Stream sink complete");
} else {
LOGE("Stream sink failed");
char *connect_url = sc_stream_sink_build_connect_url(sink->url);
if (!connect_url) {
goto stop;
}
LOGI("Stream sink: listening for clients on %s", sink->url);
while (!sink->stopped) {
// Reap any client threads that finished since the last iteration.
sc_stream_sink_reap_dead_clients(sink);
AVIOInterruptCB int_cb = {
.callback = sc_stream_sink_interrupt_cb,
.opaque = sink,
};
// Block here until one client connects (or sink is stopped).
AVIOContext *pb = NULL;
int r = avio_open2(&pb, connect_url, AVIO_FLAG_WRITE, &int_cb, NULL);
if (r < 0) {
if (!sink->stopped) {
LOGE("Stream sink: failed to accept connection on %s",
sink->url);
}
break;
}
// Build a fresh output context for this client from the template.
AVFormatContext *client_ctx = sc_stream_sink_create_client_ctx(sink);
if (!client_ctx) {
avio_close(pb);
continue;
}
client_ctx->pb = pb;
// Allocate and initialise the client struct.
struct sc_stream_sink_client *client =
calloc(1, sizeof(struct sc_stream_sink_client));
if (!client) {
LOG_OOM();
avio_close(client_ctx->pb);
client_ctx->pb = NULL;
avformat_free_context(client_ctx);
continue;
}
client->sink = sink;
client->ctx = client_ctx;
client->stopped = false;
client->finished = false;
sc_vecdeque_init(&client->video_queue);
sc_vecdeque_init(&client->audio_queue);
sc_stream_sink_stream_init(&client->video_stream);
sc_stream_sink_stream_init(&client->audio_stream);
// Copy shared stream indices so the client can look up its streams.
client->video_stream.index = sink->video_stream.index;
client->audio_stream.index = sink->audio_stream.index;
// Switch the client context's interrupt callback to check client->stopped too,
// so blocking I/O in the client thread can be interrupted on demand.
client_ctx->interrupt_callback.callback = sc_stream_sink_client_interrupt_cb;
client_ctx->interrupt_callback.opaque = client;
// Write the MPEG-TS stream header for this client.
if (avformat_write_header(client_ctx, NULL) < 0) {
LOGE("Stream sink: failed to write stream header to client");
avio_close(client_ctx->pb);
client_ctx->pb = NULL;
avformat_free_context(client_ctx);
free(client);
continue;
}
// Add the client to the active list before spawning its thread so
// that packets pushed between now and the thread start are queued.
sc_mutex_lock(&sink->mutex);
client->next = sink->clients;
sink->clients = client;
sc_mutex_unlock(&sink->mutex);
bool thread_ok = sc_thread_create(&client->thread,
run_stream_sink_client,
"scrcpy-stream-client", client);
if (!thread_ok) {
LOGE("Stream sink: could not create client thread");
sc_mutex_lock(&sink->mutex);
// Remove from list (it was just prepended)
sink->clients = client->next;
sc_stream_sink_queue_clear(&client->video_queue);
sc_stream_sink_queue_clear(&client->audio_queue);
sc_mutex_unlock(&sink->mutex);
// avformat_write_header already moved pb ownership; close it.
if (client_ctx->pb) {
avio_close(client_ctx->pb);
client_ctx->pb = NULL;
}
avformat_free_context(client_ctx);
sc_vecdeque_destroy(&client->video_queue);
sc_vecdeque_destroy(&client->audio_queue);
free(client);
continue;
}
LOGI("Stream sink: client connected on %s", sink->url);
}
free(connect_url);
stop:
// Stop and drain all active clients.
sc_mutex_lock(&sink->mutex);
sink->stopped = true;
struct sc_stream_sink_client *c = sink->clients;
while (c) {
c->stopped = true;
c = c->next;
}
sc_cond_broadcast(&sink->cond);
sc_mutex_unlock(&sink->mutex);
// Join every remaining client thread, then free the client structs.
sc_mutex_lock(&sink->mutex);
struct sc_stream_sink_client *head = sink->clients;
sink->clients = NULL;
sc_mutex_unlock(&sink->mutex);
while (head) {
struct sc_stream_sink_client *next = head->next;
sc_thread_join(&head->thread, NULL);
if (head->ctx->pb) {
avio_close(head->ctx->pb);
head->ctx->pb = NULL;
}
avformat_free_context(head->ctx);
sc_vecdeque_destroy(&head->video_queue);
sc_vecdeque_destroy(&head->audio_queue);
free(head);
head = next;
}
LOGD("Stream sink thread ended");
@ -590,7 +826,12 @@ sc_stream_sink_video_packet_sink_close(struct sc_packet_sink *sink) {
sc_mutex_lock(&ss->mutex);
// EOS also stops the stream sink
ss->stopped = true;
sc_cond_signal(&ss->cond);
struct sc_stream_sink_client *c = ss->clients;
while (c) {
c->stopped = true;
c = c->next;
}
sc_cond_broadcast(&ss->cond);
sc_mutex_unlock(&ss->mutex);
}
@ -609,24 +850,46 @@ sc_stream_sink_video_packet_sink_push(struct sc_packet_sink *sink,
return false;
}
AVPacket *p = sc_stream_sink_packet_ref(packet);
if (!p) {
LOG_OOM();
if (!ss->template_ready) {
// Init phase: buffer in the sink-level queue for sc_stream_sink_init_template.
AVPacket *p = sc_stream_sink_packet_ref(packet);
if (!p) {
LOG_OOM();
sc_mutex_unlock(&ss->mutex);
return false;
}
p->stream_index = ss->video_stream.index;
bool ok = sc_vecdeque_push(&ss->video_queue, p);
if (!ok) {
LOG_OOM();
av_packet_free(&p);
sc_mutex_unlock(&ss->mutex);
return false;
}
sc_cond_signal(&ss->cond);
sc_mutex_unlock(&ss->mutex);
return false;
return true;
}
p->stream_index = ss->video_stream.index;
bool ok = sc_vecdeque_push(&ss->video_queue, p);
if (!ok) {
LOG_OOM();
av_packet_free(&p);
sc_mutex_unlock(&ss->mutex);
return false;
// Live phase: fan out a ref-counted copy to every active client queue.
bool any_ok = false;
struct sc_stream_sink_client *c = ss->clients;
while (c) {
if (!c->stopped) {
AVPacket *p = sc_stream_sink_packet_ref(packet);
if (p) {
p->stream_index = ss->video_stream.index;
if (sc_vecdeque_push(&c->video_queue, p)) {
any_ok = true;
} else {
av_packet_free(&p);
}
}
}
c = c->next;
}
sc_cond_signal(&ss->cond);
(void) any_ok; // dropping when no clients is intentional for live streams
sc_cond_broadcast(&ss->cond);
sc_mutex_unlock(&ss->mutex);
return true;
@ -677,7 +940,12 @@ sc_stream_sink_audio_packet_sink_close(struct sc_packet_sink *sink) {
sc_mutex_lock(&ss->mutex);
// EOS also stops the stream sink
ss->stopped = true;
sc_cond_signal(&ss->cond);
struct sc_stream_sink_client *c = ss->clients;
while (c) {
c->stopped = true;
c = c->next;
}
sc_cond_broadcast(&ss->cond);
sc_mutex_unlock(&ss->mutex);
}
@ -697,24 +965,42 @@ sc_stream_sink_audio_packet_sink_push(struct sc_packet_sink *sink,
return false;
}
AVPacket *p = sc_stream_sink_packet_ref(packet);
if (!p) {
LOG_OOM();
if (!ss->template_ready) {
// Init phase: buffer in the sink-level queue.
AVPacket *p = sc_stream_sink_packet_ref(packet);
if (!p) {
LOG_OOM();
sc_mutex_unlock(&ss->mutex);
return false;
}
p->stream_index = ss->audio_stream.index;
bool ok = sc_vecdeque_push(&ss->audio_queue, p);
if (!ok) {
LOG_OOM();
av_packet_free(&p);
sc_mutex_unlock(&ss->mutex);
return false;
}
sc_cond_signal(&ss->cond);
sc_mutex_unlock(&ss->mutex);
return false;
return true;
}
p->stream_index = ss->audio_stream.index;
bool ok = sc_vecdeque_push(&ss->audio_queue, p);
if (!ok) {
LOG_OOM();
av_packet_free(&p);
sc_mutex_unlock(&ss->mutex);
return false;
// Live phase: fan out to every active client.
struct sc_stream_sink_client *c = ss->clients;
while (c) {
if (!c->stopped) {
AVPacket *p = sc_stream_sink_packet_ref(packet);
if (p) {
p->stream_index = ss->audio_stream.index;
if (!sc_vecdeque_push(&c->audio_queue, p)) {
av_packet_free(&p);
}
}
}
c = c->next;
}
sc_cond_signal(&ss->cond);
sc_cond_broadcast(&ss->cond);
sc_mutex_unlock(&ss->mutex);
return true;
@ -760,6 +1046,8 @@ sc_stream_sink_init(struct sc_stream_sink *sink, const char *url,
}
sink->stopped = false;
sink->template_ready = false;
sink->clients = NULL;
sc_vecdeque_init(&sink->video_queue);
sc_vecdeque_init(&sink->audio_queue);
@ -849,7 +1137,13 @@ void
sc_stream_sink_stop(struct sc_stream_sink *sink) {
sc_mutex_lock(&sink->mutex);
sink->stopped = true;
sc_cond_signal(&sink->cond);
// Also stop all active clients so their I/O is interrupted promptly.
struct sc_stream_sink_client *c = sink->clients;
while (c) {
c->stopped = true;
c = c->next;
}
sc_cond_broadcast(&sink->cond);
sc_mutex_unlock(&sink->mutex);
}

View file

@ -18,6 +18,9 @@ struct sc_stream_sink_stream {
int64_t last_pts;
};
/* Per-connection client state (defined in stream_sink.c). */
struct sc_stream_sink_client;
struct sc_stream_sink {
struct sc_packet_sink video_packet_sink;
struct sc_packet_sink audio_packet_sink;
@ -35,6 +38,8 @@ struct sc_stream_sink {
char *url;
// Template format context (no pb): holds stream definitions and codec
// parameters used to initialise a fresh context for each new connection.
AVFormatContext *ctx;
sc_thread thread;
@ -42,6 +47,9 @@ struct sc_stream_sink {
sc_cond cond;
// set on sc_stream_sink_stop(), packet_sink close or streaming failure
bool stopped;
// Init-phase queues: used only until template_ready is set.
// After that, each sc_stream_sink_client has its own queues.
struct sc_stream_sink_queue video_queue;
struct sc_stream_sink_queue audio_queue;
@ -51,8 +59,18 @@ struct sc_stream_sink {
bool audio_expects_config_packet;
// Stream indices shared by every per-client AVFormatContext (all clients
// copy the template streams in the same order).
struct sc_stream_sink_stream video_stream;
struct sc_stream_sink_stream audio_stream;
// Set to true once codec params + extradata are applied to the template
// context. Before this point packets are buffered in the init-phase queues
// above; after this point they are fanned out to active client queues.
bool template_ready;
// Linked list of currently active client connections (protected by mutex).
struct sc_stream_sink_client *clients;
};
bool