mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-04-14 10:07:39 +08:00
lockstep_scheduler: fix TSAN lock-order-inversion and test correctness
Split set_absolute_time() into two phases: iterate the waiter list under _timed_waits_mutex, then broadcast under a separate _broadcast_mutex. This eliminates the lock-order-inversion cycle between cond_timedwait() (holds passed_lock -> acquires _timed_waits_mutex) and set_absolute_time() (held _timed_waits_mutex -> acquired passed_lock). Fix tests to have each thread lock its own mutex before calling cond_timedwait, as required by POSIX (the calling thread must own the mutex passed to pthread_cond_wait). The previous cross-thread ownership caused TSAN's deadlock detector to overflow its 64-entry limit.
This commit is contained in:
parent
ecc789fc05
commit
64ddfebfc6
@ -86,7 +86,7 @@ private:
|
||||
pthread_cond_t *passed_cond{nullptr};
|
||||
pthread_mutex_t *passed_lock{nullptr};
|
||||
uint64_t time_us{0};
|
||||
bool timeout{false};
|
||||
std::atomic<bool> timeout{false};
|
||||
std::atomic<bool> done{false};
|
||||
std::atomic<bool> removed{true};
|
||||
|
||||
@ -98,6 +98,7 @@ private:
|
||||
std::atomic<uint64_t> _time_us{0};
|
||||
|
||||
TimedWait *_timed_waits{nullptr}; ///< head of linked list
|
||||
std::mutex _timed_waits_mutex;
|
||||
std::mutex _timed_waits_mutex; ///< protects _timed_waits linked list
|
||||
std::mutex _broadcast_mutex; ///< protects the broadcast phase in set_absolute_time()
|
||||
std::atomic<bool> _setting_time{false}; ///< true if set_absolute_time() is currently being executed
|
||||
};
|
||||
|
||||
@ -55,6 +55,18 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
|
||||
|
||||
_time_us = time_us;
|
||||
|
||||
// Collect entries that need wakeup while holding _timed_waits_mutex,
|
||||
// then broadcast outside of it to avoid lock-order-inversion with
|
||||
// cond_timedwait() (which holds passed_lock while acquiring _timed_waits_mutex).
|
||||
struct WakeUp {
|
||||
pthread_cond_t *cond;
|
||||
pthread_mutex_t *lock;
|
||||
};
|
||||
|
||||
static constexpr int MAX_WAKEUPS = 32;
|
||||
WakeUp wakeups[MAX_WAKEUPS];
|
||||
int num_wakeups = 0;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock_timed_waits(_timed_waits_mutex);
|
||||
_setting_time = true;
|
||||
@ -81,17 +93,29 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
|
||||
|
||||
if (timed_wait->time_us <= time_us &&
|
||||
!timed_wait->timeout) {
|
||||
// We are abusing the condition here to signal that the time
|
||||
// has passed.
|
||||
pthread_mutex_lock(timed_wait->passed_lock);
|
||||
// Mark as timed out and collect for broadcast
|
||||
timed_wait->timeout = true;
|
||||
pthread_cond_broadcast(timed_wait->passed_cond);
|
||||
pthread_mutex_unlock(timed_wait->passed_lock);
|
||||
|
||||
if (num_wakeups < MAX_WAKEUPS) {
|
||||
wakeups[num_wakeups++] = {timed_wait->passed_cond, timed_wait->passed_lock};
|
||||
}
|
||||
}
|
||||
|
||||
timed_wait_prev = timed_wait;
|
||||
timed_wait = timed_wait->next;
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast under _broadcast_mutex (not _timed_waits_mutex) so that
|
||||
// locking passed_lock here doesn't create an ordering cycle.
|
||||
{
|
||||
std::lock_guard<std::mutex> lock_broadcast(_broadcast_mutex);
|
||||
|
||||
for (int i = 0; i < num_wakeups; ++i) {
|
||||
pthread_mutex_lock(wakeups[i].lock);
|
||||
pthread_cond_broadcast(wakeups[i].cond);
|
||||
pthread_mutex_unlock(wakeups[i].lock);
|
||||
}
|
||||
|
||||
_setting_time = false;
|
||||
}
|
||||
@ -101,7 +125,7 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
|
||||
{
|
||||
// A TimedWait object might still be in timed_waits_ after we return, so its lifetime needs to be
|
||||
// longer. And using thread_local is more efficient than malloc.
|
||||
static thread_local TimedWait timed_wait;
|
||||
static thread_local TimedWait timed_wait{};
|
||||
{
|
||||
std::lock_guard<std::mutex> lock_timed_waits(_timed_waits_mutex);
|
||||
|
||||
@ -144,9 +168,13 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
|
||||
// deadlock due to a different locking order in set_absolute_time().
|
||||
// Note that this case does not happen too frequently, and thus can be
|
||||
// a bit more expensive.
|
||||
// We wait for both the iteration phase (_timed_waits_mutex) and the
|
||||
// broadcast phase (_broadcast_mutex) to complete.
|
||||
pthread_mutex_unlock(lock);
|
||||
_timed_waits_mutex.lock();
|
||||
_timed_waits_mutex.unlock();
|
||||
_broadcast_mutex.lock();
|
||||
_broadcast_mutex.unlock();
|
||||
pthread_mutex_lock(lock);
|
||||
}
|
||||
|
||||
|
||||
@ -65,11 +65,12 @@ void test_condition_timing_out()
|
||||
ls.set_absolute_time(some_time_us);
|
||||
|
||||
std::atomic<bool> should_have_timed_out{false};
|
||||
pthread_mutex_lock(&lock);
|
||||
|
||||
// Use a thread to wait for condition while we already have the lock.
|
||||
// This ensures the synchronization happens in the right order.
|
||||
// The thread that calls cond_timedwait must also lock the mutex,
|
||||
// otherwise set_absolute_time() would re-lock the main thread's
|
||||
// own mutex in its broadcast phase, causing a deadlock.
|
||||
TestThread thread([&ls, &cond, &lock, &should_have_timed_out]() {
|
||||
pthread_mutex_lock(&lock);
|
||||
EXPECT_EQ(ls.cond_timedwait(&cond, &lock, some_time_us + 1000), ETIMEDOUT);
|
||||
EXPECT_TRUE(should_have_timed_out);
|
||||
// It should be re-locked afterwards, so we should be able to unlock it.
|
||||
@ -100,17 +101,25 @@ void test_locked_semaphore_getting_unlocked()
|
||||
LockstepScheduler ls;
|
||||
ls.set_absolute_time(some_time_us);
|
||||
|
||||
pthread_mutex_lock(&lock);
|
||||
// Use a thread to wait for condition while we already have the lock.
|
||||
// This ensures the synchronization happens in the right order.
|
||||
TestThread thread([&ls, &cond, &lock]() {
|
||||
std::atomic<bool> thread_locked{false};
|
||||
|
||||
// The thread locks the mutex itself (POSIX requires the calling thread
|
||||
// to own the mutex passed to pthread_cond_wait).
|
||||
TestThread thread([&ls, &cond, &lock, &thread_locked]() {
|
||||
pthread_mutex_lock(&lock);
|
||||
thread_locked = true;
|
||||
ls.set_absolute_time(some_time_us + 500);
|
||||
EXPECT_EQ(ls.cond_timedwait(&cond, &lock, some_time_us + 1000), 0);
|
||||
// It should be re-locked afterwards, so we should be able to unlock it.
|
||||
EXPECT_EQ(pthread_mutex_unlock(&lock), 0);
|
||||
});
|
||||
|
||||
// Wait until thread has locked the mutex. Our pthread_mutex_lock below
|
||||
// will then block until cond_timedwait releases it via pthread_cond_wait.
|
||||
while (!thread_locked) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&lock);
|
||||
pthread_cond_broadcast(&cond);
|
||||
pthread_mutex_unlock(&lock);
|
||||
@ -142,8 +151,9 @@ public:
|
||||
|
||||
void run()
|
||||
{
|
||||
pthread_mutex_lock(&_lock);
|
||||
_thread = std::make_shared<TestThread>([this]() {
|
||||
pthread_mutex_lock(&_lock);
|
||||
_thread_ready = true;
|
||||
_result = _ls.cond_timedwait(&_cond, &_lock, _timeout);
|
||||
pthread_mutex_unlock(&_lock);
|
||||
});
|
||||
@ -151,7 +161,7 @@ public:
|
||||
|
||||
void check()
|
||||
{
|
||||
if (_is_done) {
|
||||
if (_is_done || !_thread_ready) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -186,6 +196,7 @@ private:
|
||||
pthread_mutex_t _lock;
|
||||
LockstepScheduler &_ls;
|
||||
std::atomic<bool> _is_done{false};
|
||||
std::atomic<bool> _thread_ready{false};
|
||||
std::atomic<int> _result {INITIAL_RESULT};
|
||||
std::shared_ptr<TestThread> _thread{};
|
||||
};
|
||||
@ -310,7 +321,7 @@ void test_usleep()
|
||||
TEST(LockstepScheduler, All)
|
||||
{
|
||||
for (unsigned iteration = 1; iteration <= 100; ++iteration) {
|
||||
//std::cout << "Test iteration: " << iteration << "\n";
|
||||
std::cout << "Test iteration: " << iteration << "\n";
|
||||
test_absolute_time();
|
||||
test_condition_timing_out();
|
||||
test_locked_semaphore_getting_unlocked();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user