Compare commits

...

3 Commits

Author SHA1 Message Date
Julian Oes 91101996c1 refactor(parameters): adopt Beat Küng's suggestion 2026-04-09 06:36:22 +12:00
Julian Oes 01d894140b fix(parameters): don't lie in test, do as comment implies 2026-04-09 06:36:20 +12:00
Julian Oes e8f2b2763b fix(parameters): fix data races in DynamicSparseLayer 2026-04-09 06:36:17 +12:00
3 changed files with 294 additions and 42 deletions
+1
View File
@@ -205,3 +205,4 @@ if(${PX4_PLATFORM} STREQUAL "posix" OR ${PX4_PLATFORM} STREQUAL "ros2")
endif()
px4_add_functional_gtest(SRC ParameterTest.cpp LINKLIBS parameters)
px4_add_functional_gtest(SRC DynamicSparseLayerTest.cpp LINKLIBS parameters)
+48 -42
View File
@@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (c) 2023 PX4 Development Team. All rights reserved.
* Copyright (c) 2023-2026 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -55,20 +55,20 @@ public:
slots[i] = {UINT16_MAX, param_value_u{}};
}
_slots.store(slots);
_slots = slots;
}
virtual ~DynamicSparseLayer()
{
if (_slots.load()) {
free(_slots.load());
if (_slots) {
free(_slots);
}
}
bool store(param_t param, param_value_u value) override
{
AtomicTransaction transaction;
Slot *slots = _slots.load();
Slot *slots = _slots;
const int index = _getIndex(param);
@@ -84,7 +84,7 @@ public:
return false;
}
_slots.load()[_next_slot++] = {param, value};
_slots[_next_slot++] = {param, value};
_sort();
}
@@ -101,7 +101,7 @@ public:
{
px4::AtomicBitset<PARAM_COUNT> set;
const AtomicTransaction transaction;
Slot *slots = _slots.load();
Slot *slots = _slots;
for (int i = 0; i < _next_slot; i++) {
set.set(slots[i].param);
@@ -113,7 +113,7 @@ public:
param_value_u get(param_t param) const override
{
const AtomicTransaction transaction;
Slot *slots = _slots.load();
Slot *slots = _slots;
const int index = _getIndex(param);
@@ -128,7 +128,7 @@ public:
{
const AtomicTransaction transaction;
int index = _getIndex(param);
Slot *slots = _slots.load();
Slot *slots = _slots;
if (index < _next_slot) {
slots[index] = {UINT16_MAX, param_value_u{}};
@@ -144,11 +144,13 @@ public:
int size() const override
{
const AtomicTransaction transaction;
return _next_slot;
}
int byteSize() const override
{
const AtomicTransaction transaction;
return _n_slots * sizeof(Slot);
}
@@ -165,14 +167,14 @@ private:
void _sort()
{
qsort(_slots.load(), _n_slots, sizeof(Slot), _slotCompare);
qsort(_slots, _n_slots, sizeof(Slot), _slotCompare);
}
int _getIndex(param_t param) const
{
int left = 0;
int right = _next_slot - 1;
Slot *slots = _slots.load();
Slot *slots = _slots;
while (left <= right) {
int mid = (left + right) / 2;
@@ -197,42 +199,46 @@ private:
return false;
}
int max_retries = 5;
unsigned max_retries = 5;
// As malloc uses locking, so we need to re-enable IRQ's during malloc/free and
// then atomically exchange the buffer
// then exchange the buffer inside a critical section.
while (_next_slot >= _n_slots && max_retries-- > 0) {
Slot *previous_slots = nullptr;
Slot *new_slots = nullptr;
do {
previous_slots = _slots.load();
transaction.unlock();
if (new_slots) {
free(new_slots);
}
new_slots = (Slot *) malloc(sizeof(Slot) * (_n_slots + _n_grow));
transaction.lock();
if (new_slots == nullptr) {
return false;
}
} while (!_slots.compare_exchange(&previous_slots, new_slots));
memcpy(new_slots, previous_slots, sizeof(Slot) * _n_slots);
for (int i = _n_slots; i < _n_slots + _n_grow; i++) {
new_slots[i] = {UINT16_MAX, param_value_u{}};
}
_n_slots += _n_grow;
const int n_slots_new = _n_slots + _n_grow;
transaction.unlock();
free(previous_slots);
Slot *new_slots = (Slot *) malloc(sizeof(Slot) * n_slots_new);
transaction.lock();
if (new_slots == nullptr) {
return false;
}
if (_n_slots + _n_grow == n_slots_new) {
Slot *previous_slots = _slots;
memcpy(new_slots, previous_slots, sizeof(Slot) * _n_slots);
for (int i = _n_slots; i < n_slots_new; i++) {
new_slots[i] = {UINT16_MAX, param_value_u{}};
}
_slots = new_slots;
_n_slots = n_slots_new;
transaction.unlock();
free(previous_slots);
transaction.lock();
// After freeing previous_slots, we still need to continue the loop, because we just unlocked
// the critical section, so the buffer might already be full again at this point.
} else {
// If we end up here then another thread (successfully) increased the buffer already.
// So we can drop the new buffer but we will still need to check again if we need to
// increase the buffer even more.
transaction.unlock();
free(new_slots);
transaction.lock();
}
}
return _next_slot < _n_slots;
@@ -241,5 +247,5 @@ private:
int _next_slot = 0;
int _n_slots = 0;
const int _n_grow;
px4::atomic<Slot *> _slots{nullptr};
Slot *_slots{nullptr};
};
@@ -0,0 +1,245 @@
/****************************************************************************
*
* Copyright (c) 2026 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
/**
* @file DynamicSparseLayerTest.cpp
*
* Concurrent stress tests for DynamicSparseLayer.
*
* Validates thread safety of store/get/contains under concurrent access,
* especially during _grow() operations. Designed to be run with ASAN/TSan
* to catch use-after-free or data races.
*/
#include <gtest/gtest.h>
#include <parameters/px4_parameters.hpp>
#include "DynamicSparseLayer.h"
#include <atomic>
#include <thread>
#include <vector>
/**
* Minimal ParamLayer parent that returns a default zero value for all params.
*/
class StubParamLayer : public ParamLayer
{
public:
StubParamLayer() : ParamLayer(nullptr) {}
bool store(param_t, param_value_u) override { return false; }
bool contains(param_t) const override { return false; }
px4::AtomicBitset<PARAM_COUNT> containedAsBitset() const override
{
return px4::AtomicBitset<PARAM_COUNT>();
}
param_value_u get(param_t) const override
{
param_value_u v{};
v.i = 0;
return v;
}
void reset(param_t) override {}
void refresh(param_t) override {}
int size() const override { return 0; }
int byteSize() const override { return 0; }
};
class DynamicSparseLayerConcurrentTest : public ::testing::Test
{
protected:
StubParamLayer stub;
};
// Single writer forces frequent _grow() while readers concurrently access
// stored params. With only one writer there is no concurrent growth, so
// this safely stresses the read-side during buffer reallocation.
TEST_F(DynamicSparseLayerConcurrentTest, ConcurrentStoreGetRace)
{
for (int rep = 0; rep < 50; rep++) {
DynamicSparseLayer layer(&stub, /*n_prealloc=*/2, /*n_grow=*/1);
constexpr int NUM_PARAMS = 100;
std::atomic<bool> stop{false};
std::thread writer([&]() {
for (int i = 0; i < NUM_PARAMS; i++) {
param_value_u v{};
v.i = i * 10;
layer.store(static_cast<param_t>(i), v);
}
stop.store(true, std::memory_order_release);
});
std::vector<std::thread> readers;
for (int r = 0; r < 4; r++) {
readers.emplace_back([&]() {
while (!stop.load(std::memory_order_acquire)) {
int current_size = layer.size();
for (int i = 0; i < current_size && i < NUM_PARAMS; i++) {
layer.contains(static_cast<param_t>(i));
layer.get(static_cast<param_t>(i));
}
}
});
}
writer.join();
for (auto &t : readers) {
t.join();
}
for (int i = 0; i < NUM_PARAMS; i++) {
ASSERT_TRUE(layer.contains(static_cast<param_t>(i)))
<< "rep=" << rep << " param=" << i;
param_value_u v = layer.get(static_cast<param_t>(i));
ASSERT_EQ(v.i, i * 10)
<< "rep=" << rep << " param=" << i;
}
ASSERT_EQ(layer.size(), NUM_PARAMS);
}
}
// Multiple writers store to disjoint param ranges concurrently.
// Pre-allocated to avoid concurrent _grow() which has a known ABA
// limitation in the CAS retry loop (see _grow() implementation).
// This test validates concurrent store correctness.
TEST_F(DynamicSparseLayerConcurrentTest, ConcurrentMultipleWriters)
{
constexpr int PARAMS_PER_WRITER = 50;
constexpr int NUM_WRITERS = 4;
constexpr int TOTAL = NUM_WRITERS * PARAMS_PER_WRITER;
for (int rep = 0; rep < 20; rep++) {
DynamicSparseLayer layer(&stub, /*n_prealloc=*/TOTAL, /*n_grow=*/1);
std::vector<std::thread> writers;
for (int w = 0; w < NUM_WRITERS; w++) {
writers.emplace_back([&layer, w]() {
int base = w * PARAMS_PER_WRITER;
for (int i = 0; i < PARAMS_PER_WRITER; i++) {
param_value_u v{};
v.i = (base + i) * 10;
layer.store(static_cast<param_t>(base + i), v);
}
});
}
for (auto &t : writers) {
t.join();
}
ASSERT_EQ(layer.size(), TOTAL) << "rep=" << rep;
for (int i = 0; i < TOTAL; i++) {
ASSERT_TRUE(layer.contains(static_cast<param_t>(i)))
<< "rep=" << rep << " param=" << i;
param_value_u v = layer.get(static_cast<param_t>(i));
ASSERT_EQ(v.i, i * 10)
<< "rep=" << rep << " param=" << i;
}
}
}
// Combined stress: writers store while readers access concurrently.
TEST_F(DynamicSparseLayerConcurrentTest, ConcurrentWritersAndReaders)
{
constexpr int PARAMS_PER_WRITER = 50;
constexpr int NUM_WRITERS = 2;
constexpr int TOTAL = NUM_WRITERS * PARAMS_PER_WRITER;
for (int rep = 0; rep < 20; rep++) {
DynamicSparseLayer layer(&stub, /*n_prealloc=*/2, /*n_grow=*/1);
std::atomic<bool> stop{false};
std::vector<std::thread> writers;
for (int w = 0; w < NUM_WRITERS; w++) {
writers.emplace_back([&layer, w]() {
int base = w * PARAMS_PER_WRITER;
for (int i = 0; i < PARAMS_PER_WRITER; i++) {
param_value_u v{};
v.i = (base + i) * 10;
layer.store(static_cast<param_t>(base + i), v);
}
});
}
std::vector<std::thread> readers;
for (int r = 0; r < 4; r++) {
readers.emplace_back([&]() {
while (!stop.load(std::memory_order_acquire)) {
int current_size = layer.size();
for (int i = 0; i < current_size && i < TOTAL; i++) {
layer.contains(static_cast<param_t>(i));
layer.get(static_cast<param_t>(i));
}
}
});
}
for (auto &t : writers) {
t.join();
}
stop.store(true, std::memory_order_release);
for (auto &t : readers) {
t.join();
}
ASSERT_EQ(layer.size(), TOTAL) << "rep=" << rep;
for (int i = 0; i < TOTAL; i++) {
ASSERT_TRUE(layer.contains(static_cast<param_t>(i)))
<< "rep=" << rep << " param=" << i;
param_value_u v = layer.get(static_cast<param_t>(i));
ASSERT_EQ(v.i, i * 10)
<< "rep=" << rep << " param=" << i;
}
}
}