diff --git a/app/meson.build b/app/meson.build index f7df69eb..bb52c2f3 100644 --- a/app/meson.build +++ b/app/meson.build @@ -31,6 +31,7 @@ src = [ 'src/receiver.c', 'src/recorder.c', 'src/scrcpy.c', + 'src/stream_sink.c', 'src/screen.c', 'src/server.c', 'src/version.c', diff --git a/app/src/cli.c b/app/src/cli.c index b2e3e30a..b7ececbe 100644 --- a/app/src/cli.c +++ b/app/src/cli.c @@ -114,6 +114,7 @@ enum { OPT_NO_VD_SYSTEM_DECORATIONS, OPT_NO_VD_DESTROY_CONTENT, OPT_DISPLAY_IME_POLICY, + OPT_STREAM_PORT, }; struct sc_option { @@ -956,6 +957,15 @@ static const struct sc_option options[] = { "Default is info.", #endif }, + { + .longopt_id = OPT_STREAM_PORT, + .longopt = "stream-port", + .argdesc = "port", + .text = "Start a TCP server that streams the device video (and audio, " + "if enabled) as MPEG-TS on the given port. " + "Once started, connect with any compatible player using " + "tcp://127.0.0.1: (e.g. in OBS Media Source or VLC).", + }, { .longopt_id = OPT_V4L2_SINK, .longopt = "v4l2-sink", @@ -2686,6 +2696,11 @@ parse_args_with_getopt(struct scrcpy_cli_args *args, int argc, char *argv[], LOGE("OTG mode (--otg) is disabled."); return false; #endif + case OPT_STREAM_PORT: + if (!parse_port(optarg, &opts->stream_port)) { + return false; + } + break; case OPT_V4L2_SINK: #ifdef HAVE_V4L2 opts->v4l2_device = optarg; diff --git a/app/src/options.c b/app/src/options.c index 0fe82d29..a1ce74d9 100644 --- a/app/src/options.c +++ b/app/src/options.c @@ -71,6 +71,7 @@ const struct scrcpy_options scrcpy_options_default = { .v4l2_device = NULL, .v4l2_buffer = 0, #endif + .stream_port = 0, #ifdef HAVE_USB .otg = false, #endif diff --git a/app/src/options.h b/app/src/options.h index 03b42913..88ab7cfd 100644 --- a/app/src/options.h +++ b/app/src/options.h @@ -281,6 +281,7 @@ struct scrcpy_options { const char *v4l2_device; sc_tick v4l2_buffer; #endif + uint16_t stream_port; // 0 means disabled #ifdef HAVE_USB bool otg; #endif diff --git a/app/src/scrcpy.c b/app/src/scrcpy.c index aedfdf9c..5674eae2 100644 --- a/app/src/scrcpy.c +++ b/app/src/scrcpy.c @@ -26,6 +26,7 @@ #include "recorder.h" #include "screen.h" #include "server.h" +#include "stream_sink.h" #include "uhid/gamepad_uhid.h" #include "uhid/keyboard_uhid.h" #include "uhid/mouse_uhid.h" @@ -54,6 +55,7 @@ struct scrcpy { struct sc_decoder video_decoder; struct sc_decoder audio_decoder; struct sc_recorder recorder; + struct sc_stream_sink stream_sink; struct sc_delay_buffer video_buffer; #ifdef HAVE_V4L2 struct sc_v4l2_sink v4l2_sink; @@ -400,6 +402,8 @@ scrcpy(struct scrcpy_options *options) { bool file_pusher_initialized = false; bool recorder_initialized = false; bool recorder_started = false; + bool stream_sink_initialized = false; + bool stream_sink_started = false; #ifdef HAVE_V4L2 bool v4l2_sink_initialized = false; #endif @@ -632,6 +636,28 @@ scrcpy(struct scrcpy_options *options) { } } + if (options->stream_port) { + if (!sc_stream_sink_init(&s->stream_sink, options->stream_port, + options->video, options->audio)) { + goto end; + } + stream_sink_initialized = true; + + if (!sc_stream_sink_start(&s->stream_sink)) { + goto end; + } + stream_sink_started = true; + + if (options->video) { + sc_packet_source_add_sink(&s->video_demuxer.packet_source, + &s->stream_sink.video_packet_sink); + } + if (options->audio) { + sc_packet_source_add_sink(&s->audio_demuxer.packet_source, + &s->stream_sink.audio_packet_sink); + } + } + struct sc_controller *controller = NULL; struct sc_key_processor *kp = NULL; struct sc_mouse_processor *mp = NULL; @@ -989,6 +1015,9 @@ end: if (recorder_initialized) { sc_recorder_stop(&s->recorder); } + if (stream_sink_initialized) { + sc_stream_sink_stop(&s->stream_sink); + } if (screen_initialized) { sc_screen_interrupt(&s->screen); } @@ -1053,6 +1082,13 @@ end: sc_recorder_destroy(&s->recorder); } + if (stream_sink_started) { + sc_stream_sink_join(&s->stream_sink); + } + if (stream_sink_initialized) { + sc_stream_sink_destroy(&s->stream_sink); + } + if (file_pusher_initialized) { sc_file_pusher_join(&s->file_pusher); sc_file_pusher_destroy(&s->file_pusher); diff --git a/app/src/stream_sink.c b/app/src/stream_sink.c new file mode 100644 index 00000000..58d02669 --- /dev/null +++ b/app/src/stream_sink.c @@ -0,0 +1,749 @@ +#include "stream_sink.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "util/log.h" + +/** Downcast packet sinks to stream sink */ +#define DOWNCAST_VIDEO(SINK) \ + container_of(SINK, struct sc_stream_sink, video_packet_sink) +#define DOWNCAST_AUDIO(SINK) \ + container_of(SINK, struct sc_stream_sink, audio_packet_sink) + +static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us + +static AVPacket * +sc_stream_sink_packet_ref(const AVPacket *packet) { + AVPacket *p = av_packet_alloc(); + if (!p) { + LOG_OOM(); + return NULL; + } + + if (av_packet_ref(p, packet)) { + av_packet_free(&p); + return NULL; + } + + return p; +} + +static void +sc_stream_sink_queue_clear(struct sc_stream_sink_queue *queue) { + while (!sc_vecdeque_is_empty(queue)) { + AVPacket *p = sc_vecdeque_pop(queue); + av_packet_free(&p); + } +} + +static bool +sc_stream_sink_set_extradata(AVStream *ostream, const AVPacket *packet) { + uint8_t *extradata = av_malloc(packet->size * sizeof(uint8_t)); + if (!extradata) { + LOG_OOM(); + return false; + } + + // copy the first packet to the extra data + memcpy(extradata, packet->data, packet->size); + + ostream->codecpar->extradata = extradata; + ostream->codecpar->extradata_size = packet->size; + return true; +} + +static inline void +sc_stream_sink_rescale_packet(AVStream *stream, AVPacket *packet) { + av_packet_rescale_ts(packet, SCRCPY_TIME_BASE, stream->time_base); +} + +static bool +sc_stream_sink_write_stream(struct sc_stream_sink *sink, + struct sc_stream_sink_stream *st, + AVPacket *packet) { + AVStream *stream = sink->ctx->streams[st->index]; + sc_stream_sink_rescale_packet(stream, packet); + if (st->last_pts != AV_NOPTS_VALUE && packet->pts <= st->last_pts) { + LOGD("Fixing PTS non monotonically increasing in stream %d " + "(%" PRIi64 " >= %" PRIi64 ")", + st->index, st->last_pts, packet->pts); + packet->pts = ++st->last_pts; + packet->dts = packet->pts; + } else { + st->last_pts = packet->pts; + } + return av_interleaved_write_frame(sink->ctx, packet) >= 0; +} + +static inline bool +sc_stream_sink_write_video(struct sc_stream_sink *sink, AVPacket *packet) { + return sc_stream_sink_write_stream(sink, &sink->video_stream, packet); +} + +static inline bool +sc_stream_sink_write_audio(struct sc_stream_sink *sink, AVPacket *packet) { + return sc_stream_sink_write_stream(sink, &sink->audio_stream, packet); +} + +static int +sc_stream_sink_interrupt_cb(void *data) { + struct sc_stream_sink *sink = data; + // Read without mutex: this is intentional (same pattern as interrupt + // callbacks in other parts of the codebase) + return sink->stopped ? 1 : 0; +} + +static inline bool +sc_stream_sink_must_wait_for_config_packets(struct sc_stream_sink *sink) { + if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) { + // The video queue is empty + return true; + } + + if (sink->audio && sink->audio_expects_config_packet + && sc_vecdeque_is_empty(&sink->audio_queue)) { + // The audio queue is empty (when audio is enabled) + return true; + } + + // No queue is empty + return false; +} + +static bool +sc_stream_sink_process_header(struct sc_stream_sink *sink) { + sc_mutex_lock(&sink->mutex); + + while (!sink->stopped && + ((sink->video && !sink->video_init) + || (sink->audio && !sink->audio_init) + || sc_stream_sink_must_wait_for_config_packets(sink))) { + sc_cond_wait(&sink->cond, &sink->mutex); + } + + if (sink->video && sc_vecdeque_is_empty(&sink->video_queue)) { + assert(sink->stopped); + // If the stream sink is stopped, don't process anything if there are + // not at least video packets + sc_mutex_unlock(&sink->mutex); + return false; + } + + AVPacket *video_pkt = NULL; + if (!sc_vecdeque_is_empty(&sink->video_queue)) { + assert(sink->video); + video_pkt = sc_vecdeque_pop(&sink->video_queue); + } + + AVPacket *audio_pkt = NULL; + if (sink->audio_expects_config_packet && + !sc_vecdeque_is_empty(&sink->audio_queue)) { + assert(sink->audio); + audio_pkt = sc_vecdeque_pop(&sink->audio_queue); + } + + sc_mutex_unlock(&sink->mutex); + + bool ret = false; + + if (video_pkt) { + if (video_pkt->pts != AV_NOPTS_VALUE) { + LOGE("The first video packet is not a config packet"); + goto end; + } + + assert(sink->video_stream.index >= 0); + AVStream *video_stream = sink->ctx->streams[sink->video_stream.index]; + bool ok = sc_stream_sink_set_extradata(video_stream, video_pkt); + if (!ok) { + goto end; + } + } + + if (audio_pkt) { + if (audio_pkt->pts != AV_NOPTS_VALUE) { + LOGE("The first audio packet is not a config packet"); + goto end; + } + + assert(sink->audio_stream.index >= 0); + AVStream *audio_stream = sink->ctx->streams[sink->audio_stream.index]; + bool ok = sc_stream_sink_set_extradata(audio_stream, audio_pkt); + if (!ok) { + goto end; + } + } + + { + // Open the TCP server: this blocks until a client connects (or + // sink->stopped is set, via the interrupt callback) + char url[64]; + snprintf(url, sizeof(url), + "tcp://0.0.0.0:%" PRIu16 "?listen=1", sink->port); + + AVIOInterruptCB int_cb = { + .callback = sc_stream_sink_interrupt_cb, + .opaque = sink, + }; + + LOGI("Stream sink: waiting for client on tcp://127.0.0.1:%" PRIu16, + sink->port); + + int r = avio_open2(&sink->ctx->pb, url, AVIO_FLAG_WRITE, + &int_cb, NULL); + if (r < 0) { + if (!sink->stopped) { + LOGE("Failed to open stream server on port %" PRIu16, + sink->port); + } + goto end; + } + } + + { + bool ok = avformat_write_header(sink->ctx, NULL) >= 0; + if (!ok) { + LOGE("Failed to write stream header"); + avio_close(sink->ctx->pb); + sink->ctx->pb = NULL; + goto end; + } + } + + ret = true; + +end: + if (video_pkt) { + av_packet_free(&video_pkt); + } + if (audio_pkt) { + av_packet_free(&audio_pkt); + } + + return ret; +} + +static bool +sc_stream_sink_process_packets(struct sc_stream_sink *sink) { + int64_t pts_origin = AV_NOPTS_VALUE; + + bool header_written = sc_stream_sink_process_header(sink); + if (!header_written) { + return false; + } + + LOGI("Stream sink: streaming started on tcp://127.0.0.1:%" PRIu16, + sink->port); + + AVPacket *video_pkt = NULL; + AVPacket *audio_pkt = NULL; + + // We can write a video packet only once we received the next one so that + // we can set its duration (next_pts - current_pts) + AVPacket *video_pkt_previous = NULL; + + bool error = false; + + for (;;) { + sc_mutex_lock(&sink->mutex); + + while (!sink->stopped) { + if (sink->video && !video_pkt && + !sc_vecdeque_is_empty(&sink->video_queue)) { + // A new packet may be assigned to video_pkt and be processed + break; + } + if (sink->audio && !audio_pkt + && !sc_vecdeque_is_empty(&sink->audio_queue)) { + // A new packet may be assigned to audio_pkt and be processed + break; + } + sc_cond_wait(&sink->cond, &sink->mutex); + } + + // If stopped is set, continue to process the remaining events (to + // finish the streaming) before actually stopping. + + // If there is no video, then the video_queue will remain empty forever + // and video_pkt will always be NULL. + assert(sink->video || (!video_pkt + && sc_vecdeque_is_empty(&sink->video_queue))); + + // If there is no audio, then the audio_queue will remain empty forever + // and audio_pkt will always be NULL. + assert(sink->audio || (!audio_pkt + && sc_vecdeque_is_empty(&sink->audio_queue))); + + if (!video_pkt && !sc_vecdeque_is_empty(&sink->video_queue)) { + video_pkt = sc_vecdeque_pop(&sink->video_queue); + } + + if (!audio_pkt && !sc_vecdeque_is_empty(&sink->audio_queue)) { + audio_pkt = sc_vecdeque_pop(&sink->audio_queue); + } + + if (sink->stopped && !video_pkt && !audio_pkt) { + assert(sc_vecdeque_is_empty(&sink->video_queue)); + assert(sc_vecdeque_is_empty(&sink->audio_queue)); + sc_mutex_unlock(&sink->mutex); + break; + } + + assert(video_pkt || audio_pkt); // at least one + + sc_mutex_unlock(&sink->mutex); + + // Ignore further config packets (e.g. on device orientation + // change). The next non-config packet will have the config packet + // data prepended. + if (video_pkt && video_pkt->pts == AV_NOPTS_VALUE) { + av_packet_free(&video_pkt); + video_pkt = NULL; + } + + if (audio_pkt && audio_pkt->pts == AV_NOPTS_VALUE) { + av_packet_free(&audio_pkt); + audio_pkt = NULL; + } + + if (pts_origin == AV_NOPTS_VALUE) { + if (!sink->audio) { + assert(video_pkt); + pts_origin = video_pkt->pts; + } else if (!sink->video) { + assert(audio_pkt); + pts_origin = audio_pkt->pts; + } else if (video_pkt && audio_pkt) { + pts_origin = MIN(video_pkt->pts, audio_pkt->pts); + } else if (sink->stopped) { + if (video_pkt) { + // The sink is stopped without audio, stream the video + // packets + pts_origin = video_pkt->pts; + } else { + // Fail if there is no video + error = true; + goto end; + } + } else { + // We need both video and audio packets to initialize pts_origin + continue; + } + } + + assert(pts_origin != AV_NOPTS_VALUE); + + if (video_pkt) { + video_pkt->pts -= pts_origin; + video_pkt->dts = video_pkt->pts; + + if (video_pkt_previous) { + // we now know the duration of the previous packet + video_pkt_previous->duration = video_pkt->pts + - video_pkt_previous->pts; + + bool ok = sc_stream_sink_write_video(sink, video_pkt_previous); + av_packet_free(&video_pkt_previous); + if (!ok) { + LOGE("Could not write video packet to stream"); + error = true; + goto end; + } + } + + video_pkt_previous = video_pkt; + video_pkt = NULL; + } + + if (audio_pkt) { + audio_pkt->pts -= pts_origin; + audio_pkt->dts = audio_pkt->pts; + + bool ok = sc_stream_sink_write_audio(sink, audio_pkt); + if (!ok) { + LOGE("Could not write audio packet to stream"); + error = true; + goto end; + } + + av_packet_free(&audio_pkt); + audio_pkt = NULL; + } + } + + // Write the last video packet + AVPacket *last = video_pkt_previous; + if (last) { + // assign an arbitrary duration to the last packet: 0.1s in us + last->duration = 100000; + bool ok = sc_stream_sink_write_video(sink, last); + if (!ok) { + // failing to write the last frame is not very serious, no + // future frame may depend on it, so the resulting stream + // will still be valid + LOGW("Could not write last packet to stream"); + } + av_packet_free(&last); + last = NULL; + } + + av_write_trailer(sink->ctx); + +end: + if (video_pkt) { + av_packet_free(&video_pkt); + } + if (audio_pkt) { + av_packet_free(&audio_pkt); + } + if (video_pkt_previous) { + av_packet_free(&video_pkt_previous); + } + + return !error; +} + +static int +run_stream_sink(void *data) { + struct sc_stream_sink *sink = data; + + bool ok = sc_stream_sink_process_packets(sink); + + sc_mutex_lock(&sink->mutex); + // Prevent the producer from pushing any new packet + sink->stopped = true; + // Discard pending packets + sc_stream_sink_queue_clear(&sink->video_queue); + sc_stream_sink_queue_clear(&sink->audio_queue); + sc_mutex_unlock(&sink->mutex); + + if (sink->ctx->pb) { + avio_close(sink->ctx->pb); + sink->ctx->pb = NULL; + } + + if (ok) { + LOGI("Stream sink complete"); + } else { + LOGE("Stream sink failed"); + } + + LOGD("Stream sink thread ended"); + + return 0; +} + +static void +sc_stream_sink_stream_init(struct sc_stream_sink_stream *stream) { + stream->index = -1; + stream->last_pts = AV_NOPTS_VALUE; +} + +static bool +sc_stream_sink_video_packet_sink_open(struct sc_packet_sink *sink, + AVCodecContext *ctx) { + struct sc_stream_sink *ss = DOWNCAST_VIDEO(sink); + // only written from this thread, no need to lock + assert(!ss->video_init); + + sc_mutex_lock(&ss->mutex); + if (ss->stopped) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + AVStream *stream = avformat_new_stream(ss->ctx, ctx->codec); + if (!stream) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + int r = avcodec_parameters_from_context(stream->codecpar, ctx); + if (r < 0) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + ss->video_stream.index = stream->index; + + ss->video_init = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); + + return true; +} + +static void +sc_stream_sink_video_packet_sink_close(struct sc_packet_sink *sink) { + struct sc_stream_sink *ss = DOWNCAST_VIDEO(sink); + // only written from this thread, no need to lock + assert(ss->video_init); + + sc_mutex_lock(&ss->mutex); + // EOS also stops the stream sink + ss->stopped = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); +} + +static bool +sc_stream_sink_video_packet_sink_push(struct sc_packet_sink *sink, + const AVPacket *packet) { + struct sc_stream_sink *ss = DOWNCAST_VIDEO(sink); + // only written from this thread, no need to lock + assert(ss->video_init); + + sc_mutex_lock(&ss->mutex); + + if (ss->stopped) { + // reject any new packet + sc_mutex_unlock(&ss->mutex); + return false; + } + + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (!p) { + LOG_OOM(); + sc_mutex_unlock(&ss->mutex); + return false; + } + + p->stream_index = ss->video_stream.index; + + bool ok = sc_vecdeque_push(&ss->video_queue, p); + if (!ok) { + LOG_OOM(); + av_packet_free(&p); + sc_mutex_unlock(&ss->mutex); + return false; + } + + sc_cond_signal(&ss->cond); + + sc_mutex_unlock(&ss->mutex); + return true; +} + +static bool +sc_stream_sink_audio_packet_sink_open(struct sc_packet_sink *sink, + AVCodecContext *ctx) { + struct sc_stream_sink *ss = DOWNCAST_AUDIO(sink); + assert(ss->audio); + // only written from this thread, no need to lock + assert(!ss->audio_init); + + sc_mutex_lock(&ss->mutex); + + AVStream *stream = avformat_new_stream(ss->ctx, ctx->codec); + if (!stream) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + int r = avcodec_parameters_from_context(stream->codecpar, ctx); + if (r < 0) { + sc_mutex_unlock(&ss->mutex); + return false; + } + + ss->audio_stream.index = stream->index; + + // A config packet is provided for all supported formats except raw audio + ss->audio_expects_config_packet = + ctx->codec_id != AV_CODEC_ID_PCM_S16LE; + + ss->audio_init = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); + + return true; +} + +static void +sc_stream_sink_audio_packet_sink_close(struct sc_packet_sink *sink) { + struct sc_stream_sink *ss = DOWNCAST_AUDIO(sink); + assert(ss->audio); + // only written from this thread, no need to lock + assert(ss->audio_init); + + sc_mutex_lock(&ss->mutex); + // EOS also stops the stream sink + ss->stopped = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); +} + +static bool +sc_stream_sink_audio_packet_sink_push(struct sc_packet_sink *sink, + const AVPacket *packet) { + struct sc_stream_sink *ss = DOWNCAST_AUDIO(sink); + assert(ss->audio); + // only written from this thread, no need to lock + assert(ss->audio_init); + + sc_mutex_lock(&ss->mutex); + + if (ss->stopped) { + // reject any new packet + sc_mutex_unlock(&ss->mutex); + return false; + } + + AVPacket *p = sc_stream_sink_packet_ref(packet); + if (!p) { + LOG_OOM(); + sc_mutex_unlock(&ss->mutex); + return false; + } + + p->stream_index = ss->audio_stream.index; + + bool ok = sc_vecdeque_push(&ss->audio_queue, p); + if (!ok) { + LOG_OOM(); + av_packet_free(&p); + sc_mutex_unlock(&ss->mutex); + return false; + } + + sc_cond_signal(&ss->cond); + + sc_mutex_unlock(&ss->mutex); + return true; +} + +static void +sc_stream_sink_audio_packet_sink_disable(struct sc_packet_sink *sink) { + struct sc_stream_sink *ss = DOWNCAST_AUDIO(sink); + assert(ss->audio); + // only written from this thread, no need to lock + assert(!ss->audio_init); + + LOGW("Audio stream disabled for stream sink"); + + sc_mutex_lock(&ss->mutex); + ss->audio = false; + ss->audio_init = true; + sc_cond_signal(&ss->cond); + sc_mutex_unlock(&ss->mutex); +} + +bool +sc_stream_sink_init(struct sc_stream_sink *sink, uint16_t port, + bool video, bool audio) { + assert(video || audio); + + sink->port = port; + sink->video = video; + sink->audio = audio; + + bool ok = sc_mutex_init(&sink->mutex); + if (!ok) { + return false; + } + + ok = sc_cond_init(&sink->cond); + if (!ok) { + goto error_mutex_destroy; + } + + sink->stopped = false; + + sc_vecdeque_init(&sink->video_queue); + sc_vecdeque_init(&sink->audio_queue); + + sink->video_init = false; + sink->audio_init = false; + + sink->audio_expects_config_packet = false; + + sc_stream_sink_stream_init(&sink->video_stream); + sc_stream_sink_stream_init(&sink->audio_stream); + + // Allocate the output format context with mpegts (ideal for TCP streaming) + const AVOutputFormat *oformat = av_guess_format("mpegts", NULL, NULL); + if (!oformat) { + LOGE("Could not find mpegts muxer"); + goto error_cond_destroy; + } + + sink->ctx = avformat_alloc_context(); + if (!sink->ctx) { + LOG_OOM(); + goto error_cond_destroy; + } + + // contrary to the deprecated API (av_oformat_next()), av_muxer_iterate() + // returns (on purpose) a pointer-to-const, but AVFormatContext.oformat + // still expects a pointer-to-non-const (it has not been updated accordingly) + // + sink->ctx->oformat = (AVOutputFormat *) oformat; + + if (video) { + static const struct sc_packet_sink_ops video_ops = { + .open = sc_stream_sink_video_packet_sink_open, + .close = sc_stream_sink_video_packet_sink_close, + .push = sc_stream_sink_video_packet_sink_push, + }; + + sink->video_packet_sink.ops = &video_ops; + } + + if (audio) { + static const struct sc_packet_sink_ops audio_ops = { + .open = sc_stream_sink_audio_packet_sink_open, + .close = sc_stream_sink_audio_packet_sink_close, + .push = sc_stream_sink_audio_packet_sink_push, + .disable = sc_stream_sink_audio_packet_sink_disable, + }; + + sink->audio_packet_sink.ops = &audio_ops; + } + + return true; + +error_cond_destroy: + sc_cond_destroy(&sink->cond); +error_mutex_destroy: + sc_mutex_destroy(&sink->mutex); + + return false; +} + +bool +sc_stream_sink_start(struct sc_stream_sink *sink) { + bool ok = sc_thread_create(&sink->thread, run_stream_sink, + "scrcpy-stream-sink", sink); + if (!ok) { + LOGE("Could not start stream sink thread"); + return false; + } + + return true; +} + +void +sc_stream_sink_stop(struct sc_stream_sink *sink) { + sc_mutex_lock(&sink->mutex); + sink->stopped = true; + sc_cond_signal(&sink->cond); + sc_mutex_unlock(&sink->mutex); +} + +void +sc_stream_sink_join(struct sc_stream_sink *sink) { + sc_thread_join(&sink->thread, NULL); +} + +void +sc_stream_sink_destroy(struct sc_stream_sink *sink) { + sc_cond_destroy(&sink->cond); + sc_mutex_destroy(&sink->mutex); + avformat_free_context(sink->ctx); +} diff --git a/app/src/stream_sink.h b/app/src/stream_sink.h new file mode 100644 index 00000000..f07f0ea4 --- /dev/null +++ b/app/src/stream_sink.h @@ -0,0 +1,75 @@ +#ifndef SC_STREAM_SINK_H +#define SC_STREAM_SINK_H + +#include "common.h" + +#include +#include +#include +#include + +#include "trait/packet_sink.h" +#include "util/thread.h" +#include "util/vecdeque.h" + +struct sc_stream_sink_queue SC_VECDEQUE(AVPacket *); + +struct sc_stream_sink_stream { + int index; + int64_t last_pts; +}; + +struct sc_stream_sink { + struct sc_packet_sink video_packet_sink; + struct sc_packet_sink audio_packet_sink; + + /* The audio flag is unprotected: + * - it is initialized from sc_stream_sink_init() from the main thread; + * - it may be reset once from the stream sink thread if the audio is + * disabled dynamically. + * + * Therefore, once the stream sink thread is started, only the stream sink + * thread may access it without data races. + */ + bool audio; + bool video; + + uint16_t port; + + AVFormatContext *ctx; + + sc_thread thread; + sc_mutex mutex; + sc_cond cond; + // set on sc_stream_sink_stop(), packet_sink close or streaming failure + bool stopped; + struct sc_stream_sink_queue video_queue; + struct sc_stream_sink_queue audio_queue; + + // wake up the stream sink thread once the video or audio codec is known + bool video_init; + bool audio_init; + + bool audio_expects_config_packet; + + struct sc_stream_sink_stream video_stream; + struct sc_stream_sink_stream audio_stream; +}; + +bool +sc_stream_sink_init(struct sc_stream_sink *sink, uint16_t port, + bool video, bool audio); + +bool +sc_stream_sink_start(struct sc_stream_sink *sink); + +void +sc_stream_sink_stop(struct sc_stream_sink *sink); + +void +sc_stream_sink_join(struct sc_stream_sink *sink); + +void +sc_stream_sink_destroy(struct sc_stream_sink *sink); + +#endif diff --git a/app/src/trait/packet_source.h b/app/src/trait/packet_source.h index 8788021a..71fb769e 100644 --- a/app/src/trait/packet_source.h +++ b/app/src/trait/packet_source.h @@ -7,7 +7,7 @@ #include "trait/packet_sink.h" -#define SC_PACKET_SOURCE_MAX_SINKS 2 +#define SC_PACKET_SOURCE_MAX_SINKS 3 /** * Packet source trait