From 1c2a2ab01a381081d9b2cb84642974ec5abdb4b1 Mon Sep 17 00:00:00 2001 From: capriots <29807355+capriots@users.noreply.github.com> Date: Sat, 19 Jul 2025 13:59:21 +0200 Subject: [PATCH] cellDmuxPamf implementation part 1: SPU thread --- rpcs3/Emu/Cell/Modules/cellDmux.h | 4 +- rpcs3/Emu/Cell/Modules/cellDmuxPamf.cpp | 1145 ++++++++++++++++++++++- rpcs3/Emu/Cell/Modules/cellDmuxPamf.h | 661 +++++++++++++ rpcs3/Emu/Cell/Modules/cellPamf.h | 1 + rpcs3/Emu/savestate_utils.cpp | 3 +- 5 files changed, 1789 insertions(+), 25 deletions(-) diff --git a/rpcs3/Emu/Cell/Modules/cellDmux.h b/rpcs3/Emu/Cell/Modules/cellDmux.h index 1767165283..7c31bbf105 100644 --- a/rpcs3/Emu/Cell/Modules/cellDmux.h +++ b/rpcs3/Emu/Cell/Modules/cellDmux.h @@ -301,7 +301,7 @@ using CellDmuxCoreOpResetStream = error_code(vm::ptr); using CellDmuxCoreOpCreateThread = error_code(vm::ptr); using CellDmuxCoreOpJoinThread = error_code(vm::ptr); using CellDmuxCoreOpSetStream = error_code(vm::ptr, vm::cptr, u32, b8, u64); -using CellDmuxCoreOpFreeMemory = error_code(vm::ptr, vm::ptr, u32); +using CellDmuxCoreOpReleaseAu = error_code(vm::ptr, vm::ptr, u32); using CellDmuxCoreOpQueryEsAttr = error_code(vm::cptr, vm::cptr, vm::ptr); using CellDmuxCoreOpEnableEs = error_code(vm::ptr, vm::cptr, vm::cptr, vm::cptr>, vm::cptr>, vm::cptr, vm::pptr); using CellDmuxCoreOpDisableEs = u32(vm::ptr); @@ -318,7 +318,7 @@ struct CellDmuxCoreOps vm::bptr createThread; vm::bptr joinThread; vm::bptr setStream; - vm::bptr freeMemory; + vm::bptr releaseAu; vm::bptr queryEsAttr; vm::bptr enableEs; vm::bptr disableEs; diff --git a/rpcs3/Emu/Cell/Modules/cellDmuxPamf.cpp b/rpcs3/Emu/Cell/Modules/cellDmuxPamf.cpp index 70162d4031..7c0fd8ec39 100644 --- a/rpcs3/Emu/Cell/Modules/cellDmuxPamf.cpp +++ b/rpcs3/Emu/Cell/Modules/cellDmuxPamf.cpp @@ -1,16 +1,1126 @@ #include "stdafx.h" #include "Emu/Cell/PPUModule.h" -#include "Emu/IdManager.h" +#include "Emu/Cell/lv2/sys_sync.h" -#include "cellDmux.h" #include "cellDmuxPamf.h" - +#include vm::gvar g_cell_dmux_core_ops_pamf; vm::gvar g_cell_dmux_core_ops_raw_es; LOG_CHANNEL(cellDmuxPamf) +inline std::pair dmuxPamfStreamIdToTypeChannel(u16 stream_id, u16 private_stream_id) +{ + if ((stream_id & 0xf0) == 0xe0) + { + return { DMUX_PAMF_STREAM_TYPE_INDEX_VIDEO, stream_id & 0x0f }; + } + + if ((stream_id & 0xff) != 0xbd) + { + return { DMUX_PAMF_STREAM_TYPE_INDEX_INVALID, 0 }; + } + + switch (private_stream_id & 0xf0) + { + case 0x40: return { DMUX_PAMF_STREAM_TYPE_INDEX_LPCM, private_stream_id & 0x0f }; + case 0x30: return { DMUX_PAMF_STREAM_TYPE_INDEX_AC3, private_stream_id & 0x0f }; + case 0x00: return { DMUX_PAMF_STREAM_TYPE_INDEX_ATRACX, private_stream_id & 0x0f }; + case 0x20: return { DMUX_PAMF_STREAM_TYPE_INDEX_USER_DATA, private_stream_id & 0x0f }; + default: return { DMUX_PAMF_STREAM_TYPE_INDEX_INVALID, 0 }; + } +} + + +// SPU thread + +void dmux_pamf_base::output_queue::pop_back(u32 au_size) +{ + ensure(back - au_size >= buffer.data(), "Invalid au_size"); + back -= au_size; +} + +void dmux_pamf_base::output_queue::pop_back(u8* au_addr) +{ + ensure(au_addr >= buffer.data() && au_addr < std::to_address(buffer.end()), "Invalid au_addr"); + + // If au_begin is in front of the back pointer, unwrap the back pointer (there are no more access units behind the back pointer) + if (au_addr > back) + { + wrap_pos = buffer.data(); + } + + back = au_addr; +} + +void dmux_pamf_base::output_queue::pop_front(u32 au_size) +{ + ensure(front + au_size <= std::to_address(buffer.end()), "Invalid au_size"); + front += au_size; + + // When front reaches wrap_pos, unwrap the queue + if (wrap_pos != buffer.data() && wrap_pos <= front) + { + ensure(wrap_pos == front, "Invalid au_size"); + front = buffer.data(); + wrap_pos = buffer.data(); + } +} + +void dmux_pamf_base::output_queue::push_unchecked(const access_unit_chunk& au_chunk) +{ + std::ranges::copy(au_chunk.cached_data, back); + std::ranges::copy(au_chunk.data, back + au_chunk.cached_data.size()); + back += au_chunk.data.size() + au_chunk.cached_data.size(); +} + +bool dmux_pamf_base::output_queue::push(const access_unit_chunk& au_chunk, const std::function& on_fatal_error) +{ + // If there are any unconsumed access units behind the back pointer, the distance between the front and back pointers is the remaining capacity, + // otherwise the distance between the end of the buffer and the back pointer is the remaining capacity + if (wrap_pos == buffer.data()) + { + // Since it was already checked if there is enough space for au_max_size, this can only occur if the current access unit is larger than au_max_size + if (au_chunk.data.size() + au_chunk.cached_data.size() > static_cast(std::to_address(buffer.end()) - back)) + { + cellDmuxPamf.error("Access unit larger than specified maximum access unit size"); + on_fatal_error(); + return false; + } + } + else if (au_chunk.data.size() + au_chunk.cached_data.size() + 0x10 > static_cast(front - back)) // + sizeof(v128) because of SPU shenanigans probably + { + return false; + } + + push_unchecked(au_chunk); + return true; +} + +bool dmux_pamf_base::output_queue::prepare_next_au(u32 au_max_size) +{ + // LLE always checks the distance between the end of the buffer and the back pointer, even if the back pointer is wrapped around and there are unconsumed access units behind it + if (std::to_address(buffer.end()) - back < au_max_size) + { + // Can't wrap the back pointer around again as long as there are unconsumed access units behind it + if (wrap_pos != buffer.data()) + { + return false; + } + + wrap_pos = back; + back = buffer.data(); + } + + return true; +} + +void dmux_pamf_base::elementary_stream::flush_es() +{ + if (current_au.accumulated_size != 0) + { + ensure(au_queue.get_free_size() >= cache.size()); + au_queue.push_unchecked({ {}, cache }); + + current_au.accumulated_size += static_cast(cache.size()); + + ctx.on_au_found(get_stream_id().first, get_stream_id().second, user_data, { au_queue.peek_back(current_au.accumulated_size), current_au.accumulated_size }, current_au.pts, current_au.dts, + current_au.rap, au_specific_info_size, current_au.au_specific_info_buf); + } + + reset(); + + while (!ctx.on_flush_done(get_stream_id().first, get_stream_id().second, user_data)) {} // The flush_done event is repeatedly fired until it succeeds +} + +void dmux_pamf_base::elementary_stream::reset_es(u8* au_addr) +{ + if (!au_addr) + { + reset(); + au_queue.clear(); + } + else + { + au_queue.pop_back(au_addr); + } +} + +void dmux_pamf_base::elementary_stream::discard_access_unit() +{ + au_queue.pop_back(current_au.accumulated_size - static_cast(au_chunk.data.size() + au_chunk.cached_data.size())); + reset(); + cache.clear(); +} + +template +u32 dmux_pamf_base::elementary_stream::parse_audio_stream_header(std::span pes_packet_data) +{ + u32 extra_header_size_unk = 0; // No clue what this is, I have not found a single instance in any PAMF stream where it is something other than zero + + if (!au_size_unk) // For some reason, LLE uses the member that stores the size of user data access units here as bool + { + // Not checked on LLE + if (pes_packet_data.size() < sizeof(u32)) + { + return umax; + } + + extra_header_size_unk = read_from_ptr>(pes_packet_data) & extra_header_size_unk_mask; + au_size_unk = true; + } + + return extra_header_size_unk + sizeof(u32); +} + +bool dmux_pamf_base::elementary_stream::process_pes_packet_data() +{ + ensure(pes_packet_data, "set_pes_packet_data() should be used before process_stream()"); + + for (;;) + { + switch (state) + { + case state::initial: + if (stream_chunk.empty()) + { + pes_packet_data.reset(); + return true; + } + + // Parse the current stream section and increment the reading position by the amount that was consumed + stream_chunk = stream_chunk.subspan(parse_stream(stream_chunk)); + + current_au.accumulated_size += static_cast(au_chunk.data.size() + au_chunk.cached_data.size()); + + // If the beginning of a new access unit was found, set the current timestamps and rap indicator + if (!current_au.timestamps_rap_set && (current_au.state == access_unit::state::commenced || current_au.state == access_unit::state::m2v_sequence + || (current_au.state == access_unit::state::complete && au_chunk.cached_data.empty()))) + { + set_au_timestamps_rap(); + } + + state = state::pushing_au_queue; + [[fallthrough]]; + + case state::pushing_au_queue: + if (!au_chunk.data.empty() || !au_chunk.cached_data.empty()) + { + if (!au_queue.push(au_chunk, std::bind_front(&dmux_pamf_base::on_fatal_error, &ctx))) + { + ctx.on_au_queue_full(); + return false; + } + + au_chunk.data = {}; + au_chunk.cached_data.clear(); + } + + // This happens if the distance between two delimiters is greater than the size indicated in the info header of the stream. + if (current_au.state == access_unit::state::size_mismatch) + { + // LLE cuts off one byte from the beginning of the current PES packet data and then starts over. + pes_packet_data = pes_packet_data->subspan<1>(); + stream_chunk = *pes_packet_data; + + // It also removes the entire current access unit from the queue, even if it began in an earlier PES packet + au_queue.pop_back(current_au.accumulated_size); + current_au.accumulated_size = 0; + + state = state::initial; + continue; + } + + state = state::notifying_au_found; + [[fallthrough]]; + + case state::notifying_au_found: + if (current_au.state == access_unit::state::complete && !ctx.on_au_found(get_stream_id().first, get_stream_id().second, user_data, + { au_queue.peek_back(current_au.accumulated_size), current_au.accumulated_size }, current_au.pts, current_au.dts, current_au.rap, au_specific_info_size, current_au.au_specific_info_buf)) + { + return false; + } + + state = state::preparing_for_next_au; + [[fallthrough]]; + + case state::preparing_for_next_au: + if (current_au.state == access_unit::state::complete) + { + if (!au_queue.prepare_next_au(au_max_size)) + { + ctx.on_au_queue_full(); + return false; + } + + current_au = {}; + } + + state = state::initial; + } + } +} + +template +u32 dmux_pamf_base::video_stream::parse_stream(std::span stream) +{ + if (current_au.state != access_unit::state::none && (avc || current_au.state != access_unit::state::m2v_sequence)) + { + current_au.state = access_unit::state::incomplete; + } + + // Concatenate the cache of the previous stream section and the beginning of the current section + std::array buf{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff }; // Prevent false positives (M2V pic start code ends with 0x00) + ensure(cache.size() <= 3, "The size of the cache should never exceed three bytes"); + std::ranges::copy(cache, buf.begin()); + std::copy_n(stream.begin(), std::min(sizeof(u32) - 1, stream.size()), buf.begin() + cache.size()); // Not entirely accurate: LLE always reads three bytes from the stream, even if it is smaller than that + + auto au_chunk_begin = stream.begin(); + s32 cache_idx = 0; + auto stream_it = stream.begin(); + [&] + { + // Search for delimiter in cache + for (; cache_idx < static_cast(cache.size()); cache_idx++) + { + if (const be_t code = read_from_ptr>(buf.data() + cache_idx); + (avc && code == AVC_AU_DELIMITER) || (!avc && (code == M2V_PIC_START || code == M2V_SEQUENCE_HEADER || code == M2V_SEQUENCE_END))) + { + if (current_au.state != access_unit::state::none && (avc || current_au.state != access_unit::state::m2v_sequence)) + { + // The sequence end code is included in the access unit + // LLE increments the stream pointer instead of the cache index, which will cause the access unit to be corrupted at the end + if (!avc && code == M2V_SEQUENCE_END) + { + cellDmuxPamf.warning("M2V sequence end code in cache"); + stream_it += std::min(sizeof(u32), stream.size()); // Not accurate, LLE always increments by four, regardless of the stream size + } + + current_au.state = access_unit::state::complete; + return; + } + + // If current_au.state is none and there was a delimiter found here, then LLE outputs the entire cache, even if the access unit starts at cache_idx > 0 + + current_au.state = avc || code == M2V_PIC_START ? access_unit::state::commenced : access_unit::state::m2v_sequence; + } + } + + // Search for delimiter in stream + for (; stream_it <= stream.end() - sizeof(u32); stream_it++) + { + if (const be_t code = read_from_ptr>(stream_it); + (avc && code == AVC_AU_DELIMITER) || (!avc && (code == M2V_PIC_START || code == M2V_SEQUENCE_HEADER || code == M2V_SEQUENCE_END))) + { + if (current_au.state != access_unit::state::none && (avc || current_au.state != access_unit::state::m2v_sequence)) + { + stream_it += !avc && code == M2V_SEQUENCE_END ? sizeof(u32) : 0; // The sequence end code is included in the access unit + current_au.state = access_unit::state::complete; + return; + } + + au_chunk_begin = avc || current_au.state == access_unit::state::none ? stream_it : au_chunk_begin; + current_au.state = avc || code == M2V_PIC_START ? access_unit::state::commenced : access_unit::state::m2v_sequence; + } + } + }(); + + if (current_au.state != access_unit::state::none) + { + au_chunk.data = { au_chunk_begin, stream_it }; + std::copy_n(cache.begin(), cache_idx, std::back_inserter(au_chunk.cached_data)); + cache.erase(cache.begin(), cache.begin() + cache_idx); + } + + // Cache the end of the stream if an access unit wasn't completed. There could be the beginning of a delimiter in the last three bytes + if (current_au.state != access_unit::state::complete) + { + std::copy(stream_it, stream.end(), std::back_inserter(cache)); + } + + return static_cast((current_au.state != access_unit::state::complete || stream_it > stream.end() ? stream.end() : stream_it) - stream.begin()); +} + +u32 dmux_pamf_base::lpcm_stream::parse_stream_header(std::span pes_packet_data, [[maybe_unused]] s8 pts_dts_flag) +{ + // Not checked on LLE + if (pes_packet_data.size() < sizeof(u8) + 0x10) + { + return umax; + } + + std::memcpy(au_specific_info_buf.data(), &pes_packet_data[1], au_specific_info_buf.size()); + return parse_audio_stream_header<0x7ff>(pes_packet_data); +} + +u32 dmux_pamf_base::lpcm_stream::parse_stream(std::span stream) +{ + if (current_au.state == access_unit::state::none) + { + current_au.au_specific_info_buf = au_specific_info_buf; + } + + if (au_max_size - current_au.accumulated_size > stream.size()) + { + au_chunk.data = stream; + current_au.state = current_au.state == access_unit::state::none ? access_unit::state::commenced : access_unit::state::incomplete; + } + else + { + au_chunk.data = stream.first(au_max_size - current_au.accumulated_size); + current_au.state = access_unit::state::complete; + } + + return static_cast(au_chunk.data.size()); +} + +template +u32 dmux_pamf_base::audio_stream::parse_stream(std::span stream) +{ + const auto parse_au_size = [](be_t data) -> u16 + { + if constexpr (ac3) + { + if (const u8 fscod = data >> 14, frmsizecod = data >> 8 & 0x3f; fscod < 3 && frmsizecod < 38) + { + return AC3_FRMSIZE_TABLE[fscod][frmsizecod] * sizeof(s16); + } + } + else if ((data & 0x3ff) < 0x200) + { + return ((data & 0x3ff) + 1) * 8 + ATRACX_ATS_HEADER_SIZE; + } + + return 0; + }; + + if (current_au.state != access_unit::state::none) + { + current_au.state = access_unit::state::incomplete; + } + + // Concatenate the cache of the previous stream section and the beginning of the current section + std::array buf{}; + ensure(cache.size() <= 3, "The size of the cache should never exceed three bytes"); + std::ranges::copy(cache, buf.begin()); + std::copy_n(stream.begin(), std::min(sizeof(u16) - 1, stream.size()), buf.begin() + cache.size()); + + auto au_chunk_begin = stream.begin(); + s32 cache_idx = 0; + auto stream_it = stream.begin(); + [&] + { + // Search for delimiter in cache + for (; cache_idx <= static_cast(cache.size() + std::min(sizeof(u16) - 1, stream.size()) - sizeof(u16)); cache_idx++) + { + if (const be_t tmp = read_from_ptr>(buf.data() + cache_idx); current_au.size_info_offset != 0) + { + if (--current_au.size_info_offset == 0) + { + current_au.parsed_size = parse_au_size(tmp); + } + } + else if (tmp == SYNC_WORD) + { + if (current_au.state == access_unit::state::none) + { + // If current_au.state is none and there was a delimiter found here, then LLE outputs the entire cache, even if the access unit starts at cache_idx > 0 + + current_au.size_info_offset = ac3 ? sizeof(u16) * 2 : sizeof(u16); + current_au.state = access_unit::state::commenced; + } + else if (const u32 au_size = current_au.accumulated_size + cache_idx; au_size >= current_au.parsed_size) + { + current_au.state = au_size == current_au.parsed_size ? access_unit::state::complete : access_unit::state::size_mismatch; + return; + } + } + } + + // As long as the current access unit hasn't reached the size indicated in its header, we don't need to parse the stream + if (current_au.state != access_unit::state::none && current_au.size_info_offset == 0 && current_au.accumulated_size + cache.size() < current_au.parsed_size) + { + stream_it += std::min(current_au.parsed_size - current_au.accumulated_size - cache.size(), stream.size() - sizeof(u32)); + } + + // Search for delimiter in stream + for (; stream_it <= stream.end() - sizeof(u32); stream_it++) // LLE uses sizeof(u32), even though the delimiter is only two bytes large + { + if (const be_t tmp = read_from_ptr>(stream_it); current_au.size_info_offset != 0) + { + if (--current_au.size_info_offset == 0) + { + current_au.parsed_size = parse_au_size(tmp); + } + } + else if (tmp == SYNC_WORD) + { + if (current_au.state == access_unit::state::none) + { + au_chunk_begin = stream_it; + current_au.size_info_offset = ac3 ? sizeof(u16) * 2 : sizeof(u16); + current_au.state = access_unit::state::commenced; + } + else if (const u32 au_size = static_cast(current_au.accumulated_size + stream_it - au_chunk_begin + cache.size()); au_size >= current_au.parsed_size) + { + current_au.state = au_size == current_au.parsed_size ? access_unit::state::complete : access_unit::state::size_mismatch; + return; + } + } + } + }(); + + if (current_au.state != access_unit::state::none) + { + au_chunk.data = { au_chunk_begin, stream_it }; + std::copy_n(cache.begin(), cache_idx, std::back_inserter(au_chunk.cached_data)); + cache.erase(cache.begin(), cache.begin() + cache_idx); + } + + // Cache the end of the stream if an access unit wasn't completed. There could be the beginning of a delimiter in the last three bytes + if (current_au.state != access_unit::state::complete && current_au.state != access_unit::state::size_mismatch) + { + std::copy(stream_it, stream.end(), std::back_inserter(cache)); + } + + return static_cast((current_au.state != access_unit::state::complete ? stream.end() : stream_it) - stream.begin()); +} + +u32 dmux_pamf_base::user_data_stream::parse_stream_header(std::span pes_packet_data, s8 pts_dts_flag) +{ + if (pts_dts_flag < 0) // PTS field exists + { + // Not checked on LLE + if (pes_packet_data.size() < 2 + sizeof(u32)) + { + return umax; + } + + au_size_unk = read_from_ptr>(pes_packet_data.begin() + 2) - sizeof(u32); + return 10; + } + + return 2; +} + +u32 dmux_pamf_base::user_data_stream::parse_stream(std::span stream) +{ + if (au_size_unk > stream.size()) + { + au_chunk.data = stream; + au_size_unk -= static_cast(stream.size()); + current_au.state = access_unit::state::commenced; // User data streams always use commenced + } + else + { + au_chunk.data = stream.first(au_size_unk); + au_size_unk = 0; + current_au.state = access_unit::state::complete; + } + + return static_cast(stream.size()); // Always consume the entire stream +} + +bool dmux_pamf_base::enable_es(u32 stream_id, u32 private_stream_id, bool is_avc, std::span au_queue_buffer, u32 au_max_size, bool raw_es, u32 user_data) +{ + const auto [type_idx, channel] = dmuxPamfStreamIdToTypeChannel(stream_id, private_stream_id); + + if (type_idx == DMUX_PAMF_STREAM_TYPE_INDEX_INVALID || elementary_stream::is_enabled(elementary_streams[type_idx][channel])) + { + return false; + } + + this->raw_es = raw_es; + pack_es_type_idx = type_idx; + + switch (type_idx) + { + case DMUX_PAMF_STREAM_TYPE_INDEX_VIDEO: + elementary_streams[0][channel] = is_avc ? static_cast>(std::make_unique>(channel, au_max_size, *this, user_data, au_queue_buffer)) + : std::make_unique>(channel, au_max_size, *this, user_data, au_queue_buffer); + return true; + + case DMUX_PAMF_STREAM_TYPE_INDEX_LPCM: elementary_streams[1][channel] = std::make_unique(channel, au_max_size, *this, user_data, au_queue_buffer); return true; + case DMUX_PAMF_STREAM_TYPE_INDEX_AC3: elementary_streams[2][channel] = std::make_unique>(channel, au_max_size, *this, user_data, au_queue_buffer); return true; + case DMUX_PAMF_STREAM_TYPE_INDEX_ATRACX: elementary_streams[3][channel] = std::make_unique>(channel, au_max_size, *this, user_data, au_queue_buffer); return true; + case DMUX_PAMF_STREAM_TYPE_INDEX_USER_DATA: elementary_streams[4][channel] = std::make_unique(channel, au_max_size, *this, user_data, au_queue_buffer); return true; + default: fmt::throw_exception("Unreachable"); + } +} + +bool dmux_pamf_base::disable_es(u32 stream_id, u32 private_stream_id) +{ + const auto [type_idx, channel] = dmuxPamfStreamIdToTypeChannel(stream_id, private_stream_id); + + if (type_idx == DMUX_PAMF_STREAM_TYPE_INDEX_INVALID || !elementary_stream::is_enabled(elementary_streams[type_idx][channel])) + { + return false; + } + + elementary_streams[type_idx][channel] = nullptr; + return true; +} + +bool dmux_pamf_base::release_au(u32 stream_id, u32 private_stream_id, u32 au_size) const +{ + const auto [type_idx, channel] = dmuxPamfStreamIdToTypeChannel(stream_id, private_stream_id); + + if (type_idx == DMUX_PAMF_STREAM_TYPE_INDEX_INVALID || !elementary_stream::is_enabled(elementary_streams[type_idx][channel])) + { + return false; + } + + elementary_streams[type_idx][channel]->release_au(au_size); + return true; +} + +bool dmux_pamf_base::flush_es(u32 stream_id, u32 private_stream_id) +{ + const auto [type_idx, channel] = dmuxPamfStreamIdToTypeChannel(stream_id, private_stream_id); + + if (type_idx == DMUX_PAMF_STREAM_TYPE_INDEX_INVALID || !elementary_stream::is_enabled(elementary_streams[type_idx][channel])) + { + return false; + } + + state = state::initial; + elementary_streams[type_idx][channel]->flush_es(); + return true; +} + +void dmux_pamf_base::set_stream(std::span stream, bool continuity) +{ + if (!continuity) + { + std::ranges::for_each(elementary_streams | std::views::join | std::views::filter(elementary_stream::is_enabled), &elementary_stream::discard_access_unit); + } + + state = state::initial; + + // Not checked on LLE, it would parse old memory contents or uninitialized memory if the size of the input stream set by the user is not a multiple of 0x800. + // Valid PAMF streams are always a multiple of 0x800 bytes large. + if ((stream.size() & 0x7ff) != 0) + { + cellDmuxPamf.warning("Invalid stream size"); + } + + this->stream = stream; + demux_done_notified = false; +} + +void dmux_pamf_base::reset_stream() +{ + std::ranges::for_each(elementary_streams | std::views::join | std::views::filter(elementary_stream::is_enabled), &elementary_stream::discard_access_unit); + state = state::initial; + stream.reset(); +} + +bool dmux_pamf_base::reset_es(u32 stream_id, u32 private_stream_id, u8* au_addr) +{ + const auto [type_idx, channel] = dmuxPamfStreamIdToTypeChannel(stream_id, private_stream_id); + + if (type_idx == DMUX_PAMF_STREAM_TYPE_INDEX_INVALID || !elementary_stream::is_enabled(elementary_streams[type_idx][channel])) + { + return false; + } + + if (!au_addr) + { + state = state::initial; + } + + elementary_streams[type_idx][channel]->reset_es(au_addr); + return true; +} + +bool dmux_pamf_base::process_next_pack() +{ + if (!stream) + { + demux_done_notified = demux_done_notified || on_demux_done(); + return true; + } + + switch (state) + { + case state::initial: + { + // Search for the next pack start code or prog end code + std::span pack{ static_cast(nullptr), PACK_SIZE }; // This initial value is not used, can't be default constructed + + for (;;) + { + if (stream->empty()) + { + stream.reset(); + demux_done_notified = on_demux_done(); + return true; + } + + pack = stream->subspan<0, PACK_SIZE>(); + stream = stream->subspan(); + + // If the input stream is a raw elementary stream, skip everything MPEG-PS related and go straight to elementary stream parsing + if (raw_es) + { + if (elementary_stream::is_enabled(elementary_streams[pack_es_type_idx][0])) + { + elementary_streams[pack_es_type_idx][0]->set_pes_packet_data(pack); + } + + state = state::elementary_stream; + return true; + } + + // While LLE is actually searching the entire section for a pack start code or program end code, + // it doesn't set its current reading position to the address where it found the code, so it would bug out if there isn't one at the start of the section + + if (const be_t code = read_from_ptr>(pack); code == PACK_START) + { + break; + } + else if (code == PROG_END) + { + if (!on_prog_end()) + { + state = state::prog_end; + } + + return true; + } + + cellDmuxPamf.warning("No start code found at the beginning of the current section"); + } + + // Skip over pack header + const u8 pack_stuffing_length = read_from_ptr(pack.subspan()) & 0x7; + std::span current_pes_packet = pack.subspan(PACK_STUFFING_LENGTH_OFFSET + sizeof(u8) + pack_stuffing_length); + + if (read_from_ptr>(current_pes_packet) >> 8 != PACKET_START_CODE_PREFIX) + { + cellDmuxPamf.error("Invalid start code after pack header"); + return false; + } + + // Skip over system header if present + if (read_from_ptr>(current_pes_packet) == SYSTEM_HEADER) + { + const u32 system_header_length = read_from_ptr>(current_pes_packet.begin() + PES_PACKET_LENGTH_OFFSET) + PES_PACKET_LENGTH_OFFSET + sizeof(u16); + + // Not checked on LLE, the SPU task would just increment the reading position and read random data in the SPU local store + if (system_header_length + PES_HEADER_DATA_LENGTH_OFFSET + sizeof(u8) > current_pes_packet.size()) + { + cellDmuxPamf.error("Invalid system header length"); + return false; + } + + current_pes_packet = current_pes_packet.subspan(system_header_length); + + // The SPU thread isn't doing load + rotate here for 4-byte loading (in valid PAMF streams, the next start code after a system header is always 0x10 byte aligned) + const u32 offset_low = (current_pes_packet.data() - pack.data()) & 0xf; + current_pes_packet = { current_pes_packet.begin() - offset_low, current_pes_packet.end() }; + + if (const be_t code = read_from_ptr>(current_pes_packet); code >> 8 != PACKET_START_CODE_PREFIX) + { + cellDmuxPamf.error("Invalid start code after system header"); + return false; + } + else if (code == PRIVATE_STREAM_2) + { + // A system header is optionally followed by a private stream 2 + // The first two bytes of the stream are the stream id of a video stream. The next access unit of that stream is a random access point/keyframe + + const u16 pes_packet_length = read_from_ptr>(current_pes_packet.begin() + PES_PACKET_LENGTH_OFFSET) + PES_PACKET_LENGTH_OFFSET + sizeof(u16); + + // Not checked on LLE, the SPU task would just increment the reading position and read random data in the SPU local store + if (pes_packet_length + PES_HEADER_DATA_LENGTH_OFFSET + sizeof(u8) > current_pes_packet.size()) + { + cellDmuxPamf.error("Invalid private stream 2 length"); + return false; + } + + if (const u8 channel = read_from_ptr>(current_pes_packet.begin() + PES_PACKET_LENGTH_OFFSET + sizeof(u16)) & 0xf; + elementary_stream::is_enabled(elementary_streams[0][channel])) + { + elementary_streams[0][channel]->set_rap(); + } + + current_pes_packet = current_pes_packet.subspan(pes_packet_length); + } + } + + // Parse PES packet + // LLE only parses the first PES packet per pack (valid PAMF streams only have one PES packet per pack, not including the system header + private stream 2) + + const u32 pes_packet_start_code = read_from_ptr>(current_pes_packet); + + if (pes_packet_start_code >> 8 != PACKET_START_CODE_PREFIX) + { + cellDmuxPamf.error("Invalid start code"); + return false; + } + + // The size of the stream is not checked here because if coming from a pack header, it is guaranteed that there is enough space, + // and if coming from a system header or private stream 2, it was already checked above + const u16 pes_packet_length = read_from_ptr>(current_pes_packet.begin() + PES_PACKET_LENGTH_OFFSET) + PES_PACKET_LENGTH_OFFSET + sizeof(u16); + const u8 pes_header_data_length = read_from_ptr(current_pes_packet.begin() + PES_HEADER_DATA_LENGTH_OFFSET) + PES_HEADER_DATA_LENGTH_OFFSET + sizeof(u8); + + // Not checked on LLE, the SPU task would just increment the reading position and read random data in the SPU local store + if (pes_packet_length > current_pes_packet.size() || pes_packet_length <= pes_header_data_length) + { + cellDmuxPamf.error("Invalid pes packet length"); + return false; + } + + const std::span pes_packet_data = current_pes_packet.subspan(pes_header_data_length, pes_packet_length - pes_header_data_length); + + const auto [type_idx, channel] = dmuxPamfStreamIdToTypeChannel(pes_packet_start_code, read_from_ptr(pes_packet_data)); + + if (type_idx == DMUX_PAMF_STREAM_TYPE_INDEX_INVALID) + { + cellDmuxPamf.error("Invalid stream type"); + return false; + } + + pack_es_type_idx = type_idx; + pack_es_channel = channel; + + if (elementary_stream::is_enabled(elementary_streams[type_idx][channel])) + { + const s8 pts_dts_flag = read_from_ptr(current_pes_packet.begin() + PTS_DTS_FLAG_OFFSET); + + if (pts_dts_flag < 0) + { + // The timestamps should be unsigned, but are sign-extended from s32 to u64 on LLE. They probably forgot about integer promotion + const s32 PTS_32_30 = read_from_ptr(current_pes_packet.begin() + 9) >> 1; + const s32 PTS_29_15 = read_from_ptr>(current_pes_packet.begin() + 10) >> 1; + const s32 PTS_14_0 = read_from_ptr>(current_pes_packet.begin() + 12) >> 1; + + elementary_streams[type_idx][channel]->set_pts(PTS_32_30 << 30 | PTS_29_15 << 15 | PTS_14_0); // Bit 32 is discarded + } + + if (pts_dts_flag & 0x40) + { + const s32 DTS_32_30 = read_from_ptr(current_pes_packet.begin() + 14) >> 1; + const s32 DTS_29_15 = read_from_ptr>(current_pes_packet.begin() + 15) >> 1; + const s32 DTS_14_0 = read_from_ptr>(current_pes_packet.begin() + 17) >> 1; + + elementary_streams[type_idx][channel]->set_dts(DTS_32_30 << 30 | DTS_29_15 << 15 | DTS_14_0); // Bit 32 is discarded + } + + const usz stream_header_size = elementary_streams[type_idx][channel]->parse_stream_header(pes_packet_data, pts_dts_flag); + + // Not checked on LLE, the SPU task would just increment the reading position and read random data in the SPU local store + if (stream_header_size > pes_packet_data.size()) + { + cellDmuxPamf.error("Invalid stream header size"); + return false; + } + + elementary_streams[type_idx][channel]->set_pes_packet_data(pes_packet_data.subspan(stream_header_size)); + } + + state = state::elementary_stream; + [[fallthrough]]; + } + case state::elementary_stream: + { + if (!elementary_stream::is_enabled(elementary_streams[pack_es_type_idx][pack_es_channel]) || elementary_streams[pack_es_type_idx][pack_es_channel]->process_pes_packet_data()) + { + state = state::initial; + } + + return true; + } + case state::prog_end: + { + if (on_prog_end()) + { + state = state::initial; + } + + return true; + } + default: + fmt::throw_exception("Unreachable"); + } +} + +u32 dmux_pamf_base::get_enabled_es_count() const +{ + return static_cast(std::ranges::count_if(elementary_streams | std::views::join, elementary_stream::is_enabled)); +} + +bool dmux_pamf_spu_context::get_next_cmd(DmuxPamfCommand& lhs, bool new_stream) const +{ + cellDmuxPamf.trace("Getting next command"); + + if (cmd_queue->pop(lhs)) + { + cellDmuxPamf.trace("Command type: %d", static_cast(lhs.type.get())); + return true; + } + + if ((new_stream || has_work()) && !wait_for_au_queue && !wait_for_event_queue) + { + cellDmuxPamf.trace("No new command, continuing demuxing"); + return false; + } + + cellDmuxPamf.trace("No new command and nothing to do, waiting..."); + + cmd_queue->wait(); + + if (thread_ctrl::state() == thread_state::aborting) + { + return false; + } + + ensure(cmd_queue->pop(lhs)); + + cellDmuxPamf.trace("Command type: %d", static_cast(lhs.type.get())); + return true; +} + +bool dmux_pamf_spu_context::send_event(auto&&... args) const +{ + if (event_queue->size() >= max_enqueued_events) + { + return false; + } + + return ensure(event_queue->emplace(std::forward(args)..., event_queue_was_too_full)); +} + +void dmux_pamf_spu_context::operator()() // cellSpursMain() +{ + DmuxPamfCommand cmd; + + while (thread_ctrl::state() != thread_state::aborting) + { + if (get_next_cmd(cmd, new_stream)) + { + event_queue_was_too_full = wait_for_event_queue; + wait_for_event_queue = false; + wait_for_au_queue = false; + + ensure(cmd_result_queue->emplace(static_cast(cmd.type.value()) + 1)); + + switch (cmd.type) + { + case DmuxPamfCommandType::enable_es: + max_enqueued_events += 2; + enable_es(cmd.enable_es.stream_id, cmd.enable_es.private_stream_id, cmd.enable_es.is_avc, { cmd.enable_es.au_queue_buffer.get_ptr(), cmd.enable_es.au_queue_buffer_size }, + cmd.enable_es.au_max_size, cmd.enable_es.is_raw_es, cmd.enable_es.user_data); + break; + + case DmuxPamfCommandType::disable_es: + disable_es(cmd.disable_flush_es.stream_id, cmd.disable_flush_es.private_stream_id); + max_enqueued_events -= 2; + break; + + case DmuxPamfCommandType::set_stream: + new_stream = true; + break; + + case DmuxPamfCommandType::release_au: + release_au(cmd.release_au.stream_id, cmd.release_au.private_stream_id, cmd.release_au.au_size); + break; + + case DmuxPamfCommandType::flush_es: + flush_es(cmd.disable_flush_es.stream_id, cmd.disable_flush_es.private_stream_id); + break; + + case DmuxPamfCommandType::close: + while (!send_event(DmuxPamfEventType::close)) {} + return; + + case DmuxPamfCommandType::reset_stream: + reset_stream(); + break; + + case DmuxPamfCommandType::reset_es: + reset_es(cmd.reset_es.stream_id, cmd.reset_es.private_stream_id, cmd.reset_es.au_addr ? cmd.reset_es.au_addr.get_ptr() : nullptr); + break; + + case DmuxPamfCommandType::resume: + break; + + default: + cellDmuxPamf.error("Invalid command"); + return; + } + } + else if (thread_ctrl::state() == thread_state::aborting) + { + return; + } + + // Only set the new stream once the previous one has been entirely consumed + if (new_stream && !has_work()) + { + new_stream = false; + + DmuxPamfStreamInfo stream_info; + ensure(stream_info_queue->pop(stream_info)); + + set_stream({ stream_info.stream_addr.get_ptr(), stream_info.stream_size }, stream_info.continuity); + } + + process_next_pack(); + } +} + +void dmux_pamf_base::elementary_stream::save(utils::serial& ar) +{ + // These need to be saved first since they need to be initialized in the constructor's initializer list + if (ar.is_writing()) + { + ar(au_max_size, user_data); + au_queue.save(ar); + } + + ar(state, au_size_unk, au_specific_info_buf, current_au, pts, dts, rap); + + if (state == state::pushing_au_queue) + { + ar(au_chunk.cached_data); + + if (ar.is_writing()) + { + ar(vm::get_addr(au_chunk.data.data()), static_cast(au_chunk.data.size())); + } + else + { + au_chunk.data = { vm::_ptr(ar.pop()), ar.pop() }; + } + } + + if (current_au.state != access_unit::state::complete) + { + ar(cache); + } + + bool save_stream = !!pes_packet_data; + ar(save_stream); + + if (save_stream) + { + if (ar.is_writing()) + { + ensure(stream_chunk.size() <= pes_packet_data->size()); + ar(vm::get_addr(pes_packet_data->data()), static_cast(pes_packet_data->size()), static_cast(stream_chunk.data() - pes_packet_data->data())); + } + else + { + pes_packet_data = { vm::_ptr(ar.pop()), ar.pop() }; + stream_chunk = { pes_packet_data->begin() + ar.pop(), pes_packet_data->end() }; + } + } +} + +void dmux_pamf_base::save_base(utils::serial& ar) +{ + bool stream_not_consumed = !!stream; + + ar(state, stream_not_consumed, demux_done_notified, pack_es_type_idx, raw_es); + + if (stream_not_consumed) + { + if (ar.is_writing()) + { + ar(vm::get_addr(stream->data()), static_cast(stream->size())); + } + else + { + stream = std::span{ vm::_ptr(ar.pop()), ar.pop() }; + } + } + + if (state == state::elementary_stream) + { + ar(pack_es_channel); + } + + std::array enabled_video_streams; + std::array avc_video_streams; + std::array enabled_lpcm_streams; + std::array enabled_ac3_streams; + std::array enabled_atracx_streams; + std::array enabled_user_data_streams; + + if (ar.is_writing()) + { + std::ranges::transform(elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_VIDEO], enabled_video_streams.begin(), elementary_stream::is_enabled); + std::ranges::transform(elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_VIDEO], avc_video_streams.begin(), [](auto& es){ return !!dynamic_cast*>(es.get()); }); + std::ranges::transform(elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_LPCM], enabled_lpcm_streams.begin(), elementary_stream::is_enabled); + std::ranges::transform(elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_AC3], enabled_ac3_streams.begin(), elementary_stream::is_enabled); + std::ranges::transform(elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_ATRACX], enabled_atracx_streams.begin(), elementary_stream::is_enabled); + std::ranges::transform(elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_USER_DATA], enabled_user_data_streams.begin(), elementary_stream::is_enabled); + } + + ar(enabled_video_streams, avc_video_streams, enabled_lpcm_streams, enabled_ac3_streams, enabled_atracx_streams, enabled_user_data_streams); + + if (ar.is_writing()) + { + std::ranges::for_each(elementary_streams | std::views::join | std::views::filter(elementary_stream::is_enabled), [&](const auto& es){ es->save(ar); }); + } + else + { + for (u32 ch = 0; ch < 0x10; ch++) + { + if (enabled_video_streams[ch]) + { + elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_VIDEO][ch] = avc_video_streams[ch] ? static_cast>(std::make_unique>(ar, ch, *this)) + : std::make_unique>(ar, ch, *this); + } + } + + for (u32 ch = 0; ch < 0x10; ch++) + { + if (enabled_lpcm_streams[ch]) + { + elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_LPCM][ch] = std::make_unique(ar, ch, *this); + } + } + + for (u32 ch = 0; ch < 0x10; ch++) + { + if (enabled_ac3_streams[ch]) + { + elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_AC3][ch] = std::make_unique>(ar, ch, *this); + } + } + + for (u32 ch = 0; ch < 0x10; ch++) + { + if (enabled_atracx_streams[ch]) + { + elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_ATRACX][ch] = std::make_unique>(ar, ch, *this); + } + } + + for (u32 ch = 0; ch < 0x10; ch++) + { + if (enabled_user_data_streams[ch]) + { + elementary_streams[DMUX_PAMF_STREAM_TYPE_INDEX_USER_DATA][ch] = std::make_unique(ar, ch, *this); + } + } + } +} + +void dmux_pamf_spu_context::save(utils::serial& ar) +{ + USING_SERIALIZATION_VERSION(cellDmuxPamf); + ar(cmd_queue, new_stream); // The queues are contiguous in guest memory, so we only need to save the address of the first one + save_base(ar); +} + + +// PPU thread + error_code _CellDmuxCoreOpQueryAttr(vm::cptr pamfSpecificInfo, vm::ptr pamfAttr) { cellDmuxPamf.todo("_CellDmuxCoreOpQueryAttr(pamfSpecificInfo=*0x%x, pamfAttr=*0x%x)", pamfSpecificInfo, pamfAttr); @@ -63,9 +1173,9 @@ error_code _CellDmuxCoreOpSetStream(vm::ptr handle, vm::cptr streamA return CELL_OK; } -error_code _CellDmuxCoreOpFreeMemory(vm::ptr esHandle, vm::ptr memAddr, u32 memSize) +error_code _CellDmuxCoreOpReleaseAu(vm::ptr esHandle, vm::ptr memAddr, u32 memSize) { - cellDmuxPamf.todo("_CellDmuxCoreOpFreeMemory(esHandle=*0x%x, memAddr=*0x%x, memSize=0x%x)", esHandle, memAddr, memSize); + cellDmuxPamf.todo("_CellDmuxCoreOpReleaseAu(esHandle=*0x%x, memAddr=*0x%x, memSize=0x%x)", esHandle, memAddr, memSize); return CELL_OK; } @@ -116,6 +1226,7 @@ error_code _CellDmuxCoreOpResetStreamAndWaitDone(vm::ptr handle) return CELL_OK; } +template static void init_gvar(const vm::gvar& var) { var->queryAttr.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpQueryAttr))); @@ -124,7 +1235,10 @@ static void init_gvar(const vm::gvar& var) var->resetStream.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpResetStream))); var->createThread.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpCreateThread))); var->joinThread.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpJoinThread))); - var->freeMemory.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpFreeMemory))); + var->setStream.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpSetStream))); + var->releaseAu.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpReleaseAu))); + var->queryEsAttr.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpQueryEsAttr))); + var->enableEs.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpEnableEs))); var->disableEs.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpDisableEs))); var->flushEs.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpFlushEs))); var->resetEs.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpResetEs))); @@ -133,21 +1247,8 @@ static void init_gvar(const vm::gvar& var) DECLARE(ppu_module_manager::cellDmuxPamf)("cellDmuxPamf", [] { - REG_VNID(cellDmuxPamf, 0x28b2b7b2, g_cell_dmux_core_ops_pamf).init = [] - { - g_cell_dmux_core_ops_pamf->setStream.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpSetStream))); - g_cell_dmux_core_ops_pamf->queryEsAttr.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpQueryEsAttr))); - g_cell_dmux_core_ops_pamf->enableEs.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpEnableEs))); - init_gvar(g_cell_dmux_core_ops_pamf); - }; - - REG_VNID(cellDmuxPamf, 0x9728a0e9, g_cell_dmux_core_ops_raw_es).init = [] - { - g_cell_dmux_core_ops_raw_es->setStream.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpSetStream))); - g_cell_dmux_core_ops_raw_es->queryEsAttr.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpQueryEsAttr))); - g_cell_dmux_core_ops_raw_es->enableEs.set(g_fxo->get().func_addr(FIND_FUNC(_CellDmuxCoreOpEnableEs))); - init_gvar(g_cell_dmux_core_ops_raw_es); - }; + REG_VNID(cellDmuxPamf, 0x28b2b7b2, g_cell_dmux_core_ops_pamf).init = []{ init_gvar(g_cell_dmux_core_ops_pamf); }; + REG_VNID(cellDmuxPamf, 0x9728a0e9, g_cell_dmux_core_ops_raw_es).init = []{ init_gvar(g_cell_dmux_core_ops_raw_es); }; REG_HIDDEN_FUNC(_CellDmuxCoreOpQueryAttr); REG_HIDDEN_FUNC(_CellDmuxCoreOpOpen); @@ -157,7 +1258,7 @@ DECLARE(ppu_module_manager::cellDmuxPamf)("cellDmuxPamf", [] REG_HIDDEN_FUNC(_CellDmuxCoreOpJoinThread); REG_HIDDEN_FUNC(_CellDmuxCoreOpSetStream); REG_HIDDEN_FUNC(_CellDmuxCoreOpSetStream); - REG_HIDDEN_FUNC(_CellDmuxCoreOpFreeMemory); + REG_HIDDEN_FUNC(_CellDmuxCoreOpReleaseAu); REG_HIDDEN_FUNC(_CellDmuxCoreOpQueryEsAttr); REG_HIDDEN_FUNC(_CellDmuxCoreOpQueryEsAttr); REG_HIDDEN_FUNC(_CellDmuxCoreOpEnableEs); diff --git a/rpcs3/Emu/Cell/Modules/cellDmuxPamf.h b/rpcs3/Emu/Cell/Modules/cellDmuxPamf.h index 01983b724a..c720fb89c5 100644 --- a/rpcs3/Emu/Cell/Modules/cellDmuxPamf.h +++ b/rpcs3/Emu/Cell/Modules/cellDmuxPamf.h @@ -1,5 +1,666 @@ #pragma once +#include "Emu/savestate_utils.hpp" +#include "Utilities/Thread.h" +#include "cellPamf.h" +#include "cellDmux.h" +#include + +// Replacement for CellSpursQueue +template requires(std::is_trivial_v && max_num_of_entries > 0) +class alignas(0x80) dmux_pamf_hle_spurs_queue +{ + T* buffer; + + alignas(atomic_t) std::array)> _size; // Stored in a byte array since the PPU context needs to be trivial + u32 front; + u32 back; + + template + bool _pop(T* lhs) + { + atomic_t& _size = *std::launder(reinterpret_cast*>(this->_size.data())); + + if (_size == 0) + { + return false; + } + + if (lhs) + { + *lhs = buffer[front]; + } + + if constexpr (!is_peek) + { + front = (front + 1) % max_num_of_entries; + _size--; + _size.notify_one(); + } + + return true; + } + +public: + void init(T (&buffer)[max_num_of_entries]) + { + this->buffer = buffer; + new (_size.data()) atomic_t(0); + front = 0; + back = 0; + } + + bool pop(T& lhs) { return _pop(&lhs); } + bool pop() { return _pop(nullptr); } + bool peek(T& lhs) const { return const_cast(this)->_pop(&lhs); } + bool emplace(auto&&... args) + { + atomic_t& _size = *std::launder(reinterpret_cast*>(this->_size.data())); + + if (_size >= max_num_of_entries) + { + return false; + } + + new (&buffer[back]) T(std::forward(args)...); + + back = (back + 1) % max_num_of_entries; + _size++; + _size.notify_one(); + + return true; + } + + [[nodiscard]] u32 size() const { return std::launder(reinterpret_cast*>(this->_size.data()))->observe(); } + + void wait() const + { + const atomic_t& _size = *std::launder(reinterpret_cast*>(this->_size.data())); + + while (_size == 0 && thread_ctrl::state() != thread_state::aborting) + { + thread_ctrl::wait_on(_size, 0); + } + } +}; + +enum class DmuxPamfCommandType : u32 +{ + enable_es = 0, + disable_es = 2, + set_stream = 4, + release_au = 6, + flush_es = 8, + close = 10, + reset_stream = 12, + reset_es = 14, + resume = 16, +}; + +struct alignas(0x80) DmuxPamfCommand +{ + be_t type; + + union + { + struct + { + be_t stream_id; + be_t private_stream_id; + be_t is_avc; + vm::bptr au_queue_buffer; + be_t au_queue_buffer_size; + be_t au_max_size; + be_t au_specific_info_size; + be_t is_raw_es; + be_t user_data; + } + enable_es; + + struct + { + be_t stream_id; + be_t private_stream_id; + } + disable_flush_es; + + struct + { + vm::ptr> au_addr; + be_t au_size; + be_t stream_id; + be_t private_stream_id; + } + release_au; + + struct + { + be_t stream_id; + be_t private_stream_id; + vm::ptr> au_addr; + } + reset_es; + }; + + DmuxPamfCommand() = default; + + DmuxPamfCommand(be_t&& type) + : type(type) + { + } + + DmuxPamfCommand(be_t&& type, const be_t& stream_id, const be_t& private_stream_id) + : type(type), disable_flush_es{ stream_id, private_stream_id } + { + } + + DmuxPamfCommand(be_t&& type, const be_t& stream_id, const be_t& private_stream_id, const vm::ptr>& au_addr) + : type(type), reset_es{ stream_id, private_stream_id, au_addr } + { + } + + DmuxPamfCommand(be_t&& type, const vm::ptr>& au_addr, const be_t& au_size, const be_t& stream_id, const be_t& private_stream_id) + : type(type), release_au{ au_addr, au_size, stream_id, private_stream_id } + { + } + + DmuxPamfCommand(be_t&& type, const be_t& stream_id, const be_t& private_stream_id, const be_t& is_avc, const vm::bptr& au_queue_buffer, + const be_t& au_queue_buffer_size, const be_t& au_max_size, const be_t& au_specific_info_size, const be_t& is_raw_es, const be_t& user_data) + : type(type), enable_es{ stream_id, private_stream_id, is_avc, au_queue_buffer, au_queue_buffer_size, au_max_size, au_specific_info_size, is_raw_es, user_data } + { + } +}; + +CHECK_SIZE_ALIGN(DmuxPamfCommand, 0x80, 0x80); + +enum class DmuxPamfEventType : u32 +{ + au_found, + demux_done, + fatal_error, + close, + flush_done, + prog_end_code, +}; + +struct alignas(0x80) DmuxPamfEvent +{ + be_t type; + + union + { + u8 pad[0x78]; + + struct + { + be_t stream_id; + be_t private_stream_id; + vm::ptr> au_addr; + CellCodecTimeStamp pts; + CellCodecTimeStamp dts; + be_t unk; + u8 reserved[4]; + be_t au_size; + be_t stream_header_size; + std::array stream_header_buf; + be_t user_data; + be_t is_rap; + } + au_found; + + struct + { + be_t stream_id; + be_t private_stream_id; + be_t user_data; + } + flush_done; + }; + + be_t event_queue_was_too_full; + + DmuxPamfEvent() = default; + + DmuxPamfEvent(be_t&& type, const be_t& event_queue_was_too_full) + : type(type), event_queue_was_too_full(event_queue_was_too_full) + { + } + + DmuxPamfEvent(be_t&& type, const be_t& stream_id, const be_t& private_stream_id, const be_t& user_data, const be_t& event_queue_was_too_full) + : type(type), flush_done{ stream_id, private_stream_id, user_data }, event_queue_was_too_full(event_queue_was_too_full) + { + } + + DmuxPamfEvent(be_t&& type, const be_t& stream_id, const be_t& private_stream_id, const vm::ptr>& au_addr, const CellCodecTimeStamp& pts, const CellCodecTimeStamp& dts, const be_t& unk, + const be_t& au_size, const be_t& au_specific_info_size, const std::array& au_specific_info, const be_t& user_data, const be_t& is_rap, const be_t& event_queue_was_too_full) + : type(type) + , au_found{ stream_id, private_stream_id, au_addr, pts, dts, static_cast>(unk), {}, au_size, au_specific_info_size, au_specific_info, user_data, is_rap } + , event_queue_was_too_full(event_queue_was_too_full) + { + } +}; + +CHECK_SIZE_ALIGN(DmuxPamfEvent, 0x80, 0x80); + +struct alignas(0x80) DmuxPamfStreamInfo +{ + vm::bcptr stream_addr; + be_t stream_size; + be_t user_data; + be_t continuity; + be_t is_raw_es; +}; + +CHECK_SIZE_ALIGN(DmuxPamfStreamInfo, 0x80, 0x80); + +enum DmuxPamfStreamTypeIndex +{ + DMUX_PAMF_STREAM_TYPE_INDEX_INVALID = -1, + DMUX_PAMF_STREAM_TYPE_INDEX_VIDEO, + DMUX_PAMF_STREAM_TYPE_INDEX_LPCM, + DMUX_PAMF_STREAM_TYPE_INDEX_AC3, + DMUX_PAMF_STREAM_TYPE_INDEX_ATRACX, + DMUX_PAMF_STREAM_TYPE_INDEX_USER_DATA, +}; + + +// SPU thread + +class dmux_pamf_base +{ + // Event handlers for the demuxer. These correspond to the events that the SPU thread sends to the PPU thread on LLE (except for au_queue_full(): the SPU thread just sets a global bool, + // but it is never notified to the PPU thread or the user). + + virtual bool on_au_found(u8 stream_id, u8 private_stream_id, u32 user_data, std::span au, u64 pts, u64 dts, bool rap, u8 au_specific_info_size, std::array au_specific_info_buf) = 0; + virtual bool on_demux_done() = 0; + virtual void on_fatal_error() = 0; + virtual bool on_flush_done(u8 stream_id, u8 private_stream_id, u32 user_data) = 0; + virtual bool on_prog_end() = 0; + virtual void on_au_queue_full() = 0; + +public: + virtual ~dmux_pamf_base() = default; + + bool enable_es(u32 stream_id, u32 private_stream_id, bool is_avc, std::span au_queue_buffer, u32 au_max_size, bool raw_es, u32 user_data); + bool disable_es(u32 stream_id, u32 private_stream_id); + bool release_au(u32 stream_id, u32 private_stream_id, u32 au_size) const; + bool flush_es(u32 stream_id, u32 private_stream_id); + void set_stream(std::span stream, bool continuity); + void reset_stream(); + bool reset_es(u32 stream_id, u32 private_stream_id, u8* au_addr); + bool process_next_pack(); + +protected: + void save_base(utils::serial& ar); + [[nodiscard]] bool has_work() const { return !!stream || !demux_done_notified; } + [[nodiscard]] u32 get_enabled_es_count() const; + +private: + static constexpr u16 PACK_SIZE = 0x800; + static constexpr s8 PACK_STUFFING_LENGTH_OFFSET = 0xd; + static constexpr s8 PES_PACKET_LENGTH_OFFSET = 0x4; + static constexpr s8 PES_HEADER_DATA_LENGTH_OFFSET = 0x8; + static constexpr s8 PTS_DTS_FLAG_OFFSET = 0x7; + static constexpr u8 PACKET_START_CODE_PREFIX = 1; + + static constexpr be_t M2V_PIC_START = 0x100; + static constexpr be_t AVC_AU_DELIMITER = 0x109; + static constexpr be_t M2V_SEQUENCE_HEADER = 0x1b3; + static constexpr be_t M2V_SEQUENCE_END = 0x1b7; + static constexpr be_t PACK_START = 0x1ba; + static constexpr be_t SYSTEM_HEADER = 0x1bb; + static constexpr be_t PRIVATE_STREAM_1 = 0x1bd; + static constexpr be_t PRIVATE_STREAM_2 = 0x1bf; + static constexpr be_t PROG_END = 0x1b9; + static constexpr be_t VIDEO_STREAM_BASE = 0x1e0; // The lower 4 bits indicate the channel + + // Partial access unit that will be written to the output queue + struct access_unit_chunk + { + std::vector cached_data; // Up to three bytes of data from the previous PES packet (copied into this vector, since it might not be in memory anymore) + std::span data; // Data of the current PES packet + }; + + // Output queue for access units + // The queue doesn't keep track of where access units are in the buffer (only which parts are used and which are free), this has to be done extenally + class output_queue + { + public: + explicit output_queue(std::span buffer) : buffer(buffer) {} + + explicit output_queue(utils::serial& ar) + : buffer{vm::_ptr(ar.pop()), ar.pop()} + , back(vm::_ptr(ar.pop())) + , front(vm::_ptr(ar.pop())) + , wrap_pos(vm::_ptr(ar.pop())) + { + } + + void save(utils::serial& ar) const { ar(vm::get_addr(buffer.data()), static_cast(buffer.size()), vm::get_addr(back), vm::get_addr(front), vm::get_addr(wrap_pos)); } + + // The queue itself doesn't keep track of the location of each access unit, so the pop and access operations need the size or address of the access unit to remove/return + void pop_back(u32 au_size); + void pop_back(u8* au_addr); + void pop_front(u32 au_size); + [[nodiscard]] const u8* peek_back(u32 au_size) const { return back - au_size; } + + void clear() { wrap_pos = front = back = buffer.data(); } + + void push_unchecked(const access_unit_chunk& au_chunk); + bool push(const access_unit_chunk& au_chunk, const std::function& on_fatal_error); + + [[nodiscard]] bool prepare_next_au(u32 au_max_size); + + [[nodiscard]] usz get_free_size() const { return wrap_pos != buffer.data() ? front - back : std::to_address(buffer.end()) - back; } + + private: + const std::span buffer; + + // Since access units have a variable size, uses pointers instead of indices + u8* back = buffer.data(); + const u8* front = buffer.data(); + const u8* wrap_pos = buffer.data(); // The address where the back pointer wrapped around to the beginning of the queue + }; + + // Base class for elementary streams and subclasses for each stream type + // Responsible for processing the data section of PES packets and splitting it into access units with the stream parsers of each subclass + class elementary_stream + { + public: + elementary_stream(u8 channel, u32 au_max_size, dmux_pamf_base& ctx, u32 user_data, u8 au_specific_info_size, std::span au_queue_buffer) + : channel(channel) + , au_max_size(au_max_size == umax || au_max_size > au_queue_buffer.size() ? 0x800 : au_max_size) + , ctx(ctx) + , au_specific_info_size(au_specific_info_size) + , user_data(user_data) + , au_queue(au_queue_buffer) + { + // The cache sizes will never exceed three bytes + cache.reserve(3); + au_chunk.cached_data.reserve(3); + } + + elementary_stream(utils::serial& ar, u8 channel, dmux_pamf_base& ctx, u8 au_specific_info_size) + : channel(channel) + , au_max_size(ar.pop()) + , ctx(ctx) + , au_specific_info_size(au_specific_info_size) + , user_data(ar.pop()) + , au_queue(ar) + { + save(ar); + } + + virtual ~elementary_stream() = default; + void save(utils::serial& ar); + + static bool is_enabled(const std::unique_ptr& es) { return !!es; } + + [[nodiscard]] virtual std::pair get_stream_id() const = 0; + + void set_pes_packet_data(std::span pes_packet_data) { ensure(!this->pes_packet_data); this->pes_packet_data = this->stream_chunk = pes_packet_data; } + void set_pts(u64 pts) { this->pts = pts; } + void set_dts(u64 dts) { this->dts = dts; } + void set_rap() { rap = true; } + + // Parses the proprietary header of private streams. Returns the size of the header or umax if the stream is invalid + virtual u32 parse_stream_header(std::span elementary_stream, s8 pts_dts_flag) = 0; + + // Processes the current PES packet. Returns true if it has been entirely consumed + bool process_pes_packet_data(); + + void release_au(u32 au_size) { au_queue.pop_front(au_size); } + void flush_es(); + void reset_es(u8* au_addr); + void discard_access_unit(); + + protected: + const u8 channel : 4; + const u32 au_max_size; // Maximum possible size of an access unit + u32 au_size_unk = 0; // For user data streams, used to store the size of the current access unit. For other private streams, used as a bool for some reason + alignas(0x10) std::array au_specific_info_buf{}; // For LPCM streams, stores the first 0x10 bytes of the current PES packet data, contains info like the number of channels + + // The access unit that is currently being cut out + struct access_unit + { + ENABLE_BITWISE_SERIALIZATION + + enum class state : u8 + { + none, // An access unit is not currently being cut out + incomplete, // An access unit is currently being cut out + commenced, // The current PES packet contains the beginning of an access unit + complete, // The current PES packet contains the end of an access unit + size_mismatch, // The distance between sync words and size indicated in the access unit's info header does not match + m2v_sequence, // Special case for M2V, access unit commenced, but the next start code does not complete the access unit + } + state = state::none; + + bool rap = false; + bool timestamps_rap_set = false; + + // Since the delimiters of compressed audio streams are allowed to appear anywhere in the stream (instead of just the beginning of an access unit), we need to parse the size of the access unit from the stream + u8 size_info_offset = 0; + u16 parsed_size = 0; + + u32 accumulated_size = 0; // Incremented after every access unit chunk cut out from the stream + + u64 pts = umax; + u64 dts = umax; + + alignas(0x10) std::array au_specific_info_buf{}; + } + current_au; + + access_unit_chunk au_chunk; // A partial access unit that will be written to the access unit queue. Set by the stream parsers + std::vector cache; // The last three bytes of the current PES packet need to be saved, since they could contain part of an access unit delimiter + + // Returns the stream header size of audio streams. The only difference between LPCM and compressed streams is the extra_header_size_unk_mask + template + u32 parse_audio_stream_header(std::span pes_packet_data); + + private: + dmux_pamf_base& ctx; // For access to event handlers + + enum class state : u8 + { + initial, + pushing_au_queue, + notifying_au_found, + preparing_for_next_au + } + state = state::initial; + + // Size of the "CellDmuxPamfAuSpecificInfo..." struct for the type of this stream ("reserved" fields are not counted, so for all stream types other than LPCM this will be 0) + // This does NOT correspond to the amount of data in au_specific_info_buf, the info in the buffer gets unpacked by the PPU thread + const u8 au_specific_info_size; + + const u32 user_data; + + // Data section of the current PES packet. Needs to be remembered separately from the span we're working with below + std::optional> pes_packet_data; + + std::span stream_chunk; // The current section of the PES packet data to be processed + u64 pts = umax; // Presentation time stamp of the current PES packet + u64 dts = umax; // Decoding time stamp of the current PES packet + bool rap = false; // Random access point indicator + + output_queue au_queue; + + // Extracts access units from the stream by searching for the access unit delimiter and setting au_chunk accordingly. Returns the number of bytes that were parsed + virtual u32 parse_stream(std::span stream) = 0; + + void reset() + { + state = state::initial; + pes_packet_data.reset(); + au_size_unk = 0; + pts = + dts = umax; + rap = false; + au_chunk.data = {}; + au_chunk.cached_data.clear(); + current_au = {}; + } + + void set_au_timestamps_rap() + { + current_au.pts = pts; + current_au.dts = dts; + current_au.rap = rap; + pts = + dts = umax; + rap = false; + current_au.timestamps_rap_set = true; + } + }; + + template + class video_stream final : public elementary_stream + { + public: + video_stream(u8 channel, u32 au_max_size, dmux_pamf_base& ctx, u32 user_data, std::span au_queue_buffer) : elementary_stream(channel, au_max_size, ctx, user_data, 0, au_queue_buffer) {} + video_stream(utils::serial& ar, u8 channel, dmux_pamf_base& ctx) : elementary_stream(ar, channel, ctx, 0) {} + + private: + u32 parse_stream(std::span stream) override; + u32 parse_stream_header([[maybe_unused]] std::span pes_packet_data, [[maybe_unused]] s8 pts_dts_flag) override { return 0; } + [[nodiscard]] std::pair get_stream_id() const override { return { 0xe0 | channel, 0 }; } + }; + + class lpcm_stream final : public elementary_stream + { + public: + lpcm_stream(u8 channel, u32 au_max_size, dmux_pamf_base& ctx, u32 user_data, std::span au_queue_buffer) : elementary_stream(channel, au_max_size, ctx, user_data, 3, au_queue_buffer) {} + lpcm_stream(utils::serial& ar, u8 channel, dmux_pamf_base& ctx) : elementary_stream(ar, channel, ctx, 3) {} + + private: + u32 parse_stream(std::span stream) override; + u32 parse_stream_header(std::span pes_packet_data, [[maybe_unused]] s8 pts_dts_flag) override; + [[nodiscard]] std::pair get_stream_id() const override { return { 0xbd, 0x40 | channel }; } + }; + + template + class audio_stream final : public elementary_stream + { + public: + audio_stream(u8 channel, u32 au_max_size, dmux_pamf_base& ctx, u32 user_data, std::span au_queue_buffer) : elementary_stream(channel, au_max_size, ctx, user_data, 0, au_queue_buffer) {} + audio_stream(utils::serial& ar, u8 channel, dmux_pamf_base& ctx) : elementary_stream(ar, channel, ctx, 0) {} + + private: + static constexpr be_t SYNC_WORD = ac3 ? 0x0b77 : 0x0fd0; + static constexpr u8 ATRACX_ATS_HEADER_SIZE = 8; + static constexpr u16 AC3_FRMSIZE_TABLE[3][38] = + { + { 0x40, 0x40, 0x50, 0x50, 0x60, 0x60, 0x70, 0x70, 0x80, 0x80, 0xa0, 0xa0, 0xc0, 0xc0, 0xe0, 0xe0, 0x100, 0x100, 0x140, 0x140, 0x180, 0x180, 0x1c0, 0x1c0, 0x200, 0x200, 0x280, 0x280, 0x300, 0x300, 0x380, 0x380, 0x400, 0x400, 0x480, 0x480, 0x500, 0x500 }, + { 0x45, 0x46, 0x57, 0x58, 0x68, 0x69, 0x79, 0x7a, 0x8b, 0x8c, 0xae, 0xaf, 0xd0, 0xd1, 0xf3, 0xf4, 0x116, 0x117, 0x15c, 0x15d, 0x1a1, 0x1a2, 0x1e7, 0x1e8, 0x22d, 0x22e, 0x2b8, 0x2b9, 0x343, 0x344, 0x3cf, 0x3d0, 0x45a, 0x45b, 0x4e5, 0x4e6, 0x571, 0x572 }, + { 0x60, 0x60, 0x78, 0x78, 0x90, 0x90, 0xa8, 0xa8, 0xc0, 0xc0, 0xf0, 0xf0, 0x120, 0x120, 0x150, 0x150, 0x180, 0x180, 0x1e0, 0x1e0, 0x240, 0x240, 0x2a0, 0x2a0, 0x300, 0x300, 0x3c0, 0x3c0, 0x480, 0x480, 0x540, 0x540, 0x600, 0x600, 0x6c0, 0x6c0, 0x780, 0x780 } + }; + + u32 parse_stream(std::span stream) override; + u32 parse_stream_header(std::span pes_packet_data, [[maybe_unused]] s8 pts_dts_flag) override { return parse_audio_stream_header<0xffff>(pes_packet_data); } + [[nodiscard]] std::pair get_stream_id() const override { return { 0xbd, (ac3 ? 0x30 : 0x00) | channel }; } + }; + + class user_data_stream final : public elementary_stream + { + public: + user_data_stream(u8 channel, u32 au_max_size, dmux_pamf_base& ctx, u32 user_data, std::span au_queue_buffer) : elementary_stream(channel, au_max_size, ctx, user_data, 0, au_queue_buffer) {} + user_data_stream(utils::serial& ar, u8 channel, dmux_pamf_base& ctx) : elementary_stream(ar, channel, ctx, 0) {} + + private: + u32 parse_stream(std::span stream) override; + u32 parse_stream_header(std::span pes_packet_data, s8 pts_dts_flag) override; + [[nodiscard]] std::pair get_stream_id() const override { return { 0xbd, 0x20 | channel }; } + }; + + + enum class state : u8 + { + initial, + elementary_stream, + prog_end + } + state = state::initial; + + bool demux_done_notified = true; // User was successfully notified that the stream has been consumed + + u8 pack_es_type_idx = umax; // Elementary stream type in the current pack + u8 pack_es_channel = 0; // Elementary stream channel in the current pack + + bool raw_es = false; // Indicates that the input stream is a raw elementary stream instead of a multiplexed MPEG program stream. If set to true, MPEG-PS related parsing will be skipped + + std::optional> stream; // The stream to be demultiplexed, provided by the user + + std::unique_ptr elementary_streams[5][0x10]; // One for each possible type and channel +}; + +// Implementation of the SPU thread +class dmux_pamf_spu_context : dmux_pamf_base +{ +public: + static constexpr u32 id_base = 0; + static constexpr u32 id_step = 1; + static constexpr u32 id_count = 0x400; + SAVESTATE_INIT_POS(std::numeric_limits::max()); // Doesn't depend on or is a dependency of anything + + dmux_pamf_spu_context(vm::ptr> cmd_queue, vm::ptr, 1>> cmd_result_queue, + vm::ptr> stream_info_queue, vm::ptr> event_queue) + : cmd_queue(cmd_queue), cmd_result_queue(cmd_result_queue), stream_info_queue(stream_info_queue), event_queue(event_queue) + { + } + + explicit dmux_pamf_spu_context(utils::serial& ar) + : cmd_queue(ar.pop>>()) + , cmd_result_queue(vm::ptr, 1>>::make(cmd_queue.addr() + sizeof(dmux_pamf_hle_spurs_queue))) + , stream_info_queue(vm::ptr>::make(cmd_result_queue.addr() + sizeof(dmux_pamf_hle_spurs_queue, 1>))) + , event_queue(vm::ptr>::make(stream_info_queue.addr() + sizeof(dmux_pamf_hle_spurs_queue))) + , new_stream(ar.pop()) + { + save_base(ar); + max_enqueued_events += 2 * get_enabled_es_count(); + } + + void save(utils::serial& ar); + + void operator()(); // cellSpursMain() + static constexpr auto thread_name = "HLE PAMF demuxer SPU thread"sv; + +private: + // These are globals in the SPU thread + const vm::ptr> cmd_queue; + const vm::ptr, 1>> cmd_result_queue; + const vm::ptr> stream_info_queue; + const vm::ptr> event_queue; + bool wait_for_au_queue = false; + bool wait_for_event_queue = false; + bool event_queue_was_too_full = false; // Sent to the PPU thread + u8 max_enqueued_events = 4; // 4 + 2 * number of enabled elementary streams + + // This is a local variable in cellSpursMain(), needs to be saved for savestates + bool new_stream = false; + + bool get_next_cmd(DmuxPamfCommand& lhs, bool new_stream) const; + bool send_event(auto&&... args) const; + + // The events are sent to the PPU thread via the event_queue + bool on_au_found(u8 stream_id, u8 private_stream_id, u32 user_data, std::span au, u64 pts, u64 dts, bool rap, u8 au_specific_info_size, std::array au_specific_info_buf) override + { + return !((wait_for_event_queue = !send_event(DmuxPamfEventType::au_found, stream_id, private_stream_id, vm::get_addr(au.data()), std::bit_cast(static_cast>(pts)), + std::bit_cast(static_cast>(dts)), 0, static_cast(au.size()), au_specific_info_size, au_specific_info_buf, user_data, rap))); + } + bool on_demux_done() override { return !((wait_for_event_queue = !send_event(DmuxPamfEventType::demux_done))); } + void on_fatal_error() override { send_event(DmuxPamfEventType::fatal_error); } + bool on_flush_done(u8 stream_id, u8 private_stream_id, u32 user_data) override { return send_event(DmuxPamfEventType::flush_done, stream_id, private_stream_id, user_data); } // The "flush done" event does not set wait_for_event_queue if the queue is full + bool on_prog_end() override { return !((wait_for_event_queue = !send_event(DmuxPamfEventType::prog_end_code))); } + void on_au_queue_full() override { wait_for_au_queue = true; } +}; + +using dmux_pamf_spu_thread = named_thread; + + +// PPU thread + struct CellDmuxPamfAttr { be_t maxEnabledEsNum; diff --git a/rpcs3/Emu/Cell/Modules/cellPamf.h b/rpcs3/Emu/Cell/Modules/cellPamf.h index e42acf60f4..abd89f8852 100644 --- a/rpcs3/Emu/Cell/Modules/cellPamf.h +++ b/rpcs3/Emu/Cell/Modules/cellPamf.h @@ -1,5 +1,6 @@ #pragma once +#include "Emu/Cell/ErrorCodes.h" #include "Emu/Memory/vm_ptr.h" // Error Codes diff --git a/rpcs3/Emu/savestate_utils.cpp b/rpcs3/Emu/savestate_utils.cpp index 31b24eba5f..7eca180668 100644 --- a/rpcs3/Emu/savestate_utils.cpp +++ b/rpcs3/Emu/savestate_utils.cpp @@ -23,7 +23,7 @@ struct serial_ver_t std::set compatible_versions; }; -static std::array s_serial_versions; +static std::array s_serial_versions; #define SERIALIZATION_VER(name, identifier, ...) \ \ @@ -85,6 +85,7 @@ SERIALIZATION_VER(LLE, 24, 1) SERIALIZATION_VER(HLE, 25, 1) SERIALIZATION_VER(cellSysutil, 26, 1, 2/*AVC2 Muting,Volume*/) +SERIALIZATION_VER(cellDmuxPamf, 27, 1) template <> void fmt_class_string>::format(std::string& out, u64 arg)