diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c index 098d155a..e9cd4b76 100644 --- a/app/src/stream_sink.c +++ b/app/src/stream_sink.c @@ -18,6 +18,9 @@ static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us +/** Arbitrary duration assigned to the last video packet (0.1 s in µs). */ +#define LAST_VIDEO_PACKET_DURATION_US 100000 + /** Return true if `key=` appears in the URL's query string. */ static bool sc_url_has_param(const char *url, const char *key) { @@ -163,16 +166,85 @@ sc_stream_sink_set_extradata(AVStream *ostream, const AVPacket *packet) { return true; } +/** + * Per-connection state for one active streaming client. + * + * All fields that require synchronisation are protected by the parent + * sc_stream_sink::mutex / sc_stream_sink::cond. Fields only accessed from + * the client's own thread (video/audio stream PTS tracking) need no locking. + */ +struct sc_stream_sink_client { + struct sc_stream_sink *sink; /**< back-pointer to parent */ + AVFormatContext *ctx; /**< own output context (with pb) */ + sc_thread thread; + + /** Request the write loop to stop (protected by sink->mutex). */ + bool stopped; + /** The thread has exited; safe to join (protected by sink->mutex). */ + bool finished; + + /** Per-client packet queues (protected by sink->mutex). */ + struct sc_stream_sink_queue video_queue; + struct sc_stream_sink_queue audio_queue; + + /** PTS tracking – only accessed from the client thread, no mutex needed. */ + struct sc_stream_sink_stream video_stream; + struct sc_stream_sink_stream audio_stream; + + /** Intrusive linked list, protected by sink->mutex. */ + struct sc_stream_sink_client *next; +}; + +/** + * Allocate a new AVFormatContext initialised from the template context. + * + * Copies all streams and their codec parameters (including extradata) so that + * the returned context is ready to have a pb attached and a stream header + * written. Returns NULL on allocation failure. + */ +static AVFormatContext * +sc_stream_sink_create_client_ctx(const struct sc_stream_sink *sink) { + const AVOutputFormat *oformat = av_guess_format("mpegts", NULL, NULL); + assert(oformat); // already verified in sc_stream_sink_init + + AVFormatContext *ctx = avformat_alloc_context(); + if (!ctx) { + LOG_OOM(); + return NULL; + } + + // See sc_stream_sink_init for the rationale behind these flags. + ctx->oformat = (AVOutputFormat *) oformat; + ctx->flags |= AVFMT_FLAG_FLUSH_PACKETS; + ctx->max_interleave_delta = AV_TIME_BASE / 10; // 100 ms + + for (unsigned i = 0; i < sink->ctx->nb_streams; i++) { + AVStream *src = sink->ctx->streams[i]; + AVStream *dst = avformat_new_stream(ctx, NULL); + if (!dst) { + avformat_free_context(ctx); + return NULL; + } + if (avcodec_parameters_copy(dst->codecpar, src->codecpar) < 0) { + avformat_free_context(ctx); + return NULL; + } + dst->time_base = src->time_base; + } + + return ctx; +} + static inline void sc_stream_sink_rescale_packet(AVStream *stream, AVPacket *packet) { av_packet_rescale_ts(packet, SCRCPY_TIME_BASE, stream->time_base); } static bool -sc_stream_sink_write_stream(struct sc_stream_sink *sink, +sc_stream_sink_write_stream(struct sc_stream_sink_client *client, struct sc_stream_sink_stream *st, AVPacket *packet) { - AVStream *stream = sink->ctx->streams[st->index]; + AVStream *stream = client->ctx->streams[st->index]; sc_stream_sink_rescale_packet(stream, packet); if (st->last_pts != AV_NOPTS_VALUE && packet->pts <= st->last_pts) { LOGD("Fixing PTS non monotonically increasing in stream %d " @@ -183,17 +255,19 @@ sc_stream_sink_write_stream(struct sc_stream_sink *sink, } else { st->last_pts = packet->pts; } - return av_interleaved_write_frame(sink->ctx, packet) >= 0; + return av_interleaved_write_frame(client->ctx, packet) >= 0; } static inline bool -sc_stream_sink_write_video(struct sc_stream_sink *sink, AVPacket *packet) { - return sc_stream_sink_write_stream(sink, &sink->video_stream, packet); +sc_stream_sink_write_video(struct sc_stream_sink_client *client, + AVPacket *packet) { + return sc_stream_sink_write_stream(client, &client->video_stream, packet); } static inline bool -sc_stream_sink_write_audio(struct sc_stream_sink *sink, AVPacket *packet) { - return sc_stream_sink_write_stream(sink, &sink->audio_stream, packet); +sc_stream_sink_write_audio(struct sc_stream_sink_client *client, + AVPacket *packet) { + return sc_stream_sink_write_stream(client, &client->audio_stream, packet); } static int @@ -204,6 +278,14 @@ sc_stream_sink_interrupt_cb(void *data) { return sink->stopped ? 1 : 0; } +/** Interrupt callback for a per-client AVFormatContext. */ +static int +sc_stream_sink_client_interrupt_cb(void *data) { + struct sc_stream_sink_client *client = data; + // Read without mutex: intentional (benign race; same pattern as above) + return (client->stopped || client->sink->stopped) ? 1 : 0; +} + static inline bool sc_stream_sink_must_wait_for_config_packets(struct sc_stream_sink *sink) { if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) { @@ -221,8 +303,17 @@ sc_stream_sink_must_wait_for_config_packets(struct sc_stream_sink *sink) { return false; } +/** + * Wait for codec initialisation and the first config packets, then apply the + * codec parameters and extradata to the template AVFormatContext + * (sink->ctx). Does NOT open any network connection. + * + * On success, sink->template_ready is set to true and the init-phase queues + * are cleared. Returns false only when the sink is stopped before init + * completes. + */ static bool -sc_stream_sink_process_header(struct sc_stream_sink *sink) { +sc_stream_sink_init_template(struct sc_stream_sink *sink) { sc_mutex_lock(&sink->mutex); while (!sink->stopped && @@ -234,8 +325,6 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) { if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) { assert(sink->stopped); - // If the stream sink is stopped, don't process anything if there are - // not at least video packets sc_mutex_unlock(&sink->mutex); return false; } @@ -285,69 +374,45 @@ sc_stream_sink_process_header(struct sc_stream_sink *sink) { } } - { - char *connect_url = sc_stream_sink_build_connect_url(sink->url); - if (!connect_url) { - goto end; - } - - AVIOInterruptCB int_cb = { - .callback = sc_stream_sink_interrupt_cb, - .opaque = sink, - }; - - LOGI("Stream sink: waiting for client on %s", sink->url); - - int r = avio_open2(&sink->ctx->pb, connect_url, AVIO_FLAG_WRITE, - &int_cb, NULL); - free(connect_url); - if (r < 0) { - if (!sink->stopped) { - LOGE("Failed to open stream server on %s", sink->url); - } - goto end; - } - } - - { - bool ok = avformat_write_header(sink->ctx, NULL) >= 0; - if (!ok) { - LOGE("Failed to write stream header"); - avio_close(sink->ctx->pb); - sink->ctx->pb = NULL; - goto end; - } - } - ret = true; end: - if (video_pkt) { - av_packet_free(&video_pkt); - } - if (audio_pkt) { - av_packet_free(&audio_pkt); + av_packet_free(&video_pkt); + av_packet_free(&audio_pkt); + + if (ret) { + // Atomically mark the template as ready and discard any packets that + // accumulated in the init-phase queues during the wait above. + // From this point push() fans out directly to per-client queues. + sc_mutex_lock(&sink->mutex); + sink->template_ready = true; + sc_stream_sink_queue_clear(&sink->video_queue); + sc_stream_sink_queue_clear(&sink->audio_queue); + sc_mutex_unlock(&sink->mutex); } return ret; } +/** + * Per-client packet write loop. + * + * Dequeues packets from the client's own video/audio queues (which receive + * fan-out copies from the push callbacks) and writes them to client->ctx->pb. + * Returns false only when a write error occurs (i.e. the client disconnected); + * returns true when stopped cleanly. + */ static bool -sc_stream_sink_process_packets(struct sc_stream_sink *sink) { +sc_stream_sink_client_run_stream(struct sc_stream_sink_client *client) { + struct sc_stream_sink *sink = client->sink; + int64_t pts_origin = AV_NOPTS_VALUE; - bool header_written = sc_stream_sink_process_header(sink); - if (!header_written) { - return false; - } - - LOGI("Stream sink: streaming started on %s", sink->url); - AVPacket *video_pkt = NULL; AVPacket *audio_pkt = NULL; - // We can write a video packet only once we received the next one so that - // we can set its duration (next_pts - current_pts) + // Buffer the previous video packet until the next one arrives so we can + // compute its duration. AVPacket *video_pkt_previous = NULL; bool error = false; @@ -355,55 +420,43 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { for (;;) { sc_mutex_lock(&sink->mutex); - while (!sink->stopped) { + while (!client->stopped) { if (sink->video && !video_pkt && - !sc_vecdeque_is_empty(&sink->video_queue)) { - // A new packet may be assigned to video_pkt and be processed + !sc_vecdeque_is_empty(&client->video_queue)) { break; } - if (sink->audio && !audio_pkt - && !sc_vecdeque_is_empty(&sink->audio_queue)) { - // A new packet may be assigned to audio_pkt and be processed + if (sink->audio && !audio_pkt && + !sc_vecdeque_is_empty(&client->audio_queue)) { break; } sc_cond_wait(&sink->cond, &sink->mutex); } - // If stopped is set, continue to process the remaining events (to - // finish the streaming) before actually stopping. + // client->stopped may now be set; drain remaining packets before exit. - // If there is no video, then the video_queue will remain empty forever - // and video_pkt will always be NULL. assert(sink->video || (!video_pkt - && sc_vecdeque_is_empty(&sink->video_queue))); - - // If there is no audio, then the audio_queue will remain empty forever - // and audio_pkt will always be NULL. + && sc_vecdeque_is_empty(&client->video_queue))); assert(sink->audio || (!audio_pkt - && sc_vecdeque_is_empty(&sink->audio_queue))); + && sc_vecdeque_is_empty(&client->audio_queue))); - if (!video_pkt && !sc_vecdeque_is_empty(&sink->video_queue)) { - video_pkt = sc_vecdeque_pop(&sink->video_queue); + if (!video_pkt && !sc_vecdeque_is_empty(&client->video_queue)) { + video_pkt = sc_vecdeque_pop(&client->video_queue); } - if (!audio_pkt && !sc_vecdeque_is_empty(&sink->audio_queue)) { - audio_pkt = sc_vecdeque_pop(&sink->audio_queue); + if (!audio_pkt && !sc_vecdeque_is_empty(&client->audio_queue)) { + audio_pkt = sc_vecdeque_pop(&client->audio_queue); } - if (sink->stopped && !video_pkt && !audio_pkt) { - assert(sc_vecdeque_is_empty(&sink->video_queue)); - assert(sc_vecdeque_is_empty(&sink->audio_queue)); + if (client->stopped && !video_pkt && !audio_pkt) { sc_mutex_unlock(&sink->mutex); break; } - assert(video_pkt || audio_pkt); // at least one + assert(video_pkt || audio_pkt); sc_mutex_unlock(&sink->mutex); - // Ignore further config packets (e.g. on device orientation - // change). The next non-config packet will have the config packet - // data prepended. + // Discard further config packets (e.g. on device orientation change). if (video_pkt && video_pkt->pts == AV_NOPTS_VALUE) { av_packet_free(&video_pkt); video_pkt = NULL; @@ -423,18 +476,15 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { pts_origin = audio_pkt->pts; } else if (video_pkt && audio_pkt) { pts_origin = MIN(video_pkt->pts, audio_pkt->pts); - } else if (sink->stopped) { + } else if (client->stopped) { if (video_pkt) { - // The sink is stopped without audio, stream the video - // packets pts_origin = video_pkt->pts; } else { - // Fail if there is no video - error = true; + // Stopped without any usable video: nothing to stream. goto end; } } else { - // We need both video and audio packets to initialize pts_origin + // Need both video and audio to initialise pts_origin. continue; } } @@ -446,14 +496,13 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { video_pkt->dts = video_pkt->pts; if (video_pkt_previous) { - // we now know the duration of the previous packet - video_pkt_previous->duration = video_pkt->pts - - video_pkt_previous->pts; + video_pkt_previous->duration = + video_pkt->pts - video_pkt_previous->pts; - bool ok = sc_stream_sink_write_video(sink, video_pkt_previous); + bool ok = sc_stream_sink_write_video(client, video_pkt_previous); av_packet_free(&video_pkt_previous); if (!ok) { - LOGE("Could not write video packet to stream"); + LOGD("Stream sink: client disconnected (video write error)"); error = true; goto end; } @@ -467,73 +516,260 @@ sc_stream_sink_process_packets(struct sc_stream_sink *sink) { audio_pkt->pts -= pts_origin; audio_pkt->dts = audio_pkt->pts; - bool ok = sc_stream_sink_write_audio(sink, audio_pkt); + bool ok = sc_stream_sink_write_audio(client, audio_pkt); + av_packet_free(&audio_pkt); + audio_pkt = NULL; if (!ok) { - LOGE("Could not write audio packet to stream"); + LOGD("Stream sink: client disconnected (audio write error)"); error = true; goto end; } - - av_packet_free(&audio_pkt); - audio_pkt = NULL; } } - // Write the last video packet - AVPacket *last = video_pkt_previous; - if (last) { - // assign an arbitrary duration to the last packet: 0.1s in us - last->duration = 100000; - bool ok = sc_stream_sink_write_video(sink, last); + // Write the last video packet. + if (video_pkt_previous) { + video_pkt_previous->duration = LAST_VIDEO_PACKET_DURATION_US; + bool ok = sc_stream_sink_write_video(client, video_pkt_previous); if (!ok) { - // failing to write the last frame is not very serious, no - // future frame may depend on it, so the resulting stream - // will still be valid - LOGW("Could not write last packet to stream"); + LOGW("Stream sink: could not write last video packet"); } - av_packet_free(&last); - last = NULL; + av_packet_free(&video_pkt_previous); + video_pkt_previous = NULL; } - av_write_trailer(sink->ctx); + av_write_trailer(client->ctx); end: - if (video_pkt) { - av_packet_free(&video_pkt); - } - if (audio_pkt) { - av_packet_free(&audio_pkt); - } - if (video_pkt_previous) { - av_packet_free(&video_pkt_previous); - } + av_packet_free(&video_pkt); + av_packet_free(&audio_pkt); + av_packet_free(&video_pkt_previous); return !error; } +/** + * Thread function for a per-client connection. + * + * Writes packets until the client disconnects or the sink is stopped, then + * closes the connection and marks itself as finished so the accept loop can + * reap it. + */ +static int +run_stream_sink_client(void *data) { + struct sc_stream_sink_client *client = data; + struct sc_stream_sink *sink = client->sink; + + sc_stream_sink_client_run_stream(client); + + // Close this client's network connection. + if (client->ctx->pb) { + avio_close(client->ctx->pb); + client->ctx->pb = NULL; + } + + // Mark as finished so the accept loop can join and free us. + sc_mutex_lock(&sink->mutex); + client->finished = true; + // Drain any packets that arrived between the write error and now. + sc_stream_sink_queue_clear(&client->video_queue); + sc_stream_sink_queue_clear(&client->audio_queue); + sc_mutex_unlock(&sink->mutex); + + LOGD("Stream sink: client thread ended"); + + return 0; +} + +/** + * Join and free all client threads that have set finished=true. + * Must be called from the accept loop (main sink thread). + */ +static void +sc_stream_sink_reap_dead_clients(struct sc_stream_sink *sink) { + struct sc_stream_sink_client *dead = NULL; + + sc_mutex_lock(&sink->mutex); + struct sc_stream_sink_client **pp = &sink->clients; + while (*pp) { + struct sc_stream_sink_client *c = *pp; + if (c->finished) { + *pp = c->next; // remove from list + c->next = dead; + dead = c; + } else { + pp = &c->next; + } + } + sc_mutex_unlock(&sink->mutex); + + while (dead) { + struct sc_stream_sink_client *c = dead; + dead = c->next; + sc_thread_join(&c->thread, NULL); + avformat_free_context(c->ctx); + sc_vecdeque_destroy(&c->video_queue); + sc_vecdeque_destroy(&c->audio_queue); + free(c); + } +} + +/** + * Accept loop: initialises the template context once, then repeatedly accepts + * incoming connections, spawning a per-client thread for each. Runs until + * sink->stopped is set (by sc_stream_sink_stop() or device EOS). + */ + +// Forward declaration: defined below alongside the other packet-sink callbacks. +static void sc_stream_sink_stream_init(struct sc_stream_sink_stream *stream); + static int run_stream_sink(void *data) { struct sc_stream_sink *sink = data; - bool ok = sc_stream_sink_process_packets(sink); - - sc_mutex_lock(&sink->mutex); - // Prevent the producer from pushing any new packet - sink->stopped = true; - // Discard pending packets - sc_stream_sink_queue_clear(&sink->video_queue); - sc_stream_sink_queue_clear(&sink->audio_queue); - sc_mutex_unlock(&sink->mutex); - - if (sink->ctx->pb) { - avio_close(sink->ctx->pb); - sink->ctx->pb = NULL; + bool ok = sc_stream_sink_init_template(sink); + if (!ok) { + LOGE("Stream sink: initialisation failed"); + goto stop; } - if (ok) { - LOGI("Stream sink complete"); - } else { - LOGE("Stream sink failed"); + char *connect_url = sc_stream_sink_build_connect_url(sink->url); + if (!connect_url) { + goto stop; + } + + LOGI("Stream sink: listening for clients on %s", sink->url); + + while (!sink->stopped) { + // Reap any client threads that finished since the last iteration. + sc_stream_sink_reap_dead_clients(sink); + + AVIOInterruptCB int_cb = { + .callback = sc_stream_sink_interrupt_cb, + .opaque = sink, + }; + + // Block here until one client connects (or sink is stopped). + AVIOContext *pb = NULL; + int r = avio_open2(&pb, connect_url, AVIO_FLAG_WRITE, &int_cb, NULL); + if (r < 0) { + if (!sink->stopped) { + LOGE("Stream sink: failed to accept connection on %s", + sink->url); + } + break; + } + + // Build a fresh output context for this client from the template. + AVFormatContext *client_ctx = sc_stream_sink_create_client_ctx(sink); + if (!client_ctx) { + avio_close(pb); + continue; + } + client_ctx->pb = pb; + + // Allocate and initialise the client struct. + struct sc_stream_sink_client *client = + calloc(1, sizeof(struct sc_stream_sink_client)); + if (!client) { + LOG_OOM(); + avio_close(client_ctx->pb); + client_ctx->pb = NULL; + avformat_free_context(client_ctx); + continue; + } + client->sink = sink; + client->ctx = client_ctx; + client->stopped = false; + client->finished = false; + sc_vecdeque_init(&client->video_queue); + sc_vecdeque_init(&client->audio_queue); + sc_stream_sink_stream_init(&client->video_stream); + sc_stream_sink_stream_init(&client->audio_stream); + // Copy shared stream indices so the client can look up its streams. + client->video_stream.index = sink->video_stream.index; + client->audio_stream.index = sink->audio_stream.index; + + // Switch the client context's interrupt callback to check client->stopped too, + // so blocking I/O in the client thread can be interrupted on demand. + client_ctx->interrupt_callback.callback = sc_stream_sink_client_interrupt_cb; + client_ctx->interrupt_callback.opaque = client; + + // Write the MPEG-TS stream header for this client. + if (avformat_write_header(client_ctx, NULL) < 0) { + LOGE("Stream sink: failed to write stream header to client"); + avio_close(client_ctx->pb); + client_ctx->pb = NULL; + avformat_free_context(client_ctx); + free(client); + continue; + } + + // Add the client to the active list before spawning its thread so + // that packets pushed between now and the thread start are queued. + sc_mutex_lock(&sink->mutex); + client->next = sink->clients; + sink->clients = client; + sc_mutex_unlock(&sink->mutex); + + bool thread_ok = sc_thread_create(&client->thread, + run_stream_sink_client, + "scrcpy-stream-client", client); + if (!thread_ok) { + LOGE("Stream sink: could not create client thread"); + sc_mutex_lock(&sink->mutex); + // Remove from list (it was just prepended) + sink->clients = client->next; + sc_stream_sink_queue_clear(&client->video_queue); + sc_stream_sink_queue_clear(&client->audio_queue); + sc_mutex_unlock(&sink->mutex); + // avformat_write_header already moved pb ownership; close it. + if (client_ctx->pb) { + avio_close(client_ctx->pb); + client_ctx->pb = NULL; + } + avformat_free_context(client_ctx); + sc_vecdeque_destroy(&client->video_queue); + sc_vecdeque_destroy(&client->audio_queue); + free(client); + continue; + } + + LOGI("Stream sink: client connected on %s", sink->url); + } + + free(connect_url); + +stop: + // Stop and drain all active clients. + sc_mutex_lock(&sink->mutex); + sink->stopped = true; + struct sc_stream_sink_client *c = sink->clients; + while (c) { + c->stopped = true; + c = c->next; + } + sc_cond_broadcast(&sink->cond); + sc_mutex_unlock(&sink->mutex); + + // Join every remaining client thread, then free the client structs. + sc_mutex_lock(&sink->mutex); + struct sc_stream_sink_client *head = sink->clients; + sink->clients = NULL; + sc_mutex_unlock(&sink->mutex); + + while (head) { + struct sc_stream_sink_client *next = head->next; + sc_thread_join(&head->thread, NULL); + if (head->ctx->pb) { + avio_close(head->ctx->pb); + head->ctx->pb = NULL; + } + avformat_free_context(head->ctx); + sc_vecdeque_destroy(&head->video_queue); + sc_vecdeque_destroy(&head->audio_queue); + free(head); + head = next; } LOGD("Stream sink thread ended"); @@ -590,7 +826,12 @@ sc_stream_sink_video_packet_sink_close(struct sc_packet_sink *sink) { sc_mutex_lock(&ss->mutex); // EOS also stops the stream sink ss->stopped = true; - sc_cond_signal(&ss->cond); + struct sc_stream_sink_client *c = ss->clients; + while (c) { + c->stopped = true; + c = c->next; + } + sc_cond_broadcast(&ss->cond); sc_mutex_unlock(&ss->mutex); } @@ -609,24 +850,46 @@ sc_stream_sink_video_packet_sink_push(struct sc_packet_sink *sink, return false; } - AVPacket *p = sc_stream_sink_packet_ref(packet); - if (!p) { - LOG_OOM(); + if (!ss->template_ready) { + // Init phase: buffer in the sink-level queue for sc_stream_sink_init_template. + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (!p) { + LOG_OOM(); + sc_mutex_unlock(&ss->mutex); + return false; + } + p->stream_index = ss->video_stream.index; + bool ok = sc_vecdeque_push(&ss->video_queue, p); + if (!ok) { + LOG_OOM(); + av_packet_free(&p); + sc_mutex_unlock(&ss->mutex); + return false; + } + sc_cond_signal(&ss->cond); sc_mutex_unlock(&ss->mutex); - return false; + return true; } - p->stream_index = ss->video_stream.index; - - bool ok = sc_vecdeque_push(&ss->video_queue, p); - if (!ok) { - LOG_OOM(); - av_packet_free(&p); - sc_mutex_unlock(&ss->mutex); - return false; + // Live phase: fan out a ref-counted copy to every active client queue. + bool any_ok = false; + struct sc_stream_sink_client *c = ss->clients; + while (c) { + if (!c->stopped) { + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (p) { + p->stream_index = ss->video_stream.index; + if (sc_vecdeque_push(&c->video_queue, p)) { + any_ok = true; + } else { + av_packet_free(&p); + } + } + } + c = c->next; } - - sc_cond_signal(&ss->cond); + (void) any_ok; // dropping when no clients is intentional for live streams + sc_cond_broadcast(&ss->cond); sc_mutex_unlock(&ss->mutex); return true; @@ -677,7 +940,12 @@ sc_stream_sink_audio_packet_sink_close(struct sc_packet_sink *sink) { sc_mutex_lock(&ss->mutex); // EOS also stops the stream sink ss->stopped = true; - sc_cond_signal(&ss->cond); + struct sc_stream_sink_client *c = ss->clients; + while (c) { + c->stopped = true; + c = c->next; + } + sc_cond_broadcast(&ss->cond); sc_mutex_unlock(&ss->mutex); } @@ -697,24 +965,42 @@ sc_stream_sink_audio_packet_sink_push(struct sc_packet_sink *sink, return false; } - AVPacket *p = sc_stream_sink_packet_ref(packet); - if (!p) { - LOG_OOM(); + if (!ss->template_ready) { + // Init phase: buffer in the sink-level queue. + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (!p) { + LOG_OOM(); + sc_mutex_unlock(&ss->mutex); + return false; + } + p->stream_index = ss->audio_stream.index; + bool ok = sc_vecdeque_push(&ss->audio_queue, p); + if (!ok) { + LOG_OOM(); + av_packet_free(&p); + sc_mutex_unlock(&ss->mutex); + return false; + } + sc_cond_signal(&ss->cond); sc_mutex_unlock(&ss->mutex); - return false; + return true; } - p->stream_index = ss->audio_stream.index; - - bool ok = sc_vecdeque_push(&ss->audio_queue, p); - if (!ok) { - LOG_OOM(); - av_packet_free(&p); - sc_mutex_unlock(&ss->mutex); - return false; + // Live phase: fan out to every active client. + struct sc_stream_sink_client *c = ss->clients; + while (c) { + if (!c->stopped) { + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (p) { + p->stream_index = ss->audio_stream.index; + if (!sc_vecdeque_push(&c->audio_queue, p)) { + av_packet_free(&p); + } + } + } + c = c->next; } - - sc_cond_signal(&ss->cond); + sc_cond_broadcast(&ss->cond); sc_mutex_unlock(&ss->mutex); return true; @@ -760,6 +1046,8 @@ sc_stream_sink_init(struct sc_stream_sink *sink, const char *url, } sink->stopped = false; + sink->template_ready = false; + sink->clients = NULL; sc_vecdeque_init(&sink->video_queue); sc_vecdeque_init(&sink->audio_queue); @@ -849,7 +1137,13 @@ void sc_stream_sink_stop(struct sc_stream_sink *sink) { sc_mutex_lock(&sink->mutex); sink->stopped = true; - sc_cond_signal(&sink->cond); + // Also stop all active clients so their I/O is interrupted promptly. + struct sc_stream_sink_client *c = sink->clients; + while (c) { + c->stopped = true; + c = c->next; + } + sc_cond_broadcast(&sink->cond); sc_mutex_unlock(&sink->mutex); } diff --git a/app/src/stream_sink.h b/app/src/stream_sink.h index 7ac3c5ec..e1aae413 100644 --- a/app/src/stream_sink.h +++ b/app/src/stream_sink.h @@ -18,6 +18,9 @@ struct sc_stream_sink_stream { int64_t last_pts; }; +/* Per-connection client state (defined in stream_sink.c). */ +struct sc_stream_sink_client; + struct sc_stream_sink { struct sc_packet_sink video_packet_sink; struct sc_packet_sink audio_packet_sink; @@ -35,6 +38,8 @@ struct sc_stream_sink { char *url; + // Template format context (no pb): holds stream definitions and codec + // parameters used to initialise a fresh context for each new connection. AVFormatContext *ctx; sc_thread thread; @@ -42,6 +47,9 @@ struct sc_stream_sink { sc_cond cond; // set on sc_stream_sink_stop(), packet_sink close or streaming failure bool stopped; + + // Init-phase queues: used only until template_ready is set. + // After that, each sc_stream_sink_client has its own queues. struct sc_stream_sink_queue video_queue; struct sc_stream_sink_queue audio_queue; @@ -51,8 +59,18 @@ struct sc_stream_sink { bool audio_expects_config_packet; + // Stream indices shared by every per-client AVFormatContext (all clients + // copy the template streams in the same order). struct sc_stream_sink_stream video_stream; struct sc_stream_sink_stream audio_stream; + + // Set to true once codec params + extradata are applied to the template + // context. Before this point packets are buffered in the init-phase queues + // above; after this point they are fanned out to active client queues. + bool template_ready; + + // Linked list of currently active client connections (protected by mutex). + struct sc_stream_sink_client *clients; }; bool