From 516354da03b76e8a47d7b885cec4955067c3f9aa Mon Sep 17 00:00:00 2001 From: capriots <29807355+capriots@users.noreply.github.com> Date: Sun, 8 Mar 2026 11:04:32 +0100 Subject: [PATCH] cellDmux implementation --- rpcs3/Emu/Cell/Modules/cellDmux.cpp | 2271 +++++++++++++-------------- rpcs3/Emu/Cell/Modules/cellDmux.h | 122 +- rpcs3/Emu/Cell/lv2/sys_prx.cpp | 2 +- 3 files changed, 1163 insertions(+), 1232 deletions(-) diff --git a/rpcs3/Emu/Cell/Modules/cellDmux.cpp b/rpcs3/Emu/Cell/Modules/cellDmux.cpp index fb1f32837d..4f51539e8e 100644 --- a/rpcs3/Emu/Cell/Modules/cellDmux.cpp +++ b/rpcs3/Emu/Cell/Modules/cellDmux.cpp @@ -1,16 +1,14 @@ #include "stdafx.h" -#include "Emu/System.h" -#include "Emu/IdManager.h" -#include "Emu/Cell/PPUModule.h" +#include "Emu/Cell/lv2/sys_mutex.h" #include "Emu/Cell/lv2/sys_sync.h" +#include "Emu/Cell/lv2/sys_timer.h" +#include "Emu/Cell/PPUModule.h" +#include "Emu/savestate_utils.hpp" +#include "util/asm.hpp" #include "cellPamf.h" #include "cellDmux.h" -#include "util/asm.hpp" - -#include - LOG_CHANNEL(cellDmux); template <> @@ -31,1331 +29,1197 @@ void fmt_class_string::format(std::string& out, u64 arg) }); } -/* Demuxer Thread Classes */ - -enum +static error_code get_error(u32 internal_error) { - /* http://dvd.sourceforge.net/dvdinfo/mpeghdrs.html */ - - PACKET_START_CODE_MASK = 0xffffff00, - PACKET_START_CODE_PREFIX = 0x00000100, - - PACK_START_CODE = 0x000001ba, - SYSTEM_HEADER_START_CODE = 0x000001bb, - PRIVATE_STREAM_1 = 0x000001bd, - PADDING_STREAM = 0x000001be, - PRIVATE_STREAM_2 = 0x000001bf, -}; - -struct DemuxerStream -{ - u32 addr; - u32 size; - u64 userdata; - bool discontinuity; - - template - bool get(T& out) + switch (internal_error) { - if (sizeof(T) > size) return false; + case 0: return CELL_OK; + case 1: return CELL_DMUX_ERROR_FATAL; + case 2: // Error values two to five are all converted to CELL_DMUX_ERROR_ARG. + case 3: + case 4: + case 5: return CELL_DMUX_ERROR_ARG; + default: return CELL_DMUX_ERROR_FATAL; + } +} - std::memcpy(&out, vm::base(addr), sizeof(T)); - addr += sizeof(T); - size -= sizeof(T); +static inline std::span> get_es_handles(vm::ptr handle) +{ + return { vm::pptr::make(handle.addr() + sizeof(DmuxContext)).get_ptr(), static_cast(handle->enabled_es_num) }; +} - return true; +static inline vm::ptr get_au_queue_elements(vm::ptr es_handle) +{ + return vm::ptr::make(es_handle.addr() + sizeof(DmuxEsContext)); +} + +static inline vm::cptr get_core_ops() +{ + return vm::cptr::make(*ppu_module_manager::cellDmuxPamf.variables.find(0x28b2b7b2)->second.export_addr); +} + + +// Callbacks for cellDmuxPamf + +static error_code notify_demux_done(ppu_thread& ppu, vm::ptr core_handle, u32 error, vm::ptr handle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; } - template - bool peek(T& out, u32 shift = 0) - { - if (sizeof(T) + shift > size) return false; + cellDmux.trace("notify_demux_done(core_handle=*0x%x, error=%d, handle=*0x%x)", core_handle, error, handle); - std::memcpy(&out, vm::base(addr + shift), sizeof(T)); - return true; + ensure(!!handle); // Not checked on LLE + + ensure(sys_mutex_lock(ppu, handle->_dx_mhd, 0) == CELL_OK); // Failing this check on LLE would result in it dereferencing an invalid pointer. + handle->dmux_state = DMUX_STOPPED; + ensure(sys_mutex_unlock(ppu, handle->_dx_mhd) == CELL_OK); // Failing this check on LLE would result in it dereferencing an invalid pointer. + + if (handle->_this) + { + const vm::var msg{{ .msgType = CELL_DMUX_MSG_TYPE_DEMUX_DONE, .supplementalInfo = handle->user_data }}; + handle->dmux_cb.cbFunc(ppu, handle, msg, handle->dmux_cb.cbArg); } - void skip(u32 count) - { - addr += count; - size = size > count ? size - count : 0; - } + return CELL_OK; +} - bool check(u32 count) const - { - return count <= size; - } - - u64 get_ts(u8 c) - { - u8 v[4]; get(v); - return - ((u64{c} & 0x0e) << 29) | - ((u64{v[0]}) << 21) | - ((u64{v[1]} & 0x7e) << 15) | - ((u64{v[2]}) << 7) | (u64{v[3]} >> 1); - } -}; - -struct PesHeader +static error_code notify_fatal_err(ppu_thread& ppu, vm::ptr core_handle, u32 error, vm::ptr handle) { - u64 pts; - u64 dts; - u8 size; - bool has_ts; - bool is_ok; + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; - PesHeader(DemuxerStream& stream); -}; - -class ElementaryStream; -class Demuxer; - -enum DemuxerJobType -{ - dmuxSetStream, - dmuxResetStream, - dmuxResetStreamAndWaitDone, - dmuxEnableEs, - dmuxDisableEs, - dmuxResetEs, - dmuxFlushEs, - dmuxClose, -}; - -struct DemuxerTask -{ - DemuxerJobType type; - - union + if (!savestate_lock) { - DemuxerStream stream; + ppu.state += cpu_flag::again; + return {}; + } - struct + cellDmux.error("notify_fatal_err(core_handle=*0x%x, error=%d, handle=*0x%x)", core_handle, error, handle); + + ensure(!!handle); // Not checked on LLE + + const vm::var msg{{ .msgType = CELL_DMUX_MSG_TYPE_FATAL_ERR, .supplementalInfo = static_cast(get_error(error)) }}; + return handle->dmux_cb.cbFunc(ppu, handle, msg, handle->dmux_cb.cbArg); +} + +static error_code notify_prog_end_code(ppu_thread& ppu, vm::ptr core_handle, vm::ptr handle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + cellDmux.notice("notify_prog_end_code(core_handle=*0x%x, handle=*0x%x)", core_handle, handle); + + ensure(!!handle); // Not checked on LLE + + if (handle->_this) + { + const vm::var msg{{ .msgType = CELL_DMUX_MSG_TYPE_PROG_END_CODE, .supplementalInfo = handle->user_data }}; + handle->dmux_cb.cbFunc(ppu, handle, msg, handle->dmux_cb.cbArg); + } + + return CELL_OK; +} + +static error_code notify_es_au_found(ppu_thread& ppu, vm::ptr core_es_handle, vm::cptr au_info, vm::ptr es_handle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + cellDmux.trace("notify_es_au_found(core_es_handle=*0x%x, au_info=*0x%x, es_handle=*0x%x)", core_es_handle, au_info, es_handle); + + ensure(!!au_info && !!es_handle); // Not checked on LLE + + const auto fatal_err = [&](be_t es_is_enabled, error_code ret) + { + if (es_is_enabled) { - u32 es; - u32 auInfo_ptr_addr; - u32 auSpec_ptr_addr; - ElementaryStream* es_ptr; - } es; + const vm::var demuxerMsg{{ .msgType = CELL_DMUX_MSG_TYPE_FATAL_ERR, .supplementalInfo = static_cast(ret) }}; + es_handle->dmux_handle->dmux_cb.cbFunc(ppu, es_handle->dmux_handle, demuxerMsg, es_handle->dmux_handle->dmux_cb.cbArg); + } }; - DemuxerTask() - { - } - - DemuxerTask(DemuxerJobType type) - : type(type) - { - } -}; - -class ElementaryStream -{ - std::mutex m_mutex; - - squeue_t entries; // AU starting addresses - u32 put_count = 0; // number of AU written - u32 got_count = 0; // number of AU obtained by GetAu(Ex) - u32 released = 0; // number of AU released - - u32 put; // AU that is being written now - - bool is_full(u32 space); - -public: - static const u32 id_base = 1; - static const u32 id_step = 1; - static const u32 id_count = 1023; - SAVESTATE_INIT_POS(34); - - ElementaryStream(Demuxer* dmux, vm::ptr addr, u32 size, u32 fidMajor, u32 fidMinor, u32 sup1, u32 sup2, vm::ptr cbFunc, vm::ptr cbArg, u32 spec); - - Demuxer* dmux; - const u32 id = idm::last_id(); - const vm::ptr memAddr; - const u32 memSize; - const u32 fidMajor; - const u32 fidMinor; - const u32 sup1; - const u32 sup2; - const vm::ptr cbFunc; - const vm::ptr cbArg; - const u32 spec; //addr - - std::vector raw_data; // demultiplexed data stream (managed by demuxer thread) - usz raw_pos = 0; // should be <= raw_data.size() - u64 last_dts = CODEC_TS_INVALID; - u64 last_pts = CODEC_TS_INVALID; - - void push(DemuxerStream& stream, u32 size); // called by demuxer thread (not multithread-safe) - - bool isfull(u32 space); - - void push_au(u32 size, u64 dts, u64 pts, u64 userdata, bool rap, u32 specific); - - bool release(); - - bool peek(u32& out_data, bool no_ex, u32& out_spec, bool update_index); - - void reset(); -}; - -class Demuxer : public ppu_thread -{ -public: - squeue_t job; - const u32 memAddr; - const u32 memSize; - const vm::ptr cbFunc; - const vm::ptr cbArg; - volatile bool is_finished = false; - volatile bool is_closed = false; - atomic_t is_running = false; - atomic_t is_working = false; - - Demuxer(u32 addr, u32 size, vm::ptr func, vm::ptr arg) - : ppu_thread({}, "", 0) - , memAddr(addr) - , memSize(size) - , cbFunc(func) - , cbArg(arg) - { - } - - void non_task() - { - DemuxerTask task; - DemuxerStream stream = {}; - ElementaryStream* esALL[96]{}; - ElementaryStream** esAVC = &esALL[0]; // AVC (max 16 minus M2V count) - //ElementaryStream** esM2V = &esALL[16]; // M2V (max 16 minus AVC count) - //ElementaryStream** esDATA = &esALL[32]; // user data (max 16) - ElementaryStream** esATX = &esALL[48]; // ATRAC3+ (max 16) - //ElementaryStream** esAC3 = &esALL[64]; // AC3 (max 16) - //ElementaryStream** esPCM = &esALL[80]; // LPCM (max 16) - - u32 cb_add = 0; - - while (true) - { - if (Emu.IsStopped() || is_closed) - { - break; - } - - if (!job.try_peek(task) && is_running && stream.addr) - { - // default task (demuxing) (if there is no other work) - be_t code; - be_t len; - - if (!stream.peek(code)) - { - // demuxing finished - is_running = false; - - // callback - auto dmuxMsg = vm::ptr::make(memAddr + (cb_add ^= 16)); - dmuxMsg->msgType = CELL_DMUX_MSG_TYPE_DEMUX_DONE; - dmuxMsg->supplementalInfo = stream.userdata; - cbFunc(*this, id, dmuxMsg, cbArg); - lv2_obj::sleep(*this); - - is_working = false; - - stream = {}; - - continue; - } - - switch (code) - { - case PACK_START_CODE: - { - if (!stream.check(14)) - { - fmt::throw_exception("End of stream (PACK_START_CODE)"); - } - stream.skip(14); - break; - } - - case SYSTEM_HEADER_START_CODE: - { - if (!stream.check(18)) - { - fmt::throw_exception("End of stream (SYSTEM_HEADER_START_CODE)"); - } - stream.skip(18); - break; - } - - case PADDING_STREAM: - { - if (!stream.check(6)) - { - fmt::throw_exception("End of stream (PADDING_STREAM)"); - } - stream.skip(4); - stream.get(len); - - if (!stream.check(len)) - { - fmt::throw_exception("End of stream (PADDING_STREAM, len=%d)", len); - } - stream.skip(len); - break; - } - - case PRIVATE_STREAM_2: - { - if (!stream.check(6)) - { - fmt::throw_exception("End of stream (PRIVATE_STREAM_2)"); - } - stream.skip(4); - stream.get(len); - - cellDmux.notice("PRIVATE_STREAM_2 (%d)", len); - - if (!stream.check(len)) - { - fmt::throw_exception("End of stream (PRIVATE_STREAM_2, len=%d)", len); - } - stream.skip(len); - break; - } - - case PRIVATE_STREAM_1: - { - // audio and user data stream - DemuxerStream backup = stream; - - if (!stream.check(6)) - { - fmt::throw_exception("End of stream (PRIVATE_STREAM_1)"); - } - stream.skip(4); - stream.get(len); - - if (!stream.check(len)) - { - fmt::throw_exception("End of stream (PRIVATE_STREAM_1, len=%d)", len); - } - - const PesHeader pes(stream); - if (!pes.is_ok) - { - fmt::throw_exception("PesHeader error (PRIVATE_STREAM_1, len=%d)", len); - } - - if (len < pes.size + 4) - { - fmt::throw_exception("End of block (PRIVATE_STREAM_1, PesHeader + fid_minor, len=%d)", len); - } - len -= pes.size + 4; - - u8 fid_minor; - if (!stream.get(fid_minor)) - { - fmt::throw_exception("End of stream (PRIVATE_STREAM1, fid_minor)"); - } - - const u32 ch = fid_minor % 16; - if ((fid_minor & -0x10) == 0 && esATX[ch]) - { - ElementaryStream& es = *esATX[ch]; - if (es.raw_data.size() > 1024 * 1024) - { - stream = backup; - std::this_thread::sleep_for(1ms); // hack - continue; - } - - if (len < 3 || !stream.check(3)) - { - fmt::throw_exception("End of block (ATX, unknown header, len=%d)", len); - } - len -= 3; - stream.skip(3); - - if (pes.has_ts) - { - es.last_dts = pes.dts; - es.last_pts = pes.pts; - } - - es.push(stream, len); - - while (true) - { - auto const size = es.raw_data.size() - es.raw_pos; // size of available new data - auto const data = es.raw_data.data() + es.raw_pos; // pointer to available data - - if (size < 8) break; // skip if cannot read ATS header - - if (data[0] != 0x0f || data[1] != 0xd0) - { - fmt::throw_exception("ATX: 0x0fd0 header not found (ats=0x%llx)", *reinterpret_cast*>(data)); - } - - u32 frame_size = (((u32{data[2]} & 0x3) << 8) | u32{data[3]}) * 8 + 8; - - if (size < frame_size + 8) break; // skip non-complete AU - - if (es.isfull(frame_size + 8)) break; // skip if cannot push AU - - es.push_au(frame_size + 8, es.last_dts, es.last_pts, stream.userdata, false /* TODO: set correct value */, 0); - - //cellDmux.notice("ATX AU pushed (ats=0x%llx, frame_size=%d)", *(be_t*)data, frame_size); - - auto esMsg = vm::ptr::make(memAddr + (cb_add ^= 16)); - esMsg->msgType = CELL_DMUX_ES_MSG_TYPE_AU_FOUND; - esMsg->supplementalInfo = stream.userdata; - es.cbFunc(*this, id, es.id, esMsg, es.cbArg); - lv2_obj::sleep(*this); - } - } - else - { - cellDmux.notice("PRIVATE_STREAM_1 (len=%d, fid_minor=0x%x)", len, fid_minor); - stream.skip(len); - } - break; - } - - case 0x1e0: case 0x1e1: case 0x1e2: case 0x1e3: - case 0x1e4: case 0x1e5: case 0x1e6: case 0x1e7: - case 0x1e8: case 0x1e9: case 0x1ea: case 0x1eb: - case 0x1ec: case 0x1ed: case 0x1ee: case 0x1ef: - { - // video stream (AVC or M2V) - DemuxerStream backup = stream; - - if (!stream.check(6)) - { - fmt::throw_exception("End of stream (video, code=0x%x)", code); - } - stream.skip(4); - stream.get(len); - - if (!stream.check(len)) - { - fmt::throw_exception("End of stream (video, code=0x%x, len=%d)", code, len); - } - - const PesHeader pes(stream); - if (!pes.is_ok) - { - fmt::throw_exception("PesHeader error (video, code=0x%x, len=%d)", code, len); - } - - if (len < pes.size + 3) - { - fmt::throw_exception("End of block (video, code=0x%x, PesHeader)", code); - } - len -= pes.size + 3; - - const u32 ch = code % 16; - if (esAVC[ch]) - { - ElementaryStream& es = *esAVC[ch]; - - const u32 old_size = ::size32(es.raw_data); - if (es.isfull(old_size)) - { - stream = backup; - std::this_thread::sleep_for(1ms); // hack - continue; - } - - if ((pes.has_ts && old_size) || old_size >= 0x69800) - { - // push AU if it becomes too big or the next packet contains PTS/DTS - es.push_au(old_size, es.last_dts, es.last_pts, stream.userdata, false /* TODO: set correct value */, 0); - - // callback - auto esMsg = vm::ptr::make(memAddr + (cb_add ^= 16)); - esMsg->msgType = CELL_DMUX_ES_MSG_TYPE_AU_FOUND; - esMsg->supplementalInfo = stream.userdata; - es.cbFunc(*this, id, es.id, esMsg, es.cbArg); - lv2_obj::sleep(*this); - } - - if (pes.has_ts) - { - // preserve dts/pts for next AU - es.last_dts = pes.dts; - es.last_pts = pes.pts; - } - - // reconstruction of MPEG2-PS stream for vdec module - const u32 size = len + pes.size + 9; - stream = backup; - es.push(stream, size); - } - else - { - cellDmux.notice("Video stream (code=0x%x, len=%d)", code, len); - stream.skip(len); - } - break; - } - - default: - { - if ((code & PACKET_START_CODE_MASK) == PACKET_START_CODE_PREFIX) - { - fmt::throw_exception("Unknown code found (0x%x)", code); - } - - // search - stream.skip(1); - } - } - - continue; - } - - // wait for task if no work - if (!job.pop(task, &is_closed)) - { - break; // Emu is stopped - } - - switch (task.type) - { - case dmuxSetStream: - { - if (task.stream.discontinuity) - { - cellDmux.warning("dmuxSetStream (beginning)"); - for (u32 i = 0; i < std::size(esALL); i++) - { - if (esALL[i]) - { - esALL[i]->reset(); - } - } - } - - stream = task.stream; - //cellDmux.notice("*** stream updated(addr=0x%x, size=0x%x, discont=%d, userdata=0x%llx)", - //stream.addr, stream.size, stream.discontinuity, stream.userdata); - break; - } - - case dmuxResetStream: - case dmuxResetStreamAndWaitDone: - { - // demuxing stopped - if (is_running.exchange(false)) - { - // callback - auto dmuxMsg = vm::ptr::make(memAddr + (cb_add ^= 16)); - dmuxMsg->msgType = CELL_DMUX_MSG_TYPE_DEMUX_DONE; - dmuxMsg->supplementalInfo = stream.userdata; - cbFunc(*this, id, dmuxMsg, cbArg); - lv2_obj::sleep(*this); - - stream = {}; - - is_working = false; - } - - break; - } - - case dmuxEnableEs: - { - ElementaryStream& es = *task.es.es_ptr; - - // TODO: uncomment when ready to use - //if ((es.fidMajor & -0x10) == 0xe0 && es.fidMinor == 0 && es.sup1 == 1 && !es.sup2) - //{ - // esAVC[es.fidMajor % 16] = task.es.es_ptr; - //} - //else if ((es.fidMajor & -0x10) == 0xe0 && es.fidMinor == 0 && !es.sup1 && !es.sup2) - //{ - // esM2V[es.fidMajor % 16] = task.es.es_ptr; - //} - //else if (es.fidMajor == 0xbd && (es.fidMinor & -0x10) == 0 && !es.sup1 && !es.sup2) - //{ - // esATX[es.fidMinor % 16] = task.es.es_ptr; - //} - //else if (es.fidMajor == 0xbd && (es.fidMinor & -0x10) == 0x20 && !es.sup1 && !es.sup2) - //{ - // esDATA[es.fidMinor % 16] = task.es.es_ptr; - //} - //else if (es.fidMajor == 0xbd && (es.fidMinor & -0x10) == 0x30 && !es.sup1 && !es.sup2) - //{ - // esAC3[es.fidMinor % 16] = task.es.es_ptr; - //} - //else if (es.fidMajor == 0xbd && (es.fidMinor & -0x10) == 0x40 && !es.sup1 && !es.sup2) - //{ - // esPCM[es.fidMinor % 16] = task.es.es_ptr; - //} - //else - { - fmt::throw_exception("dmuxEnableEs: unknown filter (0x%x, 0x%x, 0x%x, 0x%x)", es.fidMajor, es.fidMinor, es.sup1, es.sup2); - } - es.dmux = this; - break; - } - - case dmuxDisableEs: - { - ElementaryStream& es = *task.es.es_ptr; - if (es.dmux != this) - { - fmt::throw_exception("dmuxDisableEs: invalid elementary stream"); - } - - for (u32 i = 0; i < std::size(esALL); i++) - { - if (esALL[i] == &es) - { - esALL[i] = nullptr; - } - } - es.dmux = nullptr; - idm::remove(task.es.es); - break; - } - - case dmuxFlushEs: - { - ElementaryStream& es = *task.es.es_ptr; - - const u32 old_size = ::size32(es.raw_data); - if (old_size && (es.fidMajor & -0x10) == 0xe0) - { - // TODO (it's only for AVC, some ATX data may be lost) - while (es.isfull(old_size)) - { - if (Emu.IsStopped() || is_closed) break; - - std::this_thread::sleep_for(1ms); // hack - } - - es.push_au(old_size, es.last_dts, es.last_pts, stream.userdata, false, 0); - - // callback - auto esMsg = vm::ptr::make(memAddr + (cb_add ^= 16)); - esMsg->msgType = CELL_DMUX_ES_MSG_TYPE_AU_FOUND; - esMsg->supplementalInfo = stream.userdata; - es.cbFunc(*this, id, es.id, esMsg, es.cbArg); - lv2_obj::sleep(*this); - } - - if (!es.raw_data.empty()) - { - cellDmux.error("dmuxFlushEs: 0x%x bytes lost (es_id=%d)", ::size32(es.raw_data), es.id); - } - - // callback - auto esMsg = vm::ptr::make(memAddr + (cb_add ^= 16)); - esMsg->msgType = CELL_DMUX_ES_MSG_TYPE_FLUSH_DONE; - esMsg->supplementalInfo = stream.userdata; - es.cbFunc(*this, id, es.id, esMsg, es.cbArg); - lv2_obj::sleep(*this); - break; - } - - case dmuxResetEs: - { - task.es.es_ptr->reset(); - break; - } - - case dmuxClose: - { - break; - } - - default: - { - fmt::throw_exception("Demuxer thread error: unknown task (0x%x)", +task.type); - } - } - } - - is_finished = true; - } -}; - - -PesHeader::PesHeader(DemuxerStream& stream) - : pts(CODEC_TS_INVALID) - , dts(CODEC_TS_INVALID) - , size(0) - , has_ts(false) - , is_ok(false) -{ - u16 header; - if (!stream.get(header)) - { - fmt::throw_exception("End of stream (header)"); - } - if (!stream.get(size)) - { - fmt::throw_exception("End of stream (size)"); - } - if (!stream.check(size)) - { - fmt::throw_exception("End of stream (size=%d)", size); - } - - u8 pos = 0; - while (pos++ < size) - { - u8 v; - if (!stream.get(v)) - { - return; // should never occur - } - - if (v == 0xff) // skip padding bytes - { - continue; - } - - if ((v & 0xf0) == 0x20 && (size - pos) >= 4) // pts only - { - pos += 4; - pts = stream.get_ts(v); - has_ts = true; - } - else if ((v & 0xf0) == 0x30 && (size - pos) >= 9) // pts and dts - { - pos += 5; - pts = stream.get_ts(v); - stream.get(v); - has_ts = true; - - if ((v & 0xf0) != 0x10) - { - cellDmux.error("PesHeader(): dts not found (v=0x%x, size=%d, pos=%d)", v, size, pos - 1); - stream.skip(size - pos); - return; - } - pos += 4; - dts = stream.get_ts(v); - } - else - { - cellDmux.warning("PesHeader(): unknown code (v=0x%x, size=%d, pos=%d)", v, size, pos - 1); - stream.skip(size - pos); - pos = size; - break; - } - } - - is_ok = true; -} - -ElementaryStream::ElementaryStream(Demuxer* dmux, vm::ptr addr, u32 size, u32 fidMajor, u32 fidMinor, u32 sup1, u32 sup2, vm::ptr cbFunc, vm::ptr cbArg, u32 spec) - : put(utils::align(addr.addr(), 128)) - , dmux(dmux) - , memAddr(vm::ptr::make(utils::align(addr.addr(), 128))) - , memSize(size - (addr.addr() - memAddr.addr())) - , fidMajor(fidMajor) - , fidMinor(fidMinor) - , sup1(sup1) - , sup2(sup2) - , cbFunc(cbFunc) - , cbArg(cbArg) - , spec(spec) -{ -} - -bool ElementaryStream::is_full(u32 space) -{ - if (released < put_count) - { - if (entries.is_full()) - { - return true; - } - - u32 first = 0; - if (!entries.peek(first, 0, &dmux->is_closed) || !first) - { - fmt::throw_exception("entries.peek() failed"); - } - else if (first >= put) - { - return first - put < space + 128; - } - else if (put + space + 128 > memAddr.addr() + memSize) - { - return first - memAddr.addr() < space + 128; - } - else - { - return false; - } - } - else - { - return false; - } -} - -bool ElementaryStream::isfull(u32 space) -{ - std::lock_guard lock(m_mutex); - return is_full(space); -} - -void ElementaryStream::push_au(u32 size, u64 dts, u64 pts, u64 userdata, bool rap, u32 specific) -{ - u32 addr; - { - std::lock_guard lock(m_mutex); - ensure(!is_full(size)); - - if (put + size + 128 > memAddr.addr() + memSize) - { - put = memAddr.addr(); - } - - std::memcpy(vm::base(put + 128), raw_data.data(), size); - raw_data.erase(raw_data.begin(), raw_data.begin() + size); - - auto info = vm::ptr::make(put); - info->auAddr.set(put + 128); - info->auSize = size; - info->dts.lower = static_cast(dts); - info->dts.upper = static_cast(dts >> 32); - info->pts.lower = static_cast(pts); - info->pts.upper = static_cast(pts >> 32); - info->isRap = rap; - info->auMaxSize = 0; - info->userData = userdata; - - auto spec = vm::ptr::make(put + u32{sizeof(CellDmuxAuInfoEx)}); - *spec = specific; - - auto inf = vm::ptr::make(put + 64); - inf->auAddr.set(put + 128); - inf->auSize = size; - inf->dts.lower = static_cast(dts); - inf->dts.upper = static_cast(dts >> 32); - inf->pts.lower = static_cast(pts); - inf->pts.upper = static_cast(pts >> 32); - inf->auMaxSize = 0; // ????? - inf->userData = userdata; - - addr = put; - - put = utils::align(put + 128 + size, 128); - - put_count++; - } - - ensure(entries.push(addr, &dmux->is_closed)); -} - -void ElementaryStream::push(DemuxerStream& stream, u32 size) -{ - auto const old_size = raw_data.size(); - - raw_data.resize(old_size + size); - - std::memcpy(raw_data.data() + old_size, vm::base(stream.addr), size); // append bytes - - stream.skip(size); -} - -bool ElementaryStream::release() -{ - std::lock_guard lock(m_mutex); - if (released >= put_count) - { - cellDmux.fatal("es::release() error: buffer is empty"); - return false; - } - if (released >= got_count) - { - cellDmux.fatal("es::release() error: buffer has not been seen yet"); - return false; - } - - u32 addr = 0; - if (!entries.pop(addr, &dmux->is_closed) || !addr) - { - cellDmux.fatal("es::release() error: entries.Pop() failed"); - return false; - } - - released++; - return true; -} - -bool ElementaryStream::peek(u32& out_data, bool no_ex, u32& out_spec, bool update_index) -{ - std::lock_guard lock(m_mutex); - if (got_count < released) - { - cellDmux.fatal("es::peek() error: got_count(%d) < released(%d) (put_count=%d)", got_count, released, put_count); - return false; - } - if (got_count >= put_count) - { - return false; - } - - u32 addr = 0; - if (!entries.peek(addr, got_count - released, &dmux->is_closed) || !addr) - { - cellDmux.fatal("es::peek() error: entries.Peek() failed"); - return false; - } - - out_data = no_ex ? addr + 64 : addr; - out_spec = addr + sizeof(CellDmuxAuInfoEx); - - if (update_index) - { - got_count++; - } - return true; -} - -void ElementaryStream::reset() -{ - std::lock_guard lock(m_mutex); - put = memAddr.addr(); - entries.clear(); - put_count = 0; - got_count = 0; - released = 0; - raw_data.clear(); - raw_pos = 0; -} - -void dmuxQueryAttr(u32 /* info_addr, may be 0 */, vm::ptr attr) -{ - attr->demuxerVerLower = 0x280000; // TODO: check values - attr->demuxerVerUpper = 0x260000; - attr->memSize = 0x10000; // 0x3e8e6 from ps3 -} - -void dmuxQueryEsAttr(u32 /* info, may be 0 */, vm::cptr esFilterId, u32 /*esSpecificInfo*/, vm::ptr attr) -{ - if (esFilterId->filterIdMajor >= 0xe0) - { - attr->memSize = 0x500000; // 0x45fa49 from ps3 - } - else - { - attr->memSize = 0x7000; // 0x73d9 from ps3 - } - - cellDmux.warning("*** filter(0x%x, 0x%x, 0x%x, 0x%x)", esFilterId->filterIdMajor, esFilterId->filterIdMinor, esFilterId->supplementalInfo1, esFilterId->supplementalInfo2); -} - -error_code cellDmuxQueryAttr(vm::cptr type, vm::ptr attr) -{ - cellDmux.warning("cellDmuxQueryAttr(type=*0x%x, attr=*0x%x)", type, attr); - - if (type->streamType != CELL_DMUX_STREAM_TYPE_PAMF) - { - return CELL_DMUX_ERROR_ARG; - } - - dmuxQueryAttr(0, attr); - return CELL_OK; -} - -error_code cellDmuxQueryAttr2(vm::cptr type2, vm::ptr attr) -{ - cellDmux.warning("cellDmuxQueryAttr2(demuxerType2=*0x%x, demuxerAttr=*0x%x)", type2, attr); - - if (type2->streamType != CELL_DMUX_STREAM_TYPE_PAMF) - { - return CELL_DMUX_ERROR_ARG; - } - - dmuxQueryAttr(type2->streamSpecificInfo, attr); - return CELL_OK; -} - -error_code cellDmuxOpen(vm::cptr type, vm::cptr res, vm::cptr cb, vm::ptr handle) -{ - cellDmux.warning("cellDmuxOpen(type=*0x%x, res=*0x%x, cb=*0x%x, handle=*0x%x)", type, res, cb, handle); - - if (type->streamType != CELL_DMUX_STREAM_TYPE_PAMF) - { - return CELL_DMUX_ERROR_ARG; - } - - // TODO: check demuxerResource and demuxerCb arguments - fmt::throw_exception("cellDmux disabled, use LLE."); -} - -error_code cellDmuxOpenEx(vm::cptr type, vm::cptr resEx, vm::cptr cb, vm::ptr handle) -{ - cellDmux.warning("cellDmuxOpenEx(type=*0x%x, resEx=*0x%x, cb=*0x%x, handle=*0x%x)", type, resEx, cb, handle); - - if (type->streamType != CELL_DMUX_STREAM_TYPE_PAMF) - { - return CELL_DMUX_ERROR_ARG; - } - - // TODO: check demuxerResourceEx and demuxerCb arguments - fmt::throw_exception("cellDmux disabled, use LLE."); -} - -error_code cellDmuxOpenExt(vm::cptr type, vm::cptr resEx, vm::cptr cb, vm::ptr handle) -{ - cellDmux.warning("cellDmuxOpenExt(type=*0x%x, resEx=*0x%x, cb=*0x%x, handle=*0x%x)", type, resEx, cb, handle); - - return cellDmuxOpenEx(type, resEx, cb, handle); -} - -error_code cellDmuxOpen2(vm::cptr type2, vm::cptr res2, vm::cptr cb, vm::ptr handle) -{ - cellDmux.warning("cellDmuxOpen2(type2=*0x%x, res2=*0x%x, cb=*0x%x, handle=*0x%x)", type2, res2, cb, handle); - - if (type2->streamType != CELL_DMUX_STREAM_TYPE_PAMF) - { - return CELL_DMUX_ERROR_ARG; - } - - // TODO: check demuxerType2, demuxerResource2 and demuxerCb arguments - fmt::throw_exception("cellDmux disabled, use LLE."); -} - -error_code cellDmuxClose(u32 handle) -{ - cellDmux.warning("cellDmuxClose(handle=0x%x)", handle); - - const auto dmux = idm::get_unlocked(handle); - - if (!dmux) - { - return CELL_DMUX_ERROR_ARG; - } - - dmux->is_closed = true; - dmux->job.try_push(DemuxerTask(dmuxClose)); - - while (!dmux->is_finished) - { - if (Emu.IsStopped()) - { - cellDmux.warning("cellDmuxClose(%d) aborted", handle); - return CELL_OK; - } - - std::this_thread::sleep_for(1ms); // hack - } - - idm::remove(handle); - return CELL_OK; -} - -error_code cellDmuxSetStream(u32 handle, u32 streamAddress, u32 streamSize, b8 discontinuity, u64 userData) -{ - cellDmux.trace("cellDmuxSetStream(handle=0x%x, streamAddress=0x%x, streamSize=%d, discontinuity=%d, userData=0x%llx)", handle, streamAddress, streamSize, discontinuity, userData); - - const auto dmux = idm::get_unlocked(handle); - - if (!dmux) - { - return CELL_DMUX_ERROR_ARG; - } - - if (dmux->is_running.exchange(true)) - { - //std::this_thread::sleep_for(1ms); // hack - return CELL_DMUX_ERROR_BUSY; - } - - DemuxerTask task(dmuxSetStream); - auto& info = task.stream; - info.addr = streamAddress; - info.size = streamSize; - info.discontinuity = discontinuity; - info.userdata = userData; - - dmux->job.push(task, &dmux->is_closed); - return CELL_OK; -} - -error_code cellDmuxResetStream(u32 handle) -{ - cellDmux.warning("cellDmuxResetStream(handle=0x%x)", handle); - - const auto dmux = idm::get_unlocked(handle); - - if (!dmux) - { - return CELL_DMUX_ERROR_ARG; - } - - dmux->job.push(DemuxerTask(dmuxResetStream), &dmux->is_closed); - return CELL_OK; -} - -error_code cellDmuxResetStreamAndWaitDone(u32 handle) -{ - cellDmux.warning("cellDmuxResetStreamAndWaitDone(handle=0x%x)", handle); - - const auto dmux = idm::get_unlocked(handle); - - if (!dmux) - { - return CELL_DMUX_ERROR_ARG; - } - - if (!dmux->is_running) + // This is frequently checked in here because the elementary stream could get disabled at any time by a different thread via cellDmuxDisableEs() or cellDmuxClose(). + if (!es_handle->is_enabled) { return CELL_OK; } - dmux->is_working = true; - - dmux->job.push(DemuxerTask(dmuxResetStreamAndWaitDone), &dmux->is_closed); - - while (dmux->is_running && dmux->is_working && !dmux->is_closed) // TODO: ensure that it is safe + if (const error_code ret = sys_mutex_lock(ppu, es_handle->_dx_mes, 0); ret != CELL_OK) { - if (Emu.IsStopped()) + fatal_err(es_handle->is_enabled, ret); + return 1; + } + + // Check if the access unit queue is full. One slot is reserved for the access unit produced by flushing the stream, so that flushing always succeeds. + if (!es_handle->is_enabled || es_handle->au_queue.allocated_size >= es_handle->au_queue.max_size - !es_handle->flush_started) + { + if (const error_code ret = sys_mutex_unlock(ppu, es_handle->_dx_mes); ret != CELL_OK) { - cellDmux.warning("cellDmuxResetStreamAndWaitDone(%d) aborted", handle); - return CELL_OK; + fatal_err(es_handle->is_enabled, ret); + return 1; } - std::this_thread::sleep_for(1ms); // hack + + return !es_handle->is_enabled ? CELL_OK : not_an_error(1); // Disable error reporting if the queue is full. This is expected to happen frequently. } + DmuxAuInfo& _au_info = get_au_queue_elements(es_handle)[es_handle->au_queue.back].au_info; + + if (const error_code ret = sys_mutex_unlock(ppu, es_handle->_dx_mes); ret != CELL_OK) + { + fatal_err(es_handle->is_enabled, ret); + return 1; + } + + _au_info.info = au_info->info; + std::memcpy(_au_info.specific_info.get_ptr(), au_info->specific_info.get_ptr(), au_info->specific_info_size); + + if (!es_handle->is_enabled) + { + return CELL_OK; + } + + if (const error_code ret = sys_mutex_lock(ppu, es_handle->_dx_mes, 0); ret != CELL_OK) + { + fatal_err(es_handle->is_enabled, ret); + return CELL_OK; // LLE returns CELL_OK + } + + if (!es_handle->is_enabled) + { + if (const error_code ret = sys_mutex_unlock(ppu, es_handle->_dx_mes); ret != CELL_OK) + { + fatal_err(es_handle->is_enabled, ret); + } + + return CELL_OK; + } + + es_handle->au_queue.back = (es_handle->au_queue.back + 1) % es_handle->au_queue.max_size; + es_handle->au_queue.allocated_size++; + es_handle->au_queue.size++; + + if (const error_code ret = sys_mutex_unlock(ppu, es_handle->_dx_mes); ret != CELL_OK) + { + fatal_err(es_handle->is_enabled, ret); + return CELL_OK; // LLE returns CELL_OK + } + + if (!es_handle->is_enabled) + { + return CELL_OK; + } + + const vm::var es_msg{{ .msgType = CELL_DMUX_ES_MSG_TYPE_AU_FOUND, .supplementalInfo = es_handle->dmux_handle->user_data }}; + es_handle->es_cb.cbFunc(ppu, es_handle->dmux_handle, es_handle, es_msg, es_handle->es_cb.cbArg); + return CELL_OK; } -error_code cellDmuxQueryEsAttr(vm::cptr type, vm::cptr esFilterId, u32 esSpecificInfo, vm::ptr esAttr) +static error_code notify_es_flush_done(ppu_thread& ppu, vm::ptr core_es_handle, vm::ptr es_handle) { - cellDmux.warning("cellDmuxQueryEsAttr(demuxerType=*0x%x, esFilterId=*0x%x, esSpecificInfo=*0x%x, esAttr=*0x%x)", type, esFilterId, esSpecificInfo, esAttr); + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; - if (type->streamType != CELL_DMUX_STREAM_TYPE_PAMF) + if (!savestate_lock) { - return CELL_DMUX_ERROR_ARG; + ppu.state += cpu_flag::again; + return {}; } - // TODO: check esFilterId and esSpecificInfo correctly - dmuxQueryEsAttr(0, esFilterId, esSpecificInfo, esAttr); + cellDmux.notice("dmuxEsNotifyFlushDone(unk=*0x%x, es_handle=*0x%x)", core_es_handle, es_handle); + + ensure(!!es_handle); // Not checked on LLE + + if (!es_handle->dmux_handle->_this || !es_handle->is_enabled) + { + return CELL_OK; + } + + es_handle->flush_started = false; + + const vm::var es_msg{{ .msgType = CELL_DMUX_ES_MSG_TYPE_FLUSH_DONE, .supplementalInfo = es_handle->dmux_handle->user_data }}; + es_handle->es_cb.cbFunc(ppu, es_handle->dmux_handle, es_handle, es_msg, es_handle->es_cb.cbArg); + return CELL_OK; } -error_code cellDmuxQueryEsAttr2(vm::cptr type2, vm::cptr esFilterId, u32 esSpecificInfo, vm::ptr esAttr) -{ - cellDmux.warning("cellDmuxQueryEsAttr2(type2=*0x%x, esFilterId=*0x%x, esSpecificInfo=*0x%x, esAttr=*0x%x)", type2, esFilterId, esSpecificInfo, esAttr); - if (type2->streamType != CELL_DMUX_STREAM_TYPE_PAMF) +static error_code query_attr(ppu_thread& ppu, vm::ptr demuxerAttr, vm::cptr streamSpecificInfo) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) { - return CELL_DMUX_ERROR_ARG; + ppu.state += cpu_flag::again; + return {}; } - // TODO: check demuxerType2, esFilterId and esSpecificInfo correctly - dmuxQueryEsAttr(type2->streamSpecificInfo, esFilterId, esSpecificInfo, esAttr); + const vm::var pamf_attr; + + if (const error_code ret = get_error(get_core_ops()->queryAttr(ppu, streamSpecificInfo, pamf_attr)); ret != CELL_OK) + { + return ret; + } + + demuxerAttr->memSize = utils::align(sizeof(DmuxContext) + (pamf_attr->maxEnabledEsNum * sizeof(vm::addr_t)) + sizeof(DmuxEsContext), alignof(DmuxContext)) + + pamf_attr->memSize + 0xf; + demuxerAttr->demuxerVerUpper = 0x260000; + demuxerAttr->demuxerVerLower = pamf_attr->version; + return CELL_OK; } -error_code cellDmuxEnableEs(u32 handle, vm::cptr esFilterId, vm::cptr esResourceInfo, vm::cptr esCb, u32 esSpecificInfo, vm::ptr esHandle) +error_code cellDmuxQueryAttr(ppu_thread& ppu, vm::cptr demuxerType, vm::ptr demuxerAttr) { - cellDmux.warning("cellDmuxEnableEs(handle=0x%x, esFilterId=*0x%x, esResourceInfo=*0x%x, esCb=*0x%x, esSpecificInfo=*0x%x, esHandle=*0x%x)", handle, esFilterId, esResourceInfo, esCb, esSpecificInfo, esHandle); + cellDmux.notice("cellDmuxQueryAttr(demuxerType=*0x%x, demuxerAttr=*0x%x)", demuxerType, demuxerAttr); - const auto dmux = idm::get_unlocked(handle); - - if (!dmux) + if (!demuxerType || !demuxerAttr || demuxerType->streamType != CELL_DMUX_STREAM_TYPE_PAMF) { return CELL_DMUX_ERROR_ARG; } - // TODO: check esFilterId, esResourceInfo, esCb and esSpecificInfo correctly + return query_attr(ppu, demuxerAttr, vm::null); +} - const auto es = idm::make_ptr(dmux.get(), esResourceInfo->memAddr, esResourceInfo->memSize, - esFilterId->filterIdMajor, esFilterId->filterIdMinor, esFilterId->supplementalInfo1, esFilterId->supplementalInfo2, - esCb->cbFunc, esCb->cbArg, esSpecificInfo); +error_code cellDmuxQueryAttr2(ppu_thread& ppu, vm::cptr demuxerType2, vm::ptr demuxerAttr) +{ + cellDmux.notice("cellDmuxQueryAttr2(demuxerType2=*0x%x, demuxerAttr=*0x%x)", demuxerType2, demuxerAttr); - *esHandle = es->id; + if (!demuxerType2 || !demuxerAttr || demuxerType2->streamType != CELL_DMUX_STREAM_TYPE_PAMF) + { + return CELL_DMUX_ERROR_ARG; + } - cellDmux.warning("*** New ES(dmux=0x%x, addr=0x%x, size=0x%x, filter={0x%x, 0x%x, 0x%x, 0x%x}, cb=0x%x, arg=0x%x, spec=0x%x): id = 0x%x", - handle, es->memAddr, es->memSize, es->fidMajor, es->fidMinor, es->sup1, es->sup2, es->cbFunc, es->cbArg, es->spec, es->id); + return query_attr(ppu, demuxerAttr, demuxerType2->streamSpecificInfo); +} - DemuxerTask task(dmuxEnableEs); - task.es.es = es->id; - task.es.es_ptr = es.get(); +static error_code open(ppu_thread& ppu, vm::cptr demuxerType, vm::cptr demuxerResource, vm::cptr demuxerResourceEx, + vm::cptr demuxerCb, vm::cptr streamSpecificInfo, vm::pptr demuxerHandle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + const vm::var type{{ .streamType = demuxerType->streamType, .streamSpecificInfo = streamSpecificInfo }}; + const vm::var attr; + + if (const error_code ret = cellDmuxQueryAttr2(ppu, type, attr); ret != CELL_OK) + { + return ret; + } + + if (attr->memSize > demuxerResource->memSize) + { + return CELL_DMUX_ERROR_ARG; + } + + const vm::var core_attr; + + if (const error_code ret = get_error(get_core_ops()->queryAttr(ppu, streamSpecificInfo, core_attr)); ret != CELL_OK) + { + return ret; + } + + const auto handle = vm::ptr::make(utils::align(demuxerResource->memAddr.addr(), alignof(DmuxContext))); + const u32 es_handles_size = core_attr->maxEnabledEsNum * sizeof(vm::addr_t); + const auto core_mem_addr = vm::ptr::make(utils::align(handle.addr() + sizeof(DmuxContext) + es_handles_size, 0x10)); + + const vm::var core_resource = + {{ + .memAddr = core_mem_addr, + .memSize = demuxerResource->memSize - (core_mem_addr.addr() - demuxerResource->memAddr.addr()), + .ppuThreadPriority = demuxerResource->ppuThreadPriority, + .ppuThreadStackSize = demuxerResource->ppuThreadStackSize, + .spuThreadPriority = demuxerResource->spuThreadPriority, + .numOfSpus = demuxerResource->numOfSpus + }}; + + const vm::var res_spurs; + + if (demuxerResourceEx) + { + res_spurs->spurs = demuxerResourceEx->spurs; + res_spurs->priority = demuxerResourceEx->priority; + res_spurs->maxContention = demuxerResourceEx->maxContention; + } + + const auto demux_done_func = vm::bptr::make(g_fxo->get().func_addr(FIND_FUNC(notify_demux_done))); + const auto prog_end_code_func = vm::bptr::make(g_fxo->get().func_addr(FIND_FUNC(notify_prog_end_code))); + const auto fatal_err_func = vm::bptr::make(g_fxo->get().func_addr(FIND_FUNC(notify_fatal_err))); + const vm::var> cb_demux_done{{ .cbFunc = demux_done_func, .cbArg = handle }}; + const vm::var> cb_prog_end_code{{ .cbFunc = prog_end_code_func, .cbArg = handle }}; + const vm::var> cb_fatal_err{{ .cbFunc = fatal_err_func, .cbArg = handle }}; + + const vm::var> core_handle; + + if (const error_code ret = get_error(get_core_ops()->open(ppu, streamSpecificInfo, core_resource, demuxerResourceEx ? +res_spurs : vm::null, + cb_demux_done, cb_prog_end_code, cb_fatal_err, core_handle)); + ret != CELL_OK) + { + return ret; + } + + handle->_this = handle; + handle->_this_size = sizeof(DmuxContext) + es_handles_size; + handle->version = core_attr->version; + handle->dmux_state = DMUX_STOPPED; + handle->dmux_type = *demuxerType; + handle->dmux_cb = *demuxerCb; + handle->stream_is_set = false; + handle->core_handle = *core_handle; + handle->version_ = core_attr->version; + handle->user_data = 0; + handle->max_enabled_es_num = core_attr->maxEnabledEsNum; + handle->enabled_es_num = 0; + + const vm::var mutex_attr = + {{ + .protocol = SYS_SYNC_PRIORITY, + .recursive = SYS_SYNC_NOT_RECURSIVE, + .pshared = SYS_SYNC_NOT_PROCESS_SHARED, + .adaptive = SYS_SYNC_NOT_ADAPTIVE, + .name_u64 = "_dx_mhd"_u64 + }}; + + if (const error_code ret = sys_mutex_create(ppu, handle.ptr(&DmuxContext::_dx_mhd), mutex_attr); ret != CELL_OK) + { + return ret; + } + + *demuxerHandle = handle; - dmux->job.push(task, &dmux->is_closed); return CELL_OK; } -error_code cellDmuxDisableEs(u32 esHandle) +error_code cellDmuxOpen(ppu_thread& ppu, vm::cptr demuxerType, vm::cptr demuxerResource, vm::cptr demuxerCb, vm::pptr demuxerHandle) { - cellDmux.warning("cellDmuxDisableEs(esHandle=0x%x)", esHandle); + cellDmux.notice("cellDmuxOpen(demuxerType=*0x%x, demuxerResource=*0x%x, demuxerCb=*0x%x, handle=*0x%x)", demuxerType, demuxerResource, demuxerCb, demuxerHandle); - const auto es = idm::get_unlocked(esHandle); - - if (!es) + if (!demuxerType || demuxerType->streamType != CELL_DMUX_STREAM_TYPE_PAMF + || !demuxerResource || !demuxerResource->memAddr || demuxerResource->memSize == umax || demuxerResource->ppuThreadStackSize == umax + || !demuxerCb || !demuxerCb->cbFunc + || !demuxerHandle) { return CELL_DMUX_ERROR_ARG; } - DemuxerTask task(dmuxDisableEs); - task.es.es = esHandle; - task.es.es_ptr = es.get(); - - es->dmux->job.push(task, &es->dmux->is_closed); - return CELL_OK; + return open(ppu, demuxerType, demuxerResource, vm::null, demuxerCb, vm::null, demuxerHandle); } -error_code cellDmuxResetEs(u32 esHandle) +error_code cellDmuxOpenEx(ppu_thread& ppu, vm::cptr demuxerType, vm::cptr demuxerResourceEx, vm::cptr demuxerCb, vm::pptr demuxerHandle) { - cellDmux.trace("cellDmuxResetEs(esHandle=0x%x)", esHandle); + cellDmux.notice("cellDmuxOpenEx(demuxerType=*0x%x, demuxerResourceEx=*0x%x, demuxerCb=*0x%x, demuxerHandle=*0x%x)", demuxerType, demuxerResourceEx, demuxerCb, demuxerHandle); - const auto es = idm::get_unlocked(esHandle); - - if (!es) + if (!demuxerType || demuxerType->streamType != CELL_DMUX_STREAM_TYPE_PAMF + || !demuxerResourceEx || !demuxerResourceEx->memAddr || demuxerResourceEx->memSize == umax || demuxerResourceEx->ppuThreadStackSize == umax + || !demuxerResourceEx->spurs || demuxerResourceEx->maxContention == 0u + || (demuxerResourceEx->priority & 0xf0f0f0f0f0f0f0f0ull) != 0u // Each byte in priority must be less than 0x10 + || !demuxerCb + || !demuxerHandle) { return CELL_DMUX_ERROR_ARG; } - DemuxerTask task(dmuxResetEs); - task.es.es = esHandle; - task.es.es_ptr = es.get(); + const vm::var resource + {{ + .memAddr = demuxerResourceEx->memAddr, + .memSize = demuxerResourceEx->memSize, + .ppuThreadPriority = demuxerResourceEx->ppuThreadPriority, + .ppuThreadStackSize = demuxerResourceEx->ppuThreadStackSize, + .spuThreadPriority = 0xfa, + .numOfSpus = 1 + }}; - es->dmux->job.push(task, &es->dmux->is_closed); - return CELL_OK; + return open(ppu, demuxerType, resource, demuxerResourceEx, demuxerCb, vm::null, demuxerHandle); } -error_code cellDmuxGetAu(u32 esHandle, vm::ptr auInfo, vm::ptr auSpecificInfo) +error_code cellDmuxOpenExt(ppu_thread& ppu, vm::cptr demuxerType, vm::cptr demuxerResourceEx, vm::cptr demuxerCb, vm::pptr demuxerHandle) { - cellDmux.trace("cellDmuxGetAu(esHandle=0x%x, auInfo=**0x%x, auSpecificInfo=**0x%x)", esHandle, auInfo, auSpecificInfo); + cellDmux.notice("cellDmuxOpenExt(demuxerType=*0x%x, demuxerResourceEx=*0x%x, demuxerCb=*0x%x, demuxerHandle=*0x%x)", demuxerType, demuxerResourceEx, demuxerCb, demuxerHandle); - const auto es = idm::get_unlocked(esHandle); - - if (!es) - { - return CELL_DMUX_ERROR_ARG; - } - - u32 info; - u32 spec; - if (!es->peek(info, true, spec, true)) - { - return CELL_DMUX_ERROR_EMPTY; - } - - *auInfo = info; - *auSpecificInfo = spec; - return CELL_OK; + return cellDmuxOpenEx(ppu, demuxerType, demuxerResourceEx, demuxerCb, demuxerHandle); } -error_code cellDmuxPeekAu(u32 esHandle, vm::ptr auInfo, vm::ptr auSpecificInfo) +error_code cellDmuxOpen2(ppu_thread& ppu, vm::cptr demuxerType2, vm::cptr demuxerResource2, vm::cptr demuxerCb, vm::pptr demuxerHandle) { - cellDmux.trace("cellDmuxPeekAu(esHandle=0x%x, auInfo=**0x%x, auSpecificInfo=**0x%x)", esHandle, auInfo, auSpecificInfo); + cellDmux.notice("cellDmuxOpen2(demuxerType2=*0x%x, demuxerResource2=*0x%x, demuxerCb=*0x%x, demuxerHandle=*0x%x)", demuxerType2, demuxerResource2, demuxerCb, demuxerHandle); - const auto es = idm::get_unlocked(esHandle); - - if (!es) + if (!demuxerType2 || demuxerType2->streamType != CELL_DMUX_STREAM_TYPE_PAMF + || !demuxerResource2 + || !demuxerCb || !demuxerCb->cbFunc + || !demuxerHandle) { return CELL_DMUX_ERROR_ARG; } - u32 info; - u32 spec; - if (!es->peek(info, true, spec, false)) + const vm::var type{{ .streamType = CELL_DMUX_STREAM_TYPE_PAMF }}; + + if (demuxerResource2->isResourceEx) { - return CELL_DMUX_ERROR_EMPTY; + if (!demuxerResource2->resourceEx.memAddr || demuxerResource2->resourceEx.memSize == umax || demuxerResource2->resourceEx.ppuThreadStackSize == umax + || !demuxerResource2->resourceEx.spurs || demuxerResource2->resourceEx.maxContention == 0u + || (demuxerResource2->resourceEx.priority & 0xf0f0f0f0f0f0f0f0ull) != 0u) // Each byte in priority must be less than 0x10 + { + return CELL_DMUX_ERROR_ARG; + } + + const vm::var resource + {{ + .memAddr = demuxerResource2->resourceEx.memAddr, + .memSize = demuxerResource2->resourceEx.memSize, + .ppuThreadPriority = demuxerResource2->resourceEx.ppuThreadPriority, + .ppuThreadStackSize = demuxerResource2->resourceEx.ppuThreadStackSize, + .spuThreadPriority = 0xfa, + .numOfSpus = 1 + }}; + + return open(ppu, type, resource, demuxerResource2.ptr(&CellDmuxResource2::resourceEx), demuxerCb, demuxerType2->streamSpecificInfo, demuxerHandle); } - *auInfo = info; - *auSpecificInfo = spec; - return CELL_OK; + if (!demuxerResource2->resource.memAddr || demuxerResource2->resource.memSize == umax || demuxerResource2->resource.ppuThreadStackSize == umax) + { + return CELL_DMUX_ERROR_ARG; + } + + return open(ppu, type, demuxerResource2.ptr(&CellDmuxResource2::resource), vm::null, demuxerCb, demuxerType2->streamSpecificInfo, demuxerHandle); } -error_code cellDmuxGetAuEx(u32 esHandle, vm::ptr auInfoEx, vm::ptr auSpecificInfo) +static error_code disable_es(ppu_thread& ppu, DmuxEsContext& esHandle) { - cellDmux.trace("cellDmuxGetAuEx(esHandle=0x%x, auInfoEx=**0x%x, auSpecificInfo=**0x%x)", esHandle, auInfoEx, auSpecificInfo); - - const auto es = idm::get_unlocked(esHandle); - - if (!es) + if (const error_code ret = sys_mutex_lock(ppu, esHandle._dx_mes, 0); ret != CELL_OK) { - return CELL_DMUX_ERROR_ARG; + return ret; } - u32 info; - u32 spec; - if (!es->peek(info, false, spec, true)) + const error_code core_ret = get_core_ops()->disableEs(ppu, esHandle.core_es_handle); + + esHandle.is_enabled = false; + + if (const error_code ret = sys_mutex_unlock(ppu, esHandle._dx_mes); ret != CELL_OK) { - return CELL_DMUX_ERROR_EMPTY; + return ret; } - *auInfoEx = info; - *auSpecificInfo = spec; - return CELL_OK; + error_code ret; + while ((ret = sys_mutex_destroy(ppu, esHandle._dx_mes)) == static_cast(CELL_EBUSY)) + { + sys_timer_usleep(ppu, 200); + } + + if (ret != CELL_OK) + { + return ret; + } + + esHandle._this = vm::null; + + return get_error(core_ret); } -error_code cellDmuxPeekAuEx(u32 esHandle, vm::ptr auInfoEx, vm::ptr auSpecificInfo) +error_code cellDmuxClose(ppu_thread& ppu, vm::ptr demuxerHandle) { - cellDmux.trace("cellDmuxPeekAuEx(esHandle=0x%x, auInfoEx=**0x%x, auSpecificInfo=**0x%x)", esHandle, auInfoEx, auSpecificInfo); + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; - const auto es = idm::get_unlocked(esHandle); + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } - if (!es) + cellDmux.notice("cellDmuxClose(demuxerHandle=*0x%x)", demuxerHandle); + + if (!demuxerHandle || !demuxerHandle->_this || demuxerHandle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF) { return CELL_DMUX_ERROR_ARG; } - u32 info; - u32 spec; - if (!es->peek(info, false, spec, false)) + demuxerHandle->_this = vm::null; + + if (const error_code ret = sys_mutex_lock(ppu, demuxerHandle->_dx_mhd, 0); ret != CELL_OK) { - return CELL_DMUX_ERROR_EMPTY; + demuxerHandle->_this = demuxerHandle; + return ret; } - *auInfoEx = info; - *auSpecificInfo = spec; - return CELL_OK; + for (const vm::ptr es_handle : get_es_handles(demuxerHandle)) + { + if (const error_code ret = disable_es(ppu, *es_handle); ret != CELL_OK) + { + ensure(sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd) == CELL_OK); // Not checked on LLE + demuxerHandle->_this = demuxerHandle; + return ret; + } + + es_handle->dmux_handle = vm::null; + demuxerHandle->enabled_es_num--; + } + + error_code ret = sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); + ret = ret ? ret : get_error(get_core_ops()->close(ppu, demuxerHandle->core_handle)); + ret = ret ? ret : sys_mutex_destroy(ppu, demuxerHandle->_dx_mhd); + + if (ret != CELL_OK) + { + demuxerHandle->_this = demuxerHandle; + } + + return ret; } -error_code cellDmuxReleaseAu(u32 esHandle) +error_code cellDmuxSetStream(ppu_thread& ppu, vm::ptr demuxerHandle, vm::cptr streamAddress, u32 streamSize, b8 discontinuity, u64 userData) { - cellDmux.trace("cellDmuxReleaseAu(esHandle=0x%x)", esHandle); + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; - const auto es = idm::get_unlocked(esHandle); + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } - if (!es) + cellDmux.trace("cellDmuxSetStream(demuxerHandle=*0x%x, streamAddress=*0x%x, streamSize=0x%x, discontinuity=%d, userData=0x%llx)", + demuxerHandle, streamAddress, streamSize, +discontinuity, userData); + + if (!demuxerHandle || !demuxerHandle->_this || streamSize == 0 || streamSize == umax || demuxerHandle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF) { return CELL_DMUX_ERROR_ARG; } - if (!es->release()) + if (!(demuxerHandle->dmux_state & DMUX_STOPPED)) + { + return CELL_DMUX_ERROR_BUSY; + } + + if (const error_code ret = sys_mutex_lock(ppu, demuxerHandle->_dx_mhd, 0); ret != CELL_OK) + { + return ret; + } + + if (const error_code ret = get_error(get_core_ops()->setStream(ppu, demuxerHandle->core_handle, streamAddress, streamSize, discontinuity, userData)); + ret != CELL_OK) + { + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); + return mutex_unlock_ret ? mutex_unlock_ret : ret; + } + + demuxerHandle->stream_is_set = true; + demuxerHandle->dmux_state = DMUX_RUNNING; + demuxerHandle->user_data = userData; + + return sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); +} + +error_code cellDmuxResetStream(ppu_thread& ppu, vm::ptr demuxerHandle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + cellDmux.notice("cellDmuxResetStream(demuxerHandle=*0x%x)", demuxerHandle); + + if (!demuxerHandle || !demuxerHandle->_this || demuxerHandle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF) + { + return CELL_DMUX_ERROR_ARG; + } + + if (const error_code ret = sys_mutex_lock(ppu, demuxerHandle->_dx_mhd, 0); ret != CELL_OK) + { + return ret; + } + + const u32 dmux_status = demuxerHandle->dmux_state; + + if (const error_code ret = sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); ret != CELL_OK) + { + return ret; + } + + if (!(dmux_status & DMUX_RUNNING) || !demuxerHandle->stream_is_set) { return CELL_DMUX_ERROR_SEQ; } + + if (const error_code ret = get_error(get_core_ops()->resetStream(ppu, demuxerHandle->core_handle)); ret != CELL_OK) + { + return ret; + } + + demuxerHandle->stream_is_set = false; + return CELL_OK; } -error_code cellDmuxFlushEs(u32 esHandle) +error_code cellDmuxResetStreamAndWaitDone(ppu_thread& ppu, vm::ptr demuxerHandle) { - cellDmux.warning("cellDmuxFlushEs(esHandle=0x%x)", esHandle); + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; - const auto es = idm::get_unlocked(esHandle); + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } - if (!es) + cellDmux.notice("cellDmuxResetStreamAndWaitDone(demuxerHandle=*0x%x)", demuxerHandle); + + if (!demuxerHandle || !demuxerHandle->_this || demuxerHandle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF) { return CELL_DMUX_ERROR_ARG; } - DemuxerTask task(dmuxFlushEs); - task.es.es = esHandle; - task.es.es_ptr = es.get(); + if (const error_code ret = get_error(get_core_ops()->resetStreamAndWaitDone(ppu, demuxerHandle->core_handle)); ret != CELL_OK) + { + return ret; + } + + // LLE doesn't set DmuxContext::stream_is_set to false + + return CELL_OK; +} + +error_code cellDmuxQueryEsAttr(ppu_thread& ppu, vm::cptr demuxerType, vm::cptr esFilterId, vm::cptr esSpecificInfo, vm::ptr esAttr) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + cellDmux.notice("cellDmuxQueryEsAttr(demuxerType=*0x%x, esFilterId=*0x%x, esSpecificInfo=*0x%x, esAttr=*0x%x)", demuxerType, esFilterId, esSpecificInfo, esAttr); + + if (!demuxerType || demuxerType->streamType != CELL_DMUX_STREAM_TYPE_PAMF || !esFilterId || !esAttr) + { + return CELL_DMUX_ERROR_ARG; + } + + const vm::var core_es_attr; + + if (const error_code ret = get_error(get_core_ops()->queryEsAttr(ppu, vm::make_var(*esFilterId), esSpecificInfo, core_es_attr)); + ret != CELL_OK) + { + return ret; + } + + esAttr->memSize = utils::align(sizeof(DmuxEsContext) + ((core_es_attr->auQueueMaxSize + 1) * (core_es_attr->specificInfoSize + sizeof(DmuxAuQueueElement))), alignof(DmuxEsContext)) + + core_es_attr->memSize + 0xf; + + return CELL_OK; +} + +error_code cellDmuxQueryEsAttr2(ppu_thread& ppu, vm::cptr demuxerType2, vm::cptr esFilterId, vm::cptr esSpecificInfo, vm::ptr esAttr) +{ + cellDmux.notice("cellDmuxQueryEsAttr2(demuxerType2=*0x%x, esFilterId=*0x%x, esSpecificInfo=*0x%x, esAttr=*0x%x)", demuxerType2, esFilterId, esSpecificInfo, esAttr); + + ensure(!!demuxerType2); // Not checked on LLE + + const vm::var demuxerType{{ .streamType = demuxerType2->streamType }}; + + return cellDmuxQueryEsAttr(ppu, demuxerType, esFilterId, esSpecificInfo, esAttr); +} + +error_code cellDmuxEnableEs(ppu_thread& ppu, vm::ptr demuxerHandle, vm::cptr esFilterId, vm::cptr esResourceInfo, + vm::cptr esCb, vm::cptr esSpecificInfo, vm::pptr esHandle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + cellDmux.notice("cellDmuxEnableEs(demuxerHandle=*0x%x, esFilterId=*0x%x, esResourceInfo=*0x%x, esCb=*0x%x, esSpecificInfo=*0x%x, esHandle=**0x%x)", + demuxerHandle, esFilterId, esResourceInfo, esCb, esSpecificInfo, esHandle); + + if (!demuxerHandle || !demuxerHandle->_this || demuxerHandle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF + || !esFilterId + || !esResourceInfo || !esResourceInfo->memAddr || esResourceInfo->memSize == umax + || !esCb || !esCb->cbFunc + || !esHandle) + { + return CELL_DMUX_ERROR_ARG; + } + + if (const error_code ret = sys_mutex_lock(ppu, demuxerHandle->_dx_mhd, 0); ret != CELL_OK) + { + return ret; + } + + if (demuxerHandle->enabled_es_num >= demuxerHandle->max_enabled_es_num) + { + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); + return mutex_unlock_ret ? mutex_unlock_ret : CELL_DMUX_ERROR_ARG; + } + + const vm::var es_attr; + + if (const error_code ret = cellDmuxQueryEsAttr(ppu, demuxerHandle.ptr(&DmuxContext::dmux_type), esFilterId, esSpecificInfo, es_attr); ret != CELL_OK) + { + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); + return mutex_unlock_ret ? mutex_unlock_ret : ret; + } + + if (es_attr->memSize > esResourceInfo->memSize) + { + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); + return mutex_unlock_ret ? mutex_unlock_ret : CELL_DMUX_ERROR_ARG; + } + + const vm::var es_filter_id{ *esFilterId }; + const vm::var core_es_attr; + + if (const error_code ret = get_error(get_core_ops()->queryEsAttr(ppu, es_filter_id, esSpecificInfo, core_es_attr)); ret != CELL_OK) + { + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); + return mutex_unlock_ret ? mutex_unlock_ret : ret; + } + + core_es_attr->auQueueMaxSize++; // One extra slot for the access unit produced by flushing the stream, so that flushing always succeeds + + const auto es_handle = vm::ptr::make(utils::align(esResourceInfo->memAddr.addr(), alignof(DmuxEsContext))); + const u32 au_queue_elements_size = core_es_attr->auQueueMaxSize * (core_es_attr->specificInfoSize + sizeof(DmuxAuQueueElement)); + const auto core_mem_addr = vm::bptr::make(utils::align(es_handle.addr() + sizeof(DmuxEsContext) + au_queue_elements_size, 0x10)); + + const vm::var core_es_resource + {{ + .memAddr = core_mem_addr, + .memSize = esResourceInfo->memSize - (core_mem_addr.addr() - esResourceInfo->memAddr.addr()) + }}; + + const vm::var mutex_attr = + {{ + .protocol = SYS_SYNC_PRIORITY, + .recursive = SYS_SYNC_NOT_RECURSIVE, + .pshared = SYS_SYNC_NOT_PROCESS_SHARED, + .adaptive = SYS_SYNC_NOT_ADAPTIVE, + .name_u64 = "_dx_mes"_u64 + }}; + + if (const error_code ret = sys_mutex_create(ppu, es_handle.ptr(&DmuxEsContext::_dx_mes), mutex_attr); ret != CELL_OK) + { + ensure(sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd) == CELL_OK); // Not checked on LLE + return ret; + } + + if (const error_code ret = sys_mutex_lock(ppu, es_handle->_dx_mes, 0); ret != CELL_OK) + { + ensure(sys_mutex_destroy(ppu, es_handle->_dx_mes) == CELL_OK); // Not checked on LLE + ensure(sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd) == CELL_OK); // Not checked on LLE + return ret; + } + + const auto au_found_func = vm::bptr::make(g_fxo->get().func_addr(FIND_FUNC(notify_es_au_found))); + const auto flush_done_func = vm::bptr::make(g_fxo->get().func_addr(FIND_FUNC(notify_es_flush_done))); + const vm::var> cb_au_found{{ .cbFunc = au_found_func, .cbArg = es_handle }}; + const vm::var> cb_flush_done{{ .cbFunc = flush_done_func, .cbArg = es_handle }}; + + const vm::var> core_es_handle; + + if (const error_code ret = get_error(get_core_ops()->enableEs(ppu, demuxerHandle->core_handle, es_filter_id, core_es_resource, cb_au_found, cb_flush_done, + esSpecificInfo, core_es_handle)); + ret != CELL_OK) + { + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, es_handle->_dx_mes); + const error_code mutex_destroy_ret = sys_mutex_destroy(ppu, es_handle->_dx_mes); + + if (mutex_unlock_ret != CELL_OK) + { + ensure(sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd) == CELL_OK); // Not checked on LLE + return mutex_unlock_ret; + } + + if (mutex_destroy_ret != CELL_OK) + { + ensure(sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd) == CELL_OK); // Not checked on LLE + return mutex_destroy_ret; + } + + const error_code mutex_unlock_ret2 = sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); + return mutex_unlock_ret2 ? mutex_unlock_ret2 : ret; + } + + es_handle->is_enabled = true; + es_handle->error_mem_size = 0; + es_handle->error_count = 0; + // es_handle->error_mem_addr is not initialized on LLE + es_handle->_this = es_handle; + es_handle->_this_size = sizeof(DmuxEsContext) + au_queue_elements_size; + es_handle->_this_index = demuxerHandle->enabled_es_num; + es_handle->dmux_handle = demuxerHandle; + es_handle->es_cb = *esCb; + es_handle->core_es_handle = *core_es_handle; + es_handle->flush_started = bf_t, 0, 1>{}; + es_handle->au_queue.max_size = core_es_attr->auQueueMaxSize; + es_handle->au_queue.allocated_size = 0; + es_handle->au_queue.size = 0; + es_handle->au_queue.front = 0; + es_handle->au_queue.back = 0; + es_handle->au_queue.allocated_back = 0; + + const vm::ptr au_queue_elements = get_au_queue_elements(es_handle); + + for (u32 i = 0; i < core_es_attr->auQueueMaxSize; i++) + { + au_queue_elements[i].index = i; + au_queue_elements[i].unk = 0; + au_queue_elements[i].au_info.info.auAddr = vm::null; + au_queue_elements[i].au_info.info.auMaxSize = 0; + au_queue_elements[i].au_info.specific_info.set(au_queue_elements.addr() + (core_es_attr->auQueueMaxSize * static_cast(sizeof(DmuxAuQueueElement))) + (i * core_es_attr->specificInfoSize)); + au_queue_elements[i].au_info.specific_info_size = core_es_attr->specificInfoSize; + } + + demuxerHandle->enabled_es_num++; + *get_es_handles(demuxerHandle).rbegin() = es_handle; + *esHandle = es_handle; + + if (const error_code ret = sys_mutex_unlock(ppu, es_handle->_dx_mes); ret != CELL_OK) + { + ensure(sys_mutex_destroy(ppu, es_handle->_dx_mes) == CELL_OK); // Not checked on LLE + ensure(sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd) == CELL_OK); // Not checked on LLE + return ret; + } + + return sys_mutex_unlock(ppu, demuxerHandle->_dx_mhd); +} + +error_code cellDmuxDisableEs(ppu_thread& ppu, vm::ptr esHandle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + cellDmux.notice("cellDmuxDisableEs(esHandle=*0x%x)", esHandle); + + if (!esHandle || !esHandle->_this || !esHandle->dmux_handle || esHandle->dmux_handle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF) + { + return CELL_DMUX_ERROR_ARG; + } + + if (const error_code ret = sys_mutex_lock(ppu, esHandle->dmux_handle->_dx_mhd, 0); ret != CELL_OK) + { + return ret; + } + + if (const error_code ret = disable_es(ppu, *esHandle); ret != CELL_OK) + { + ensure(sys_mutex_unlock(ppu, esHandle->dmux_handle->_dx_mhd) == CELL_OK); // Not checked on LLE + return ret; + } + + const std::span> es_handles = get_es_handles(esHandle->dmux_handle); + + std::shift_left(std::ranges::find(es_handles, static_cast>(esHandle)), es_handles.end(), 1); + + esHandle->dmux_handle->enabled_es_num--; + *es_handles.rbegin() = vm::null; + + return sys_mutex_unlock(ppu, esHandle->dmux_handle->_dx_mhd); +} + +error_code cellDmuxResetEs(ppu_thread& ppu, vm::ptr esHandle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + cellDmux.notice("cellDmuxResetEs(esHandle=*0x%x)", esHandle); + + if (!esHandle || !esHandle->_this || !esHandle->dmux_handle || esHandle->dmux_handle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF) + { + return CELL_DMUX_ERROR_ARG; + } + + if (const error_code ret = sys_mutex_lock(ppu, esHandle->dmux_handle->_dx_mhd, 0); ret != CELL_OK) + { + return ret; + } + + const u32 dmux_status = esHandle->dmux_handle->dmux_state; + + if (const error_code ret = sys_mutex_unlock(ppu, esHandle->dmux_handle->_dx_mhd); ret != CELL_OK) + { + return ret; + } + + if (dmux_status & DMUX_STOPPED) + { + return CELL_DMUX_ERROR_SEQ; + } + + if (const error_code ret = sys_mutex_lock(ppu, esHandle->_dx_mes, 0); ret != CELL_OK) + { + return ret; + } + + if (const error_code ret = get_error(get_core_ops()->resetEs(ppu, esHandle->core_es_handle)); ret != CELL_OK) + { + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, esHandle->_dx_mes); + return mutex_unlock_ret ? mutex_unlock_ret : ret; + } + + const auto au_queue_elements = get_au_queue_elements(esHandle); + + for (s32 i = 0; i < esHandle->au_queue.max_size; i++) + { + au_queue_elements[i].index = i; + au_queue_elements[i].unk = 0; + au_queue_elements[i].au_info.info.auAddr = vm::null; + au_queue_elements[i].au_info.info.auMaxSize = 0; + } + + esHandle->error_mem_size = 0; + esHandle->error_count = 0; + esHandle->au_queue.allocated_size = 0; + esHandle->au_queue.size = 0; + esHandle->au_queue.front = 0; + esHandle->au_queue.back = 0; + esHandle->au_queue.allocated_back = 0; + + return sys_mutex_unlock(ppu, esHandle->_dx_mes); +} + +template +static error_code pop_au(ppu_thread& ppu, vm::ptr esHandle, vm::cpptr auInfo, vm::cpptr auSpecificInfo) +{ + if (!esHandle || !esHandle->_this || !esHandle->dmux_handle || esHandle->dmux_handle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF) + { + return CELL_DMUX_ERROR_ARG; + } + + if (const error_code ret = sys_mutex_lock(ppu, esHandle->_dx_mes, 0); ret != CELL_OK) + { + return ret; + } + + if (ppu.state & cpu_flag::again) + { + return {}; + } + + if (esHandle->au_queue.size <= 0) + { + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, esHandle->_dx_mes); + return mutex_unlock_ret ? mutex_unlock_ret : CELL_DMUX_ERROR_EMPTY; + } + + const vm::ptr au_info = (get_au_queue_elements(esHandle) + esHandle->au_queue.front).ptr(&DmuxAuQueueElement::au_info); + + if (auInfo) + { + *auInfo = au_info.ptr(&DmuxAuInfo::info); + } + + if (auSpecificInfo) + { + *auSpecificInfo = au_info->specific_info; + } + + if constexpr (!is_peek) + { + esHandle->au_queue.front = (esHandle->au_queue.front + 1) % esHandle->au_queue.max_size; + esHandle->au_queue.size--; + } + + return sys_mutex_unlock(ppu, esHandle->_dx_mes); +} + +error_code cellDmuxGetAu(ppu_thread& ppu, vm::ptr esHandle, vm::cpptr auInfo, vm::cpptr auSpecificInfo) +{ + cellDmux.trace("cellDmuxGetAu(esHandle=*0x%x, auInfo=**0x%x, auSpecificInfo=**0x%x)", esHandle, auInfo, auSpecificInfo); + + return pop_au(ppu, esHandle, auInfo, auSpecificInfo); +} + +error_code cellDmuxPeekAu(ppu_thread& ppu, vm::ptr esHandle, vm::cpptr auInfo, vm::cpptr auSpecificInfo) +{ + cellDmux.trace("cellDmuxPeekAu(esHandle=*0x%x, auInfo=**0x%x, auSpecificInfo=**0x%x)", esHandle, auInfo, auSpecificInfo); + + return pop_au(ppu, esHandle, auInfo, auSpecificInfo); +} + +error_code cellDmuxGetAuEx(ppu_thread& ppu, vm::ptr esHandle, vm::cpptr auInfoEx, vm::cpptr auSpecificInfo) +{ + cellDmux.trace("cellDmuxGetAuEx(esHandle=*0x%x, auInfoEx=**0x%x, auSpecificInfo=**0x%x)", esHandle, auInfoEx, auSpecificInfo); + + return pop_au(ppu, esHandle, auInfoEx, auSpecificInfo); +} + +error_code cellDmuxPeekAuEx(ppu_thread& ppu, vm::ptr esHandle, vm::cpptr auInfoEx, vm::cpptr auSpecificInfo) +{ + cellDmux.trace("cellDmuxPeekAuEx(esHandle=*0x%x, auInfoEx=**0x%x, auSpecificInfo=**0x%x)", esHandle, auInfoEx, auSpecificInfo); + + return pop_au(ppu, esHandle, auInfoEx, auSpecificInfo); +} + +error_code cellDmuxReleaseAu(ppu_thread& ppu, vm::ptr esHandle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + cellDmux.trace("cellDmuxReleaseAu(esHandle=*0x%x)", esHandle); + + if (!esHandle || !esHandle->_this || !esHandle->dmux_handle || esHandle->dmux_handle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF) + { + return CELL_DMUX_ERROR_ARG; + } + + if (const error_code ret = sys_mutex_lock(ppu, esHandle->_dx_mes, 0); ret != CELL_OK) + { + return ret; + } + + vm::bptr mem_addr; + u32 mem_size; + + if (esHandle->au_queue.allocated_size < 1) + { + if (esHandle->error_count == 0u) + { + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, esHandle->_dx_mes); + return mutex_unlock_ret ? mutex_unlock_ret : CELL_DMUX_ERROR_SEQ; + } + + mem_addr = esHandle->error_mem_addr; + mem_size = esHandle->error_mem_size; + } + else + { + const DmuxAuInfo& au_info = get_au_queue_elements(esHandle)[esHandle->au_queue.allocated_back].au_info; + + mem_size = + esHandle->error_mem_size += au_info.info.auSize; + + if (esHandle->error_count == 0u) + { + mem_addr = au_info.info.auAddr; + } + else + { + mem_addr = esHandle->error_mem_addr; + } + + esHandle->au_queue.allocated_back = (esHandle->au_queue.allocated_back + 1) % esHandle->au_queue.max_size; + esHandle->au_queue.allocated_size--; + + if (esHandle->au_queue.allocated_size < esHandle->au_queue.size) + { + esHandle->au_queue.front = (esHandle->au_queue.front + 1) % esHandle->au_queue.max_size; + esHandle->au_queue.size--; + } + } + + if (const error_code ret = get_error(get_core_ops()->releaseAu(ppu, esHandle->core_es_handle, mem_addr, mem_size)); ret != CELL_OK) + { + if (esHandle->error_count == 0u) + { + esHandle->error_mem_addr = mem_addr; + } + + esHandle->error_count++; + + const error_code mutex_unlock_ret = sys_mutex_unlock(ppu, esHandle->_dx_mes); + return mutex_unlock_ret ? mutex_unlock_ret : ret; + } + + esHandle->error_count = 0; + esHandle->error_mem_size = 0; + + return sys_mutex_unlock(ppu, esHandle->_dx_mes); +} + +error_code cellDmuxFlushEs(ppu_thread& ppu, vm::ptr esHandle) +{ + // Blocking savestate creation due to ppu_thread::fast_call() + const std::unique_lock savestate_lock{ g_fxo->get(), std::try_to_lock }; + + if (!savestate_lock) + { + ppu.state += cpu_flag::again; + return {}; + } + + cellDmux.notice("cellDmuxFlushEs(esHandle=*0x%x)", esHandle); + + if (!esHandle || !esHandle->_this || !esHandle->dmux_handle || esHandle->dmux_handle->dmux_type.streamType != CELL_DMUX_STREAM_TYPE_PAMF) + { + return CELL_DMUX_ERROR_ARG; + } + + if (const error_code ret = sys_mutex_lock(ppu, esHandle->dmux_handle->_dx_mhd, 0); ret != CELL_OK) + { + return ret; + } + + const u32 dmux_state = esHandle->dmux_handle->dmux_state; + + if (const error_code ret = sys_mutex_unlock(ppu, esHandle->dmux_handle->_dx_mhd); ret != CELL_OK) + { + return ret; + } + + if (!(dmux_state & DMUX_STOPPED)) + { + return CELL_DMUX_ERROR_SEQ; + } + + esHandle->flush_started = true; + + if (const error_code ret = get_error(get_core_ops()->flushEs(ppu, esHandle->core_es_handle)); ret != CELL_OK) + { + esHandle->flush_started = false; + return ret; + } - es->dmux->job.push(task, &es->dmux->is_closed); return CELL_OK; } @@ -1382,4 +1246,11 @@ DECLARE(ppu_module_manager::cellDmux)("cellDmux", []() REG_FUNC(cellDmux, cellDmuxPeekAuEx); REG_FUNC(cellDmux, cellDmuxReleaseAu); REG_FUNC(cellDmux, cellDmuxFlushEs); + + REG_HIDDEN_FUNC(notify_demux_done); + REG_HIDDEN_FUNC(notify_fatal_err); + REG_HIDDEN_FUNC(notify_prog_end_code); + + REG_HIDDEN_FUNC(notify_es_au_found); + REG_HIDDEN_FUNC(notify_es_flush_done); }); diff --git a/rpcs3/Emu/Cell/Modules/cellDmux.h b/rpcs3/Emu/Cell/Modules/cellDmux.h index dc17cb3314..3db8c63bee 100644 --- a/rpcs3/Emu/Cell/Modules/cellDmux.h +++ b/rpcs3/Emu/Cell/Modules/cellDmux.h @@ -1,7 +1,8 @@ #pragma once #include "Emu/Memory/vm_ptr.h" -#include "cellPamf.h" +#include "Emu/Cell/ErrorCodes.h" +#include "Utilities/BitField.h" // Error Codes enum CellDmuxError :u32 @@ -18,6 +19,10 @@ enum CellDmuxStreamType : s32 CELL_DMUX_STREAM_TYPE_UNDEF = 0, CELL_DMUX_STREAM_TYPE_PAMF = 1, CELL_DMUX_STREAM_TYPE_TERMINATOR = 2, + + // Only used in cellSail + CELL_DMUX_STREAM_TYPE_MP4 = 0x81, + CELL_DMUX_STREAM_TYPE_AVI = 0x82 }; enum CellDmuxMsgType : s32 @@ -48,13 +53,14 @@ struct CellDmuxEsMsg struct CellDmuxType { be_t streamType; // CellDmuxStreamType - be_t reserved[2]; + be_t reserved1; + be_t reserved2; }; struct CellDmuxType2 { - be_t streamType; // CellDmuxStreamType - be_t streamSpecificInfo; + be_t streamType; + vm::bcptr streamSpecificInfo; }; struct CellDmuxResource @@ -73,8 +79,8 @@ struct CellDmuxResourceEx be_t memSize; be_t ppuThreadPriority; be_t ppuThreadStackSize; - be_t spurs_addr; - u8 priority[8]; + vm::bptr spurs; // CellSpurs* + be_t priority; be_t maxContention; }; @@ -85,33 +91,23 @@ struct CellDmuxResourceSpurs be_t maxContention; }; -/* -struct CellDmuxResource2Ex -{ - b8 isResourceEx; //true - CellDmuxResourceEx resourceEx; -}; - -struct CellDmuxResource2NoEx -{ - b8 isResourceEx; //false - CellDmuxResource resource; -}; -*/ - struct CellDmuxResource2 { b8 isResourceEx; - be_t memAddr; - be_t memSize; - be_t ppuThreadPriority; - be_t ppuThreadStackSize; - be_t shit[4]; + + union + { + CellDmuxResource resource; + CellDmuxResourceEx resourceEx; + }; }; -using CellDmuxCbMsg = u32(u32 demuxerHandle, vm::cptr demuxerMsg, vm::ptr cbArg); +struct DmuxContext; +struct DmuxEsContext; -using CellDmuxCbEsMsg = u32(u32 demuxerHandle, u32 esHandle, vm::cptr esMsg, vm::ptr cbArg); +using CellDmuxCbMsg = u32(vm::ptr demuxerHandle, vm::cptr demuxerMsg, vm::ptr cbArg); + +using CellDmuxCbEsMsg = u32(vm::ptr demuxerHandle, vm::ptr esHandle, vm::cptr esMsg, vm::ptr cbArg); // Used for internal callbacks as well template @@ -177,6 +173,70 @@ struct DmuxAuInfo be_t specific_info_size; }; +struct DmuxAuQueueElement +{ + be_t index; + u8 unk; // unused + DmuxAuInfo au_info; +}; + +CHECK_SIZE(DmuxAuQueueElement, 0x38); + +enum DmuxState : u32 +{ + DMUX_STOPPED = 1 << 0, + DMUX_RUNNING = 1 << 1, +}; + +struct alignas(0x10) DmuxContext // CellDmuxHandle = DmuxContext* +{ + vm::bptr _this; + be_t _this_size; + be_t version; + be_t dmux_state; + CellDmuxType dmux_type; + CellDmuxCb dmux_cb; + b8 stream_is_set; + vm::bptr core_handle; + be_t version_; // Same value as 'version' + be_t user_data; + be_t max_enabled_es_num; + be_t enabled_es_num; + be_t _dx_mhd; // sys_mutex_t + u8 reserved[0x7c]; +}; + +CHECK_SIZE_ALIGN(DmuxContext, 0xc0, 0x10); + +struct alignas(0x10) DmuxEsContext // CellDmuxEsHandle = DmuxEsContext* +{ + be_t _dx_mes; // sys_mutex_t + be_t is_enabled; + be_t error_mem_size; + be_t error_count; + vm::bptr error_mem_addr; + vm::bptr _this; + be_t _this_size; + be_t _this_index; + vm::bptr dmux_handle; + CellDmuxEsCb es_cb; + vm::bptr core_es_handle; + bf_t, 0, 1> flush_started; + + struct + { + be_t max_size; + be_t allocated_size; + be_t size; + be_t front; + be_t back; + be_t allocated_back; + } + au_queue; +}; + +CHECK_SIZE_ALIGN(DmuxEsContext, 0x50, 0x10); + using DmuxNotifyDemuxDone = error_code(vm::ptr, u32, vm::ptr); using DmuxNotifyFatalErr = error_code(vm::ptr, u32, vm::ptr); using DmuxNotifyProgEndCode = error_code(vm::ptr, vm::ptr); @@ -194,10 +254,10 @@ using CellDmuxCoreOpSetStream = error_code(vm::ptr, vm::cptr, u32, b using CellDmuxCoreOpReleaseAu = error_code(vm::ptr, vm::ptr, u32); using CellDmuxCoreOpQueryEsAttr = error_code(vm::cptr, vm::cptr, vm::ptr); using CellDmuxCoreOpEnableEs = error_code(vm::ptr, vm::cptr, vm::cptr, vm::cptr>, vm::cptr>, vm::cptr, vm::pptr); -using CellDmuxCoreOpDisableEs = u32(vm::ptr); -using CellDmuxCoreOpFlushEs = u32(vm::ptr); -using CellDmuxCoreOpResetEs = u32(vm::ptr); -using CellDmuxCoreOpResetStreamAndWaitDone = u32(vm::ptr); +using CellDmuxCoreOpDisableEs = error_code(vm::ptr); +using CellDmuxCoreOpFlushEs = error_code(vm::ptr); +using CellDmuxCoreOpResetEs = error_code(vm::ptr); +using CellDmuxCoreOpResetStreamAndWaitDone = error_code(vm::ptr); struct CellDmuxCoreOps { diff --git a/rpcs3/Emu/Cell/lv2/sys_prx.cpp b/rpcs3/Emu/Cell/lv2/sys_prx.cpp index f46f3bdafa..6f930e79dd 100644 --- a/rpcs3/Emu/Cell/lv2/sys_prx.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_prx.cpp @@ -63,7 +63,7 @@ extern const std::map g_prx_list { "libcelpenc.sprx", 0 }, { "libddpdec.sprx", 0 }, { "libdivxdec.sprx", 0 }, - { "libdmux.sprx", 0 }, + { "libdmux.sprx", 1 }, { "libdmuxpamf.sprx", 1 }, { "libdtslbrdec.sprx", 0 }, { "libfiber.sprx", 0 },