SPU: Reimplement reservation notifications

Elad 2025-08-27 02:52:27 +03:00 committed by Elad
parent 34a9d03d3d
commit cfe1eca185
9 changed files with 269 additions and 236 deletions


@@ -1158,7 +1158,7 @@ cpu_thread& cpu_thread::operator=(thread_state)
{
if (u32 resv = atomic_storage<u32>::load(thread->raddr))
{
vm::reservation_notifier_notify(resv);
vm::reservation_notifier_notify(resv, thread->rtime);
}
}
}


@@ -2327,9 +2327,9 @@ void ppu_thread::cpu_wait(bs_t<cpu_flag> old)
{
res_notify = 0;
if (res_notify_time == vm::reservation_notifier_count_index(addr).second)
if (res_notify_time + 128 == (vm::reservation_acquire(addr) & -128))
{
vm::reservation_notifier_notify(addr);
vm::reservation_notifier_notify(addr, res_notify_time);
}
}
@@ -3117,7 +3117,6 @@ static T ppu_load_acquire_reservation(ppu_thread& ppu, u32 addr)
{
// Reload "cached" reservation of previous succeeded conditional store
// This seems to be a hardware feature, according to the cellSpursAddUrgentCommand function
ppu.rtime -= 128;
}
else
{
@@ -3614,67 +3613,68 @@ static bool ppu_store_reservation(ppu_thread& ppu, u32 addr, u64 reg_value)
{
extern atomic_t<u32> liblv2_begin, liblv2_end;
// Avoid notifications from lwmutex or sys_spinlock
if (new_data != old_data && (ppu.cia < liblv2_begin || ppu.cia >= liblv2_end))
{
u32 notify = ppu.res_notify;
const u32 notify = ppu.res_notify;
if (notify)
if (notify)
{
if (auto waiter = vm::reservation_notifier_notify(notify, ppu.res_notify_time, true))
{
if (ppu.res_notify_time == vm::reservation_notifier_count_index(notify).second)
ppu.state += cpu_flag::wait;
waiter->notify_all();
}
ppu.res_notify = 0;
}
// Avoid notifications from lwmutex or sys_spinlock
const bool is_liblv2_or_null = (ppu.cia >= liblv2_begin && ppu.cia < liblv2_end);
if (!is_liblv2_or_null && (addr ^ notify) & -128)
{
// Try to postpone the notification to when the PPU is asleep, or join notifications on the same address
// This also optimizes a mutex - avoids notifying right after the lock is acquired (which would prolong the critical section), notifying only on unlock
const u32 count = vm::reservation_notifier_count(addr & -128, rtime);
if (cpu_flag::wait - ppu.state)
{
ppu.state += cpu_flag::wait;
vm::reservation_notifier_notify(notify);
}
else
{
notify = 0;
}
ppu.res_notify = 0;
}
if ((addr ^ notify) & -128)
vm::reservation_notifier_notify(addr & -128, rtime);
switch (count)
{
// Try to postpone the notification to when the PPU is asleep, or join notifications on the same address
// This also optimizes a mutex - avoids notifying right after the lock is acquired (which would prolong the critical section), notifying only on unlock
const auto [count, index] = vm::reservation_notifier_count_index(addr);
switch (count)
{
case 0:
{
// Nothing to do
break;
}
case 1:
{
if (!notify)
{
ppu.res_notify = addr;
ppu.res_notify_time = index;
break;
}
// Notify both
[[fallthrough]];
}
default:
{
if (!notify)
{
ppu.state += cpu_flag::wait;
}
vm::reservation_notifier_notify(addr);
break;
}
}
case 0:
{
// Nothing to do
break;
}
case 1:
{
if (!notify)
{
ppu.res_notify = addr & -128;
ppu.res_notify_time = rtime;
break;
}
static_cast<void>(ppu.test_stopped());
// Notify both
[[fallthrough]];
}
default:
{
if (cpu_flag::wait - ppu.state)
{
ppu.state += cpu_flag::wait;
}
vm::reservation_notifier_notify(addr & -128, rtime);
break;
}
}
}
static_cast<void>(ppu.test_stopped());
if (addr == ppu.last_faddr)
{
ppu.last_succ++;
@@ -3682,7 +3682,6 @@ static bool ppu_store_reservation(ppu_thread& ppu, u32 addr, u64 reg_value)
ppu.last_faddr = 0;
ppu.res_cached = ppu.raddr;
ppu.rtime += 128;
ppu.raddr = 0;
return true;
}
@@ -3693,10 +3692,10 @@ static bool ppu_store_reservation(ppu_thread& ppu, u32 addr, u64 reg_value)
// And on failure it has some time to do something else
if (notify && ((addr ^ notify) & -128))
{
if (ppu.res_notify_time == vm::reservation_notifier_count_index(notify).second)
if (auto waiter = vm::reservation_notifier_notify(notify, ppu.res_notify_time, true))
{
ppu.state += cpu_flag::wait;
vm::reservation_notifier_notify(notify);
waiter->notify_all();
static_cast<void>(ppu.test_stopped());
}
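To make the postpone/coalesce flow above easier to follow, here is a minimal self-contained sketch of the policy. All names (pending_notify_t, notify_waiters, on_store, on_sleep) are invented stand-ins for ppu.res_notify/ppu.res_notify_time, vm::reservation_notifier_notify and the logic in ppu_store_reservation/cpu_wait; std:: fixed-width types stand in for the codebase's u32/u64.

#include <atomic>
#include <cstdint>

struct pending_notify_t
{
    std::uint32_t addr = 0; // 128-byte-aligned line address, 0 = nothing pending
    std::uint64_t time = 0; // reservation timestamp the sleeping waiters saw
};

void notify_waiters(std::uint32_t /*addr*/, std::uint64_t /*time*/)
{
    // The real code wakes futex-style waiters bucketed by (addr, time)
}

// On a successful conditional store: either notify now or remember for later
void on_store(pending_notify_t& pending, std::uint32_t addr, std::uint64_t rtime, std::uint32_t waiter_count)
{
    addr &= ~127u;

    if (pending.addr && pending.addr != addr)
    {
        notify_waiters(pending.addr, pending.time); // different line: flush the old one
        pending = {};
    }

    if (waiter_count == 1 && !pending.addr)
    {
        // Single waiter: postpone, e.g. so a mutex waiter is not woken
        // while the lock is still held
        pending = {addr, rtime};
    }
    else if (waiter_count != 0)
    {
        notify_waiters(addr, rtime); // several waiters (or a join): notify now
        pending = {};
    }
}

// When the thread is about to go to sleep, flush whatever is still pending
void on_sleep(pending_notify_t& pending)
{
    if (pending.addr)
    {
        notify_waiters(pending.addr, pending.time);
        pending = {};
    }
}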


@@ -461,7 +461,7 @@ waitpkg_func static void __tpause(u32 cycles, u32 cstate)
namespace vm
{
std::array<atomic_t<reservation_waiter_t>, 1024> g_resrv_waiters_count{};
std::array<atomic_t<reservation_waiter_t>, 2048> g_resrv_waiters_count{};
}
void do_cell_atomic_128_store(u32 addr, const void* to_write);
@@ -3979,7 +3979,7 @@ bool spu_thread::do_putllc(const spu_mfc_cmd& args)
{
if (raddr != spurs_addr || pc != 0x11e4)
{
vm::reservation_notifier_notify(addr);
vm::reservation_notifier_notify(addr, rtime);
}
else
{
@@ -3990,7 +3990,7 @@ bool spu_thread::do_putllc(const spu_mfc_cmd& args)
if (switched_from_running_to_idle)
{
vm::reservation_notifier_notify(addr);
vm::reservation_notifier_notify(addr, rtime);
}
}
@@ -4199,7 +4199,10 @@ void spu_thread::do_putlluc(const spu_mfc_cmd& args)
}
do_cell_atomic_128_store(addr, _ptr<spu_rdata_t>(args.lsa & 0x3ff80));
vm::reservation_notifier_notify(addr);
// TODO: Implement properly (for now, notify the previous two reservation timestamps)
vm::reservation_notifier_notify(addr, vm::reservation_acquire(addr) - 128);
vm::reservation_notifier_notify(addr, vm::reservation_acquire(addr) - 256);
}
bool spu_thread::do_mfc(bool can_escape, bool must_finish)
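The double notify above appears to compensate for the rtime-based waiter bucketing introduced in vm_reservation.h later in this diff (and currently neutralized there by the rtime = 0 override): assuming the (rtime / 128) & 0b11 indexing, a waiter that went to sleep one or two stores ago lives in one of the two previous buckets. A guess at the rationale, as compile-time arithmetic:

#include <cstdint>

// Assumption for illustration: buckets keyed by two timestamp bits, one
// reservation step = +128 (matches the indexing sketched in vm_reservation.h)
constexpr std::uint32_t rtime_bucket(std::uint64_t rtime)
{
    return static_cast<std::uint32_t>(rtime / 128) & 0b11;
}

// A waiter registered one or two generations ago is not in the current bucket
static_assert(rtime_bucket(0x300) != rtime_bucket(0x300 - 128));
static_assert(rtime_bucket(0x300) != rtime_bucket(0x300 - 256));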
@@ -4914,10 +4917,10 @@ bool spu_thread::process_mfc_cmd()
usz cache_line_waiter_index = umax;
if (auto wait_var = vm::reservation_notifier_begin_wait(addr, rtime))
if (auto [wait_var, flag_val] = vm::reservation_notifier_begin_wait(addr, rtime); wait_var)
{
cache_line_waiter_index = register_cache_line_waiter(addr);
utils::bless<atomic_t<u32>>(&wait_var->raw().wait_flag)->wait(1, atomic_wait_timeout{100'000});
utils::bless<atomic_t<u32>>(&wait_var->raw().wait_flag)->wait(flag_val, atomic_wait_timeout{100'000});
vm::reservation_notifier_end_wait(*wait_var);
}
@@ -4964,9 +4967,9 @@ bool spu_thread::process_mfc_cmd()
g_unchanged++;
// Notify threads manually - memory data has likely changed and broken the reservation for others
if (vm::reservation_notifier_count(addr) && res == new_time)
if (vm::reservation_notifier_count(addr, new_time) && res == new_time)
{
vm::reservation_notifier_notify(addr);
vm::reservation_notifier_notify(addr, new_time);
}
}
else
@@ -4984,9 +4987,9 @@ bool spu_thread::process_mfc_cmd()
if (this_time == rtime)
{
// Notify threads manually - memory data has likely changed and broken the reservation for others
if (vm::reservation_notifier_count(addr) && res == this_time)
if (vm::reservation_notifier_count(addr, this_time) && res == this_time)
{
vm::reservation_notifier_notify(addr);
vm::reservation_notifier_notify(addr, this_time);
}
}
@@ -5645,28 +5648,46 @@ usz spu_thread::register_cache_line_waiter(u32 addr)
{
const u64 value = u64{compute_rdata_hash32(rdata)} << 32 | addr;
for (usz i = 0; i < std::size(g_spu_waiters_by_value); i++)
for (usz attempts = 0; attempts < 2; attempts++)
{
auto [old, ok] = g_spu_waiters_by_value[i].fetch_op([value](u64& x)
// First, scan for a matching address waiter
// Remembering a potentially empty spot
usz empty_it = umax;
for (usz i = 0; i < std::size(g_spu_waiters_by_value); i++)
{
if (x == 0)
auto [old, ok] = g_spu_waiters_by_value[i].fetch_op([&](u64& x)
{
x = value + 1;
return true;
}
if (x == 0)
{
empty_it = i;
return false;
}
if ((x & -128) == value)
if ((x & -128) == value)
{
x++;
return true;
}
return false;
});
if (ok)
{
x++;
return true;
return i;
}
}
return false;
});
if (ok)
if (empty_it == umax)
{
return i;
continue;
}
// If we did not find an existing waiter, try to occupy an empty spot
if (g_spu_waiters_by_value[empty_it].compare_and_swap_test(0, value + 1))
{
return empty_it;
}
}
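A condensed model of the two-pass registration above may help: pass one looks for a slot already tracking the same (hash, address) value and bumps its low-bits refcount; an empty slot seen along the way is only claimed afterwards with a compare-and-swap, and a failed claim triggers one full retry. Slot count and helper names below are arbitrary; this is a sketch, not the real g_spu_waiters_by_value code.

#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>

inline std::array<std::atomic<std::uint64_t>, 4> g_slots{};
constexpr std::size_t npos = static_cast<std::size_t>(-1);

// value = (hash32 << 32 | addr), 128-byte aligned; the low 7 bits hold a refcount
std::size_t register_waiter(std::uint64_t value)
{
    for (int attempt = 0; attempt < 2; attempt++)
    {
        std::size_t empty_it = npos;

        for (std::size_t i = 0; i < g_slots.size(); i++)
        {
            std::uint64_t old = g_slots[i].load();

            if (old == 0)
            {
                if (empty_it == npos)
                    empty_it = i; // remember the first free slot for pass two
                continue;
            }

            // Slot already tracks our value: join it as one more waiter
            while ((old & ~std::uint64_t{127}) == value)
            {
                if (g_slots[i].compare_exchange_weak(old, old + 1))
                    return i;
            }
        }

        if (empty_it == npos)
            continue; // no free slot seen: rescan once in case one opened up

        // Claim the free slot with refcount 1; on a race, rescan from scratch
        std::uint64_t expected = 0;
        if (g_slots[empty_it].compare_exchange_strong(expected, value + 1))
            return empty_it;
    }

    return npos; // table contended: caller falls back to a plain short wait
}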
@@ -5681,7 +5702,7 @@ void spu_thread::deregister_cache_line_waiter(usz index)
return;
}
g_spu_waiters_by_value[index].fetch_op([](u64& x)
g_spu_waiters_by_value[index].atomic_op([](u64& x)
{
x--;
@@ -5689,8 +5710,6 @@ void spu_thread::deregister_cache_line_waiter(usz index)
{
x = 0;
}
return false;
});
}
@@ -6151,9 +6170,9 @@ s64 spu_thread::get_ch_value(u32 ch)
else if (!cmp_rdata(rdata, *resrv_mem))
{
// Notify threads manually - memory data has likely changed and broken the reservation for others
if (vm::reservation_notifier_count(raddr) && vm::reservation_acquire(raddr) == rtime)
if (vm::reservation_notifier_count(raddr, rtime) && vm::reservation_acquire(raddr) == rtime)
{
vm::reservation_notifier_notify(raddr);
vm::reservation_notifier_notify(raddr, rtime);
}
set_lr = true;
@@ -6268,11 +6287,16 @@ s64 spu_thread::get_ch_value(u32 ch)
{
// Wait with an extended timeout - in this situation nearly all writes produce notifications, which makes the longer wait viable
// Abort notifications are handled specially for performance reasons
if (auto wait_var = vm::reservation_notifier_begin_wait(raddr, rtime))
if (auto [wait_var, flag_val] = vm::reservation_notifier_begin_wait(raddr, rtime); wait_var)
{
if (check_cache_line_waiter())
if (!cmp_rdata(rdata, *resrv_mem))
{
utils::bless<atomic_t<u32>>(&wait_var->raw().wait_flag)->wait(1, atomic_wait_timeout{300'000});
raddr = 0;
set_events(SPU_EVENT_LR);
}
else if (check_cache_line_waiter())
{
utils::bless<atomic_t<u32>>(&wait_var->raw().wait_flag)->wait(flag_val, atomic_wait_timeout{200'000});
}
vm::reservation_notifier_end_wait(*wait_var);
@@ -6283,12 +6307,16 @@ s64 spu_thread::get_ch_value(u32 ch)
const u32 _raddr = this->raddr;
#ifdef __linux__
if (auto wait_var = vm::reservation_notifier_begin_wait(_raddr, rtime))
if (auto [wait_var, flag_val] = vm::reservation_notifier_begin_wait(_raddr, rtime); wait_var)
{
if (check_cache_line_waiter())
if (!cmp_rdata(rdata, *resrv_mem))
{
utils::bless<atomic_t<u32>>(&wait_var->raw().wait_flag)->wait(1, atomic_wait_timeout{50'000});
raddr = 0;
set_events(SPU_EVENT_LR);
}
else if (check_cache_line_waiter())
{
utils::bless<atomic_t<u32>>(&wait_var->raw().wait_flag)->wait(flag_val, atomic_wait_timeout{50'000});
}
vm::reservation_notifier_end_wait(*wait_var);
@@ -6330,7 +6358,7 @@ s64 spu_thread::get_ch_value(u32 ch)
else if (!cmp_rdata(_this->rdata, *_this->resrv_mem))
{
// Notify threads manually - memory data has likely changed and broken the reservation for others
if (vm::reservation_notifier_count(raddr) >= 2 && vm::reservation_acquire(raddr) == _this->rtime)
if (vm::reservation_notifier_count(raddr, _this->rtime) >= 2 && vm::reservation_acquire(raddr) == _this->rtime)
{
s_tls_try_notify = true;
}
@@ -6351,20 +6379,25 @@ s64 spu_thread::get_ch_value(u32 ch)
return true;
};
if (auto wait_var = vm::reservation_notifier_begin_wait(_raddr, rtime))
if (auto [wait_var, flag_val] = vm::reservation_notifier_begin_wait(_raddr, rtime); wait_var)
{
if (check_cache_line_waiter())
if (!cmp_rdata(rdata, *resrv_mem))
{
raddr = 0;
set_events(SPU_EVENT_LR);
}
else if (check_cache_line_waiter())
{
atomic_wait_engine::set_one_time_use_wait_callback(wait_cb);
utils::bless<atomic_t<u32>>(&wait_var->raw().wait_flag)->wait(1, atomic_wait_timeout{80'000});
utils::bless<atomic_t<u32>>(&wait_var->raw().wait_flag)->wait(flag_val, atomic_wait_timeout{100'000});
}
vm::reservation_notifier_end_wait(*wait_var);
}
if (s_tls_try_notify && vm::reservation_notifier_count(_raddr) && vm::reservation_acquire(_raddr) == rtime)
if (s_tls_try_notify && vm::reservation_notifier_count(_raddr, rtime) && vm::reservation_acquire(_raddr) == rtime)
{
vm::reservation_notifier_notify(_raddr);
vm::reservation_notifier_notify(_raddr, rtime);
}
#endif
}
@@ -7273,6 +7306,7 @@ bool spu_thread::stop_and_signal(u32 code)
}
u32 prev_resv = 0;
u64 prev_rtime = 0;
for (auto& thread : group->threads)
{
@@ -7285,13 +7319,14 @@ bool spu_thread::stop_and_signal(u32 code)
if (u32 resv = atomic_storage<u32>::load(thread->raddr))
{
if (prev_resv && prev_resv != resv)
if (prev_resv && (prev_resv != resv || thread->rtime != prev_rtime))
{
// Batch reservation notifications if possible
vm::reservation_notifier_notify(prev_resv);
vm::reservation_notifier_notify(prev_resv, prev_rtime);
}
prev_resv = resv;
prev_rtime = thread->rtime;
}
}
}
@@ -7299,7 +7334,7 @@ bool spu_thread::stop_and_signal(u32 code)
if (prev_resv)
{
vm::reservation_notifier_notify(prev_resv);
vm::reservation_notifier_notify(prev_resv, prev_rtime);
}
check_state();
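The batching pattern above (and its twin in sys_spu_thread_group_terminate below) avoids duplicate wake-ups when several threads of a group sleep on the same reservation. A generic sketch of just the dedup-by-key loop, with simplified types:

#include <cstdint>

struct resv_key { std::uint32_t addr; std::uint64_t time; };

// Walks (addr, time) pairs and fires 'notify' once per run of equal keys,
// flushing the final pending key after the loop - mirroring prev_resv/prev_rtime
template <typename It, typename NotifyFn>
void notify_batched(It first, It last, NotifyFn notify)
{
    resv_key prev{0, 0};

    for (It it = first; it != last; ++it)
    {
        const resv_key cur = *it; // one thread's {raddr, rtime}, addr 0 = no reservation

        if (cur.addr == 0)
            continue;

        if (prev.addr && (prev.addr != cur.addr || prev.time != cur.time))
            notify(prev.addr, prev.time); // key changed: flush the previous one

        prev = cur;
    }

    if (prev.addr)
        notify(prev.addr, prev.time);
}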


@@ -1340,23 +1340,20 @@ bool lv2_obj::sleep(cpu_thread& cpu, const u64 timeout)
{
static_cast<ppu_thread&>(cpu).res_notify = 0;
if (static_cast<ppu_thread&>(cpu).res_notify_time != vm::reservation_notifier_count_index(addr).second)
if (auto it = std::find(g_to_notify, std::end(g_to_notify), std::add_pointer_t<const void>{}); it != std::end(g_to_notify))
{
// Ignore outdated notification request
}
else if (auto it = std::find(g_to_notify, std::end(g_to_notify), std::add_pointer_t<const void>{}); it != std::end(g_to_notify))
{
*it++ = vm::reservation_notifier_notify(addr, true);
if (it < std::end(g_to_notify))
if ((*it++ = vm::reservation_notifier_notify(addr, static_cast<ppu_thread&>(cpu).res_notify_time, true)))
{
// Null-terminate the list if it ends before the last slot
*it = nullptr;
if (it < std::end(g_to_notify))
{
// Null-terminate the list if it ends before the last slot
*it = nullptr;
}
}
}
else
{
vm::reservation_notifier_notify(addr);
vm::reservation_notifier_notify(addr, static_cast<ppu_thread&>(cpu).res_notify_time);
}
}
}
@@ -1393,23 +1390,20 @@ bool lv2_obj::awake(cpu_thread* thread, s32 prio)
{
ppu->res_notify = 0;
if (ppu->res_notify_time != vm::reservation_notifier_count_index(addr).second)
if (auto it = std::find(g_to_notify, std::end(g_to_notify), std::add_pointer_t<const void>{}); it != std::end(g_to_notify))
{
// Ignore outdated notification request
}
else if (auto it = std::find(g_to_notify, std::end(g_to_notify), std::add_pointer_t<const void>{}); it != std::end(g_to_notify))
{
*it++ = vm::reservation_notifier_notify(addr, true);
if (it < std::end(g_to_notify))
if ((*it++ = vm::reservation_notifier_notify(addr, ppu->res_notify_time, true)))
{
// Null-terminate the list if it ends before the last slot
*it = nullptr;
if (it < std::end(g_to_notify))
{
// Null-terminate the list if it ends before the last slot
*it = nullptr;
}
}
}
else
{
vm::reservation_notifier_notify(addr);
vm::reservation_notifier_notify(addr, ppu->res_notify_time);
}
}
}
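Both sleep() and awake() now feed postponed notifications through the same pattern: find the first empty slot in the fixed g_to_notify array, store the waiter pointer returned by reservation_notifier_notify() in postpone mode (null when the notification was deduplicated away), and keep the array null-terminated. A standalone sketch, assuming a four-slot list (the real size is not shown in this diff):

#include <algorithm>
#include <iterator>

inline const void* g_to_notify[4]{};

// 'waiter' is what reservation_notifier_notify(addr, time, true) returned
inline bool defer_notify(const void* waiter)
{
    auto it = std::find(std::begin(g_to_notify), std::end(g_to_notify), nullptr);

    if (it == std::end(g_to_notify))
        return false; // list is full - caller should notify immediately instead

    if ((*it++ = waiter)) // null means the notify was deduplicated: store nothing
    {
        if (it != std::end(g_to_notify))
            *it = nullptr; // keep the list null-terminated when not full
    }

    return true;
}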
@@ -2232,18 +2226,8 @@ void lv2_obj::notify_all() noexcept
if (cpu != &g_to_notify)
{
const auto res_start = vm::reservation_notifier(0).second;
const auto res_end = vm::reservation_notifier(umax).second;
if (cpu >= res_start && cpu <= res_end)
{
atomic_wait_engine::notify_all(cpu);
}
else
{
// Note: by the time of notification the thread could have been deallocated, which is why the direct function is used
atomic_wait_engine::notify_one(cpu);
}
// Note: by the time of notification the thread could have been deallocated, which is why the direct function is used
atomic_wait_engine::notify_all(cpu);
}
}
@@ -2267,19 +2251,28 @@ void lv2_obj::notify_all() noexcept
constexpr usz total_waiters = std::size(spu_thread::g_spu_waiters_by_value);
u32 notifies[total_waiters]{};
u64 notifies_time[total_waiters]{};
// There may be 6 waiters, but checking them all may be too expensive
// Instead, check only a few at a time, using the CPU ID to pick the starting index so the work is distributed across all threads
atomic_t<u64, 64>* range_lock = nullptr;
for (usz i = 0, checked = 0; checked < 3 && i < total_waiters; i++)
u32 current_raddr = 0;
if (cpu->get_class() == thread_class::spu)
{
range_lock = static_cast<spu_thread*>(cpu)->range_lock;
current_raddr = static_cast<spu_thread*>(cpu)->raddr;
}
for (usz i = 0, checked = 0; checked < 4 && i < total_waiters; i++)
{
auto& waiter = spu_thread::g_spu_waiters_by_value[(i + cpu->id) % total_waiters];
const u64 value = waiter.load();
u32 raddr = static_cast<u32>(value) & -128;
if (vm::check_addr(raddr))
if (vm::check_addr(raddr) && (raddr != current_raddr || (value % 128) > 1))
{
if (((raddr >> 28) < 2 || (raddr >> 28) == 0xd))
{
@@ -2300,6 +2293,7 @@ void lv2_obj::notify_all() noexcept
}).second)
{
notifies[i] = raddr;
notifies_time[i] = vm::reservation_acquire(raddr);
}
}
@@ -2328,21 +2322,22 @@ void lv2_obj::notify_all() noexcept
}).second)
{
notifies[i] = raddr;
notifies_time[i] = vm::reservation_acquire(raddr);
}
}
}
}
if (range_lock)
if (range_lock && cpu->get_class() != thread_class::spu)
{
vm::free_range_lock(range_lock);
}
for (u32 addr : notifies)
for (u32 i = 0; i < total_waiters; i++)
{
if (addr)
if (notifies[i])
{
vm::reservation_notifier_notify(addr);
vm::reservation_notifier_notify(notifies[i], notifies_time[i]);
}
}
}
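The distribution trick above, isolated: each caller starts scanning the waiter table at an offset derived from its own thread ID and stops after a bounded number of entries, so concurrent notify_all() calls tend to cover disjoint parts of the table instead of all hammering slot 0. A sketch with an invented budget parameter (whether empty slots consume the budget in the real code is not shown here):

#include <cstddef>
#include <cstdint>

template <std::size_t N, typename CheckFn>
void scan_waiters(const std::uint64_t (&table)[N], std::uint32_t cpu_id,
                  std::size_t budget, CheckFn check)
{
    for (std::size_t i = 0, checked = 0; checked < budget && i < N; i++)
    {
        const std::uint64_t value = table[(i + cpu_id) % N]; // rotated start per thread

        if (value == 0)
            continue; // empty slots are skipped without spending budget

        checked++;
        check(value); // inspect this (hash | addr | refcount) entry, maybe notify
    }
}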


@@ -4,6 +4,7 @@
#include "Emu/Cell/ErrorCodes.h"
#include "Emu/Cell/PPUThread.h"
#include "Emu/Memory/vm_reservation.h"
#include "util/asm.hpp"
@@ -345,6 +346,9 @@ error_code sys_mutex_unlock(ppu_thread& ppu, u32 mutex_id)
const auto mutex = idm::check<lv2_obj, lv2_mutex>(mutex_id, [&, notify = lv2_obj::notify_all_t()](lv2_mutex& mutex) -> CellError
{
// At unlock, we have some time to do other jobs while the thread is unlikely to be inside another critical section
notify.enqueue_on_top(vm::reservation_notifier_notify(ppu.res_notify, ppu.res_notify_time));
auto result = mutex.try_unlock(ppu);
if (result == CELL_EBUSY)


@@ -1501,6 +1501,7 @@ error_code sys_spu_thread_group_terminate(ppu_thread& ppu, u32 id, s32 value)
}
u32 prev_resv = 0;
u64 prev_time = 0;
for (auto& thread : group->threads)
{
@@ -1510,20 +1511,21 @@ error_code sys_spu_thread_group_terminate(ppu_thread& ppu, u32 id, s32 value)
if (u32 resv = atomic_storage<u32>::load(thread->raddr))
{
if (prev_resv && prev_resv != resv)
if (prev_resv && (prev_resv != resv || prev_time != thread->rtime))
{
// Batch reservation notifications if possible
vm::reservation_notifier_notify(prev_resv);
vm::reservation_notifier_notify(prev_resv, prev_time);
}
prev_resv = resv;
prev_time = thread->rtime;
}
}
}
if (prev_resv)
{
vm::reservation_notifier_notify(prev_resv);
vm::reservation_notifier_notify(prev_resv, prev_time);
}
group->exit_status = value;


@@ -483,6 +483,13 @@ public:
}
}
static void enqueue_on_top(const void* waiter)
{
return;
g_to_notify[0] = waiter;
g_to_notify[1] = nullptr;
}
~notify_all_t() noexcept
{
lv2_obj::notify_all();


@@ -112,12 +112,9 @@ namespace vm
{
const auto [ok, rtime] = try_reservation_update(addr);
if (ok || (old & -128) < (rtime & -128))
if (ok)
{
if (ok)
{
reservation_notifier_notify(addr);
}
reservation_notifier_notify(addr, rtime);
if (cpu && !had_wait && cpu->test_stopped())
{
@@ -126,7 +123,7 @@
return;
}
old = rtime;
}
}
@@ -599,6 +596,38 @@ namespace vm
}
}
atomic_t<u32>* reservation_notifier_notify(u32 raddr, u64 rtime, bool postpone)
{
const auto waiter = reservation_notifier(raddr, rtime);
if (waiter->load().wait_flag % 2 == 1)
{
if (!waiter->fetch_op([](reservation_waiter_t& value)
{
if (value.wait_flag % 2 == 1)
{
// Notify and make it even
value.wait_flag++;
return true;
}
return false;
}).second)
{
return nullptr;
}
if (postpone)
{
return utils::bless<atomic_t<u32>>(&waiter->raw().wait_flag);
}
utils::bless<atomic_t<u32>>(&waiter->raw().wait_flag)->notify_all();
}
return nullptr;
}
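The definition above rests on wait_flag parity: odd means at least one waiter is armed, even means nobody needs a wake-up, and flipping the flag to even is what both notifies and deduplicates. A toy model of just that protocol (the real struct also carries waiters_count, and the end-wait parity restoration lives in vm_reservation.h):

#include <atomic>
#include <cstdint>

inline std::atomic<std::uint32_t> wait_flag{0}; // odd = armed, even = idle

// Waiter side: arm the flag (make it odd) and remember the value to wait on
inline std::uint32_t begin_wait()
{
    std::uint32_t v = wait_flag.load();
    while (v % 2 == 0 && !wait_flag.compare_exchange_weak(v, v + 1)) {}
    return v | 1; // the odd value this waiter will sleep on
}

// Notifier side: only the call that flips odd -> even pays for the wake-up
inline bool notify()
{
    std::uint32_t v = wait_flag.load();

    while (v % 2 == 1)
    {
        if (wait_flag.compare_exchange_weak(v, v + 1)) // odd -> even
        {
            wait_flag.notify_all();
            return true;
        }
    }

    return false; // already even: a concurrent notify won, nothing to do
}

inline void sleep_on(std::uint32_t armed)
{
    wait_flag.wait(armed); // returns once the value is no longer 'armed'
}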
u64 reservation_lock_internal(u32 addr, atomic_t<u64>& res)
{
for (u64 i = 0;; i++)


@@ -40,111 +40,71 @@ namespace vm
struct reservation_waiter_t
{
u32 wait_flag = 0;
u8 waiters_count = 0;
u8 waiters_index = 0;
u32 waiters_count = 0;
};
static inline std::pair<atomic_t<reservation_waiter_t>*, atomic_t<reservation_waiter_t>*> reservation_notifier(u32 raddr)
static inline atomic_t<reservation_waiter_t>* reservation_notifier(u32 raddr, u64 rtime)
{
extern std::array<atomic_t<reservation_waiter_t>, 1024> g_resrv_waiters_count;
rtime = 0;
constexpr u32 wait_vars_for_each = 64;
constexpr u32 unique_address_bit_mask = 0b111;
constexpr u32 unique_rtime_bit_mask = 0b11;
extern std::array<atomic_t<reservation_waiter_t>, wait_vars_for_each * (unique_address_bit_mask + 1) * (unique_rtime_bit_mask + 1)> g_resrv_waiters_count;
// Storage-efficient method to distinguish nearby addresses (which are the likely case)
constexpr u32 wait_vars_for_each = 8;
constexpr u32 unique_address_bit_mask = 0b11;
const usz index = std::popcount(raddr & -1024) + ((raddr / 128) & unique_address_bit_mask) * 32;
auto& waiter = g_resrv_waiters_count[index * wait_vars_for_each];
return { &g_resrv_waiters_count[index * wait_vars_for_each + waiter.load().waiters_index % wait_vars_for_each], &waiter };
}
// Returns waiter count and index
static inline std::pair<u32, u32> reservation_notifier_count_index(u32 raddr)
{
const auto notifiers = reservation_notifier(raddr);
return { notifiers.first->load().waiters_count, static_cast<u32>(notifiers.first - notifiers.second) };
const usz index = std::popcount(raddr & -1024) * (1 << 5) + ((rtime / 128) & unique_rtime_bit_mask) * (1 << 3) + ((raddr / 128) & unique_address_bit_mask);
return &g_resrv_waiters_count[index];
}
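A worked example of the new index computation, using arbitrary values: two line-address bits and two timestamp bits land below the popcount of the upper address bits, giving 8 x 4 = 32 slots per popcount value. Note that the rtime = 0 assignment at the top of the function, as written, forces the timestamp bits to zero; the arithmetic below shows the intended computation if rtime participated.

#include <bit>
#include <cstdint>

// raddr = 0xD0001280, rtime = 0x500 (illustrative values):
//   std::popcount(raddr & -1024) = popcount(0xD0001000) = 4
//   (rtime / 128) & 0b11  = 10 & 3   = 2
//   (raddr / 128) & 0b111 = 0x25 & 7 = 5
//   index = 4 * 32 + 2 * 8 + 5 = 149
static_assert(std::popcount(0xD000'1280u & 0xFFFF'FC00u) == 4);
static_assert(((0x500u / 128) & 0b11) == 2);
static_assert(((0xD000'1280u / 128) & 0b111) == 5);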
// Returns waiter count
static inline u32 reservation_notifier_count(u32 raddr)
static inline u32 reservation_notifier_count(u32 raddr, u64 rtime)
{
return reservation_notifier(raddr).first->load().waiters_count;
return reservation_notifier(raddr, rtime)->load().waiters_count;
}
static inline void reservation_notifier_end_wait(atomic_t<reservation_waiter_t>& waiter)
{
waiter.atomic_op([](reservation_waiter_t& value)
{
if (value.waiters_count-- == 1)
if (value.waiters_count == 1 && value.wait_flag % 2 == 1)
{
value.wait_flag = 0;
// Make wait_flag even (disabling notification on last waiter)
value.wait_flag++;
}
value.waiters_count--;
});
}
static inline atomic_t<reservation_waiter_t>* reservation_notifier_begin_wait(u32 raddr, u64 rtime)
static inline std::pair<atomic_t<reservation_waiter_t>*, u32> reservation_notifier_begin_wait(u32 raddr, u64 rtime)
{
const auto notifiers = reservation_notifier(raddr);
atomic_t<reservation_waiter_t>& waiter = *notifiers.first;
atomic_t<reservation_waiter_t>& waiter = *reservation_notifier(raddr, rtime);
waiter.atomic_op([](reservation_waiter_t& value)
u32 wait_flag = 0;
waiter.atomic_op([&](reservation_waiter_t& value)
{
value.wait_flag = 1;
if (value.wait_flag % 2 == 0)
{
// Make wait_flag odd (for notification deduplication detection)
value.wait_flag++;
}
wait_flag = value.wait_flag;
value.waiters_count++;
});
if ((reservation_acquire(raddr) & -128) != rtime)
{
reservation_notifier_end_wait(waiter);
return nullptr;
return {};
}
return &waiter;
return { &waiter, wait_flag };
}
static inline atomic_t<u32>* reservation_notifier_notify(u32 raddr, bool postpone = false)
{
const auto notifiers = reservation_notifier(raddr);
if (notifiers.first->load().wait_flag)
{
if (notifiers.first == notifiers.second)
{
if (!notifiers.first->fetch_op([](reservation_waiter_t& value)
{
if (value.waiters_index == 0)
{
value.wait_flag = 0;
value.waiters_count = 0;
value.waiters_index++;
return true;
}
return false;
}).second)
{
return nullptr;
}
}
else
{
u8 old_index = static_cast<u8>(notifiers.first - notifiers.second);
if (!atomic_storage<u8>::compare_exchange(notifiers.second->raw().waiters_index, old_index, (old_index + 1) % 4))
{
return nullptr;
}
notifiers.first->release(reservation_waiter_t{});
}
if (postpone)
{
return utils::bless<atomic_t<u32>>(&notifiers.first->raw().wait_flag);
}
utils::bless<atomic_t<u32>>(&notifiers.first->raw().wait_flag)->notify_all();
}
return nullptr;
}
atomic_t<u32>* reservation_notifier_notify(u32 raddr, u64 rtime, bool postpone = false);
u64 reservation_lock_internal(u32, atomic_t<u64>&);
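Putting the header's waiter API together, here is a condensed, self-contained model of the begin/end lifecycle. The real code updates wait_flag and waiters_count together in a single atomic_op on reservation_waiter_t; they are split into two atomics here purely for brevity, which is fine for illustration but not for the real invariant.

#include <atomic>
#include <cstdint>
#include <utility>

inline std::atomic<std::uint64_t> g_res{0x1000}; // stands in for vm::reservation_acquire()
inline std::atomic<std::uint32_t> g_flag{0};     // odd while someone is waiting
inline std::atomic<std::uint32_t> g_count{0};    // registered waiters

inline void end_wait()
{
    if (g_count.fetch_sub(1) == 1)
    {
        // Last waiter out: park the flag back to an even value so stale
        // notifiers can skip the wake-up entirely
        std::uint32_t f = g_flag.load();
        while (f % 2 == 1 && !g_flag.compare_exchange_weak(f, f + 1)) {}
    }
}

inline std::pair<bool, std::uint32_t> begin_wait(std::uint64_t rtime)
{
    std::uint32_t f = g_flag.load();
    while (f % 2 == 0 && !g_flag.compare_exchange_weak(f, f + 1)) {}
    g_count.fetch_add(1);

    if ((g_res.load() & ~std::uint64_t{127}) != rtime)
    {
        end_wait(); // reservation already moved on: don't sleep at all
        return {false, 0};
    }

    return {true, f | 1}; // pass this odd value to wait()
}

inline void wait_for_line_change(std::uint64_t rtime)
{
    if (auto [ok, flag_val] = begin_wait(rtime); ok)
    {
        g_flag.wait(flag_val); // the real callers add a timeout here
        end_wait();
    }
}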
@@ -232,28 +192,28 @@ namespace vm
if constexpr (std::is_void_v<std::invoke_result_t<F, T&>>)
{
std::invoke(op, *sptr);
res += 128;
const u64 old_time = res.fetch_add(128);
#ifndef _MSC_VER
__asm__ volatile ("xend;" ::: "memory");
#else
_xend();
#endif
if constexpr (Ack)
res.notify_all();
reservation_notifier_notify(addr, old_time);
return;
}
else
{
if (auto result = std::invoke(op, *sptr))
{
res += 128;
const u64 old_time = res.fetch_add(128);
#ifndef _MSC_VER
__asm__ volatile ("xend;" ::: "memory");
#else
_xend();
#endif
if constexpr (Ack)
res.notify_all();
reservation_notifier_notify(addr, old_time);
return result;
}
else
@@ -308,7 +268,7 @@
#endif
res += 127;
if (Ack)
res.notify_all();
reservation_notifier_notify(addr, _old);
return;
}
else
@@ -322,7 +282,7 @@
#endif
res += 127;
if (Ack)
res.notify_all();
reservation_notifier_notify(addr, _old);
return result;
}
else
@@ -357,7 +317,7 @@
});
if constexpr (Ack)
res.notify_all();
reservation_notifier_notify(addr, _old);
return;
}
else
@@ -377,7 +337,7 @@
});
if (Ack && result)
res.notify_all();
reservation_notifier_notify(addr, _old);
return result;
}
}
@@ -388,16 +348,18 @@ namespace vm
// Lock reservation and perform heavyweight lock
reservation_shared_lock_internal(res);
u64 old_time = umax;
if constexpr (std::is_void_v<std::invoke_result_t<F, T&>>)
{
{
vm::writer_lock lock(addr);
std::invoke(op, *sptr);
res += 127;
old_time = res.fetch_add(127);
}
if constexpr (Ack)
res.notify_all();
reservation_notifier_notify(addr, old_time);
return;
}
else
@@ -408,16 +370,16 @@
if ((result = std::invoke(op, *sptr)))
{
res += 127;
old_time = res.fetch_add(127);
}
else
{
res -= 1;
old_time = res.fetch_sub(1);
}
}
if (Ack && result)
res.notify_all();
reservation_notifier_notify(addr, old_time);
return result;
}
}
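Finally, the reason all the Ack paths above switched from res.notify_all() to reservation_notifier_notify(addr, old_time): any thread currently asleep registered against the reservation value it read before this store, so the notification generation is keyed by the pre-increment timestamp. In miniature (bucket width is an assumption mirroring the earlier sketch):

#include <atomic>
#include <cstdint>

inline std::atomic<std::uint64_t> res{0x200}; // a reservation timestamp

inline std::uint64_t commit_store()
{
    return res.fetch_add(128); // publish the new generation, return the old one
}

// With bucketing like (rtime / 128) & 0b11, sleepers sit in the bucket of
// the value they read before the store - i.e. the returned old_time
constexpr std::uint32_t rtime_bucket(std::uint64_t t)
{
    return static_cast<std::uint32_t>(t / 128) & 0b11;
}

static_assert(rtime_bucket(0x200) != rtime_bucket(0x200 + 128),
              "notify with old_time, not the freshly bumped value");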