Compare commits

...

4 Commits

Author SHA1 Message Date
Marco Hauswirth 8542261cab ekf2: implement multi-instance AGP fusion
- Support up to 4 independent AGP sources simultaneously
- Dynamic parameter lookup without boilerplate switch statements
- Per-source state tracking and fusion logic
- Update simulator to use new AuxGlobalPosition message
2025-12-19 14:42:13 +01:00
Marco Hauswirth b9af95903b ekf2: add multi-instance AGP parameters
Add per-instance parameters (ID, CTRL, MODE, DELAY, NOISE, GATE)
for up to 4 auxiliary global position sources.
2025-12-19 14:42:13 +01:00
Marco Hauswirth ec7973e4a2 uorb: add AuxGlobalPosition message
Add dedicated message type for auxiliary global position sources.
Separates aux_global_position from VehicleGlobalPosition topic.
2025-12-19 14:42:12 +01:00
Marco Hauswirth fee7da696d uxrce_dds: support multi-instance uORB topics
Enable DDS bridge to handle multi-instance uORB topics by mapping
multiple topic instances to a single DDS topic with instance field.
2025-12-19 09:06:27 +01:00
13 changed files with 389 additions and 154 deletions
+22
View File
@@ -0,0 +1,22 @@
# Auxiliary global position
# This message provides global position data from an external source such as
# radio-triangulation, viusal navigation, or other positioning system.
uint32 MESSAGE_VERSION = 0
uint64 timestamp # [us] Time since system start
uint64 timestamp_sample # [us] Timestamp of the raw data
uint8 id # Unique identifier for the AGP sourcxe, 1X for Visual Navigation, 2X for Radio Triangulation
float64 lat # [deg] Latitude in WGS84
float64 lon # [deg] Longitude in WGS84
float32 alt # [m] Altitude above mean sea level (AMSL)
float32 eph # [m] Standard deviation of horizontal position error
float32 epv # [m] Standard deviation of vertical position error
uint8 lat_lon_reset_counter # Counter for reset events on horizontal position coordinates
# TOPICS aux_global_position
+1
View File
@@ -47,6 +47,7 @@ set(msg_files
Airspeed.msg
AirspeedWind.msg
AutotuneAttitudeControlStatus.msg
AuxGlobalPosition.msg
BatteryInfo.msg
ButtonEvent.msg
CameraCapture.msg
-1
View File
@@ -34,4 +34,3 @@ bool dead_reckoning # True if this position is estimated through dead-reckoning
# TOPICS vehicle_global_position vehicle_global_position_groundtruth external_ins_global_position
# TOPICS estimator_global_position
# TOPICS aux_global_position
@@ -42,147 +42,159 @@ void AuxGlobalPosition::update(Ekf &ekf, const estimator::imuSample &imu_delayed
#if defined(MODULE_NAME)
if (_aux_global_position_sub.updated()) {
for (int instance = 0; instance < MAX_AGP_IDS; instance++) {
if (_aux_global_position_subs[instance].updated()) {
vehicle_global_position_s aux_global_position{};
_aux_global_position_sub.copy(&aux_global_position);
aux_global_position_s aux_global_position{};
_aux_global_position_subs[instance].copy(&aux_global_position);
const int64_t time_us = aux_global_position.timestamp_sample - static_cast<int64_t>(_param_ekf2_agp_delay.get() * 1000);
const uint8_t sensor_id = aux_global_position.id;
const int slot = mapSensorIdToSlot(sensor_id);
AuxGlobalPositionSample sample{};
sample.time_us = time_us;
sample.latitude = aux_global_position.lat;
sample.longitude = aux_global_position.lon;
sample.altitude_amsl = aux_global_position.alt;
sample.eph = aux_global_position.eph;
sample.epv = aux_global_position.epv;
sample.lat_lon_reset_counter = aux_global_position.lat_lon_reset_counter;
if (slot >= MAX_AGP_IDS) {
// All parameter slots are full, cannot handle this sensor. Set ID of unused slot to 0
continue;
}
_aux_global_position_buffer.push(sample);
const int64_t time_us = aux_global_position.timestamp_sample - static_cast<int64_t>(getDelayParam(slot) * 1000);
_time_last_buffer_push = imu_delayed.time_us;
AuxGlobalPositionSample sample{};
sample.time_us = time_us;
sample.id = sensor_id;
sample.latitude = aux_global_position.lat;
sample.longitude = aux_global_position.lon;
sample.altitude_amsl = aux_global_position.alt;
sample.eph = aux_global_position.eph;
sample.epv = aux_global_position.epv;
sample.lat_lon_reset_counter = aux_global_position.lat_lon_reset_counter;
_sources[slot].buffer.push(sample);
_sources[slot].time_last_buffer_push = imu_delayed.time_us;
}
}
#endif // MODULE_NAME
AuxGlobalPositionSample sample;
for (int slot = 0; slot < MAX_AGP_IDS; slot++) {
SourceData &source = _sources[slot];
AuxGlobalPositionSample sample;
if (_aux_global_position_buffer.pop_first_older_than(imu_delayed.time_us, &sample)) {
if (source.buffer.pop_first_older_than(imu_delayed.time_us, &sample)) {
if (!(_param_ekf2_agp_ctrl.get() & static_cast<int32_t>(Ctrl::kHPos))) {
return;
}
if (!(getCtrlParam(slot) & static_cast<int32_t>(Ctrl::kHPos))) {
continue;
}
estimator_aid_source2d_s &aid_src = _aid_src_aux_global_position;
const LatLonAlt position(sample.latitude, sample.longitude, sample.altitude_amsl);
const Vector2f innovation = (ekf.getLatLonAlt() - position).xy(); // altitude measurements are not used
estimator_aid_source2d_s &aid_src = source.aid_src;
const LatLonAlt position(sample.latitude, sample.longitude, sample.altitude_amsl);
const Vector2f innovation = (ekf.getLatLonAlt() - position).xy(); // altitude measurements are not used
// relax the upper observation noise limit which prevents bad measurements perturbing the position estimate
float pos_noise = math::max(sample.eph, _param_ekf2_agp_noise.get(), 0.01f);
const float pos_var = sq(pos_noise);
const Vector2f pos_obs_var(pos_var, pos_var);
float pos_noise = math::max(sample.eph, getNoiseParam(slot), 0.01f);
const float pos_var = sq(pos_noise);
const Vector2f pos_obs_var(pos_var, pos_var);
ekf.updateAidSourceStatus(aid_src,
sample.time_us, // sample timestamp
matrix::Vector2d(sample.latitude, sample.longitude), // observation
pos_obs_var, // observation variance
innovation, // innovation
Vector2f(ekf.getPositionVariance()) + pos_obs_var, // innovation variance
math::max(_param_ekf2_agp_gate.get(), 1.f)); // innovation gate
ekf.updateAidSourceStatus(aid_src,
sample.time_us, // sample timestamp
matrix::Vector2d(sample.latitude, sample.longitude), // observation
pos_obs_var, // observation variance
innovation, // innovation
Vector2f(ekf.getPositionVariance()) + pos_obs_var, // innovation variance
math::max(getGateParam(slot), 1.f)); // innovation gate
const bool starting_conditions = PX4_ISFINITE(sample.latitude) && PX4_ISFINITE(sample.longitude)
&& ekf.control_status_flags().yaw_align;
const bool continuing_conditions = starting_conditions
&& ekf.global_origin_valid();
const bool starting_conditions = PX4_ISFINITE(sample.latitude) && PX4_ISFINITE(sample.longitude)
&& ekf.control_status_flags().yaw_align;
const bool continuing_conditions = starting_conditions
&& ekf.global_origin_valid();
switch (_state) {
case State::kStopped:
switch (source.state) {
case State::kStopped:
/* FALLTHROUGH */
case State::kStarting:
if (starting_conditions) {
_state = State::kStarting;
/* FALLTHROUGH */
case State::kStarting:
if (starting_conditions) {
source.state = State::kStarting;
if (ekf.global_origin_valid()) {
const bool fused = ekf.fuseHorizontalPosition(aid_src);
bool reset = false;
if (ekf.global_origin_valid()) {
const bool fused = ekf.fuseHorizontalPosition(aid_src);
bool reset = false;
if (!fused && isResetAllowed(ekf)) {
ekf.resetHorizontalPositionTo(sample.latitude, sample.longitude, Vector2f(aid_src.observation_variance));
ekf.resetAidSourceStatusZeroInnovation(aid_src);
reset = true;
if (!fused && isResetAllowed(ekf, slot)) {
ekf.resetHorizontalPositionTo(sample.latitude, sample.longitude, Vector2f(aid_src.observation_variance));
ekf.resetAidSourceStatusZeroInnovation(aid_src);
reset = true;
}
if (fused || reset) {
ekf.enableControlStatusAuxGpos();
source.reset_counters.lat_lon = sample.lat_lon_reset_counter;
source.state = State::kActive;
}
} else {
// Try to initialize using measurement
if (ekf.resetGlobalPositionTo(sample.latitude, sample.longitude, sample.altitude_amsl, pos_var,
sq(sample.epv))) {
ekf.resetAidSourceStatusZeroInnovation(aid_src);
ekf.enableControlStatusAuxGpos();
source.reset_counters.lat_lon = sample.lat_lon_reset_counter;
source.state = State::kActive;
}
}
}
if (fused || reset) {
ekf.enableControlStatusAuxGpos();
_reset_counters.lat_lon = sample.lat_lon_reset_counter;
_state = State::kActive;
break;
case State::kActive:
if (continuing_conditions) {
ekf.fuseHorizontalPosition(aid_src);
if (isTimedOut(aid_src.time_last_fuse, imu_delayed.time_us, ekf._params.reset_timeout_max)
|| (source.reset_counters.lat_lon != sample.lat_lon_reset_counter)) {
if (isResetAllowed(ekf, slot)) {
ekf.resetHorizontalPositionTo(sample.latitude, sample.longitude, Vector2f(aid_src.observation_variance));
ekf.resetAidSourceStatusZeroInnovation(aid_src);
source.reset_counters.lat_lon = sample.lat_lon_reset_counter;
} else {
ekf.disableControlStatusAuxGpos();
source.state = State::kStopped;
}
}
} else {
// Try to initialize using measurement
if (ekf.resetGlobalPositionTo(sample.latitude, sample.longitude, sample.altitude_amsl, pos_var,
sq(sample.epv))) {
ekf.resetAidSourceStatusZeroInnovation(aid_src);
ekf.enableControlStatusAuxGpos();
_reset_counters.lat_lon = sample.lat_lon_reset_counter;
_state = State::kActive;
}
}
}
break;
case State::kActive:
if (continuing_conditions) {
ekf.fuseHorizontalPosition(aid_src);
if (isTimedOut(aid_src.time_last_fuse, imu_delayed.time_us, ekf._params.reset_timeout_max)
|| (_reset_counters.lat_lon != sample.lat_lon_reset_counter)) {
if (isResetAllowed(ekf)) {
ekf.resetHorizontalPositionTo(sample.latitude, sample.longitude, Vector2f(aid_src.observation_variance));
ekf.resetAidSourceStatusZeroInnovation(aid_src);
_reset_counters.lat_lon = sample.lat_lon_reset_counter;
} else {
ekf.disableControlStatusAuxGpos();
_state = State::kStopped;
}
ekf.disableControlStatusAuxGpos();
source.state = State::kStopped;
}
} else {
ekf.disableControlStatusAuxGpos();
_state = State::kStopped;
break;
default:
break;
}
break;
default:
break;
}
#if defined(MODULE_NAME)
aid_src.timestamp = hrt_absolute_time();
_estimator_aid_src_aux_global_position_pub.publish(aid_src);
aid_src.timestamp = hrt_absolute_time();
source.aid_src_pub.publish(aid_src);
_test_ratio_filtered = math::max(fabsf(aid_src.test_ratio_filtered[0]), fabsf(aid_src.test_ratio_filtered[1]));
source.test_ratio_filtered = math::max(fabsf(aid_src.test_ratio_filtered[0]), fabsf(aid_src.test_ratio_filtered[1]));
#endif // MODULE_NAME
} else if ((_state != State::kStopped) && isTimedOut(_time_last_buffer_push, imu_delayed.time_us, (uint64_t)5e6)) {
ekf.disableControlStatusAuxGpos();
_state = State::kStopped;
ECL_WARN("Aux global position data stopped");
} else if ((source.state != State::kStopped) && isTimedOut(source.time_last_buffer_push, imu_delayed.time_us, (uint64_t)5e6)) {
ekf.disableControlStatusAuxGpos();
source.state = State::kStopped;
ECL_WARN("Aux global position data stopped for slot %d (sensor ID %d)", slot, getIdParam(slot));
}
}
}
bool AuxGlobalPosition::isResetAllowed(const Ekf &ekf) const
bool AuxGlobalPosition::isResetAllowed(const Ekf &ekf, int slot) const
{
return ((static_cast<Mode>(_param_ekf2_agp_mode.get()) == Mode::kAuto)
return ((static_cast<Mode>(getModeParam(slot)) == Mode::kAuto)
&& !ekf.isOtherSourceOfHorizontalPositionAidingThan(ekf.control_status_flags().aux_gpos))
|| ((static_cast<Mode>(_param_ekf2_agp_mode.get()) == Mode::kDeadReckoning)
|| ((static_cast<Mode>(getModeParam(slot)) == Mode::kDeadReckoning)
&& !ekf.isOtherSourceOfHorizontalAidingThan(ekf.control_status_flags().aux_gpos));
}
@@ -49,10 +49,12 @@
#if defined(MODULE_NAME)
# include <px4_platform_common/module_params.h>
# include <px4_platform_common/log.h>
# include <lib/parameters/param.h>
# include <uORB/PublicationMulti.hpp>
# include <uORB/Subscription.hpp>
# include <uORB/topics/estimator_aid_source2d.h>
# include <uORB/topics/vehicle_global_position.h>
# include <uORB/topics/aux_global_position.h>
#endif // MODULE_NAME
class Ekf;
@@ -60,9 +62,28 @@ class Ekf;
class AuxGlobalPosition : public ModuleParams
{
public:
static constexpr uint8_t MAX_AGP_IDS = 4;
AuxGlobalPosition() : ModuleParams(nullptr)
{
_estimator_aid_src_aux_global_position_pub.advertise();
for (int i = 0; i < MAX_AGP_IDS; i++) {
_sources[i].aid_src_pub.advertise();
}
// check existing ID to initialize parameters
for (int i = 0; i < MAX_AGP_IDS; i++) {
if (getIdParam(i) != 0) {
getCtrlParam(i);
getModeParam(i);
getDelayParam(i);
getNoiseParam(i);
getGateParam(i);
} else {
break;
}
}
}
~AuxGlobalPosition() = default;
@@ -74,7 +95,14 @@ public:
updateParams();
}
float test_ratio_filtered() const { return _test_ratio_filtered; }
float test_ratio_filtered(uint8_t id = 0) const
{
if (id < MAX_AGP_IDS) {
return _sources[id].test_ratio_filtered;
}
return INFINITY;
}
private:
bool isTimedOut(uint64_t last_sensor_timestamp, uint64_t time_delayed_us, uint64_t timeout_period) const
@@ -82,10 +110,11 @@ private:
return (last_sensor_timestamp == 0) || (last_sensor_timestamp + timeout_period < time_delayed_us);
}
bool isResetAllowed(const Ekf &ekf) const;
bool isResetAllowed(const Ekf &ekf, int slot) const;
struct AuxGlobalPositionSample {
uint64_t time_us{}; ///< timestamp of the measurement (uSec)
uint8_t id{}; ///< source identifier
double latitude{};
double longitude{};
float altitude_amsl{};
@@ -94,10 +123,6 @@ private:
uint8_t lat_lon_reset_counter{};
};
estimator_aid_source2d_s _aid_src_aux_global_position{};
RingBuffer<AuxGlobalPositionSample> _aux_global_position_buffer{20}; // TODO: size with _obs_buffer_length and actual publication rate
uint64_t _time_last_buffer_push{0};
enum class Ctrl : uint8_t {
kHPos = (1 << 0),
kVPos = (1 << 1)
@@ -114,26 +139,120 @@ private:
kActive,
};
State _state{State::kStopped};
float _test_ratio_filtered{INFINITY};
#if defined(MODULE_NAME)
struct reset_counters_s {
uint8_t lat_lon{};
};
reset_counters_s _reset_counters{};
uORB::PublicationMulti<estimator_aid_source2d_s> _estimator_aid_src_aux_global_position_pub{ORB_ID(estimator_aid_src_aux_global_position)};
uORB::Subscription _aux_global_position_sub{ORB_ID(aux_global_position)};
struct SourceData {
estimator_aid_source2d_s aid_src{};
RingBuffer<AuxGlobalPositionSample> buffer{20};
uint64_t time_last_buffer_push{0};
State state{State::kStopped};
float test_ratio_filtered{0.f};
reset_counters_s reset_counters{};
uORB::PublicationMulti<estimator_aid_source2d_s> aid_src_pub{ORB_ID(estimator_aid_src_aux_global_position)};
};
DEFINE_PARAMETERS(
(ParamInt<px4::params::EKF2_AGP_CTRL>) _param_ekf2_agp_ctrl,
(ParamInt<px4::params::EKF2_AGP_MODE>) _param_ekf2_agp_mode,
(ParamFloat<px4::params::EKF2_AGP_DELAY>) _param_ekf2_agp_delay,
(ParamFloat<px4::params::EKF2_AGP_NOISE>) _param_ekf2_agp_noise,
(ParamFloat<px4::params::EKF2_AGP_GATE>) _param_ekf2_agp_gate
)
SourceData _sources[MAX_AGP_IDS];
uORB::Subscription _aux_global_position_subs[MAX_AGP_IDS] {
{ORB_ID(aux_global_position), 0},
{ORB_ID(aux_global_position), 1},
{ORB_ID(aux_global_position), 2},
{ORB_ID(aux_global_position), 3}
};
int32_t getAgpParamInt32(const char *param_suffix, int instance) const
{
char param_name[20] {};
snprintf(param_name, sizeof(param_name), "EKF2_AGP%d_%s", instance, param_suffix);
int32_t value = 0;
if (param_get(param_find(param_name), &value) != 0) {
PX4_ERR("failed to get %s", param_name);
}
return value;
}
float getAgpParamFloat(const char *param_suffix, int instance) const
{
char param_name[20] {};
snprintf(param_name, sizeof(param_name), "EKF2_AGP%d_%s", instance, param_suffix);
float value = NAN;
if (param_get(param_find(param_name), &value) != 0) {
PX4_ERR("failed to get %s", param_name);
}
return value;
}
bool setAgpParamInt32(const char *param_suffix, int instance, int32_t value)
{
char param_name[20] {};
snprintf(param_name, sizeof(param_name), "EKF2_AGP%d_%s", instance, param_suffix);
return param_set_no_notification(param_find(param_name), &value) == PX4_OK;
}
int32_t getCtrlParam(int instance) const
{
return getAgpParamInt32("CTRL", instance);
}
int32_t getModeParam(int instance) const
{
return getAgpParamInt32("MODE", instance);
}
float getDelayParam(int instance) const
{
return getAgpParamFloat("DELAY", instance);
}
float getNoiseParam(int instance) const
{
return getAgpParamFloat("NOISE", instance);
}
float getGateParam(int instance) const
{
return getAgpParamFloat("GATE", instance);
}
int32_t getIdParam(int instance) const
{
return getAgpParamInt32("ID", instance);
}
void setIdParam(int instance, int32_t sensor_id)
{
setAgpParamInt32("ID", instance, sensor_id);
}
int mapSensorIdToSlot(int32_t sensor_id)
{
for (int slot = 0; slot < MAX_AGP_IDS; slot++) {
if (getIdParam(slot) == sensor_id) {
return slot;
}
}
// Not found, try to assign to first available slot
for (int slot = 0; slot < MAX_AGP_IDS; slot++) {
if (getIdParam(slot) == 0) {
setIdParam(slot, sensor_id);
return slot;
}
}
// All slots are full
return MAX_AGP_IDS;
}
#endif // MODULE_NAME
};
@@ -1,10 +1,22 @@
__max_num_agp_instances: &max_num_agp_instances 4
module_name: ekf2
parameters:
- group: EKF2
definitions:
EKF2_AGP_CTRL:
EKF2_AGP${i}_ID:
description:
short: Aux global position (AGP) sensor aiding
short: Aux global position sensor ${i} ID
long: 'Sensor ID for slot ${i}. Set to 0 to disable this slot.'
type: int32
default: 0
min: 0
max: 255
num_instances: *max_num_agp_instances
instance_start: 0
EKF2_AGP${i}_CTRL:
description:
short: Aux global position sensor ${i} aiding
long: 'Set bits in the following positions to enable: 0 : Horizontal position
fusion 1 : Vertical position fusion'
type: bitmask
@@ -14,9 +26,11 @@ parameters:
default: 0
min: 0
max: 3
EKF2_AGP_MODE:
num_instances: *max_num_agp_instances
instance_start: 0
EKF2_AGP${i}_MODE:
description:
short: Fusion reset mode
short: Fusion reset mode for sensor ${i}
long: 'Automatic: reset on fusion timeout if no other source of position is available
Dead-reckoning: reset on fusion timeout if no source of velocity is available'
type: enum
@@ -24,28 +38,34 @@ parameters:
0: Automatic
1: Dead-reckoning
default: 0
EKF2_AGP_DELAY:
num_instances: *max_num_agp_instances
instance_start: 0
EKF2_AGP${i}_DELAY:
description:
short: Aux global position estimator delay relative to IMU measurements
short: Aux global position sensor ${i} delay relative to IMU measurements
type: float
default: 0
min: 0
max: 300
max: 1000
unit: ms
reboot_required: true
decimal: 1
EKF2_AGP_NOISE:
num_instances: *max_num_agp_instances
instance_start: 0
EKF2_AGP${i}_NOISE:
description:
short: Measurement noise for aux global position measurements
short: Measurement noise for aux global position sensor ${i}
long: Used to lower bound or replace the uncertainty included in the message
type: float
default: 0.9
default: 1.0
min: 0.01
unit: m
decimal: 2
EKF2_AGP_GATE:
num_instances: *max_num_agp_instances
instance_start: 0
EKF2_AGP${i}_GATE:
description:
short: Gate size for aux global position fusion
short: Gate size for aux global position sensor ${i} fusion
long: Sets the number of standard deviations used by the innovation consistency
test.
type: float
@@ -53,3 +73,5 @@ parameters:
min: 1.0
unit: SD
decimal: 1
num_instances: *max_num_agp_instances
instance_start: 0
+2 -2
View File
@@ -208,7 +208,7 @@ void LoggedTopics::add_default_topics()
add_topic_multi("vehicle_imu_status", 1000, 4);
add_optional_topic_multi("vehicle_magnetometer", 500, 4);
add_topic("vehicle_optical_flow", 500);
add_topic("aux_global_position", 500);
add_topic_multi("aux_global_position", 500);
add_optional_topic("pps_capture");
// additional control allocation logging
@@ -317,7 +317,7 @@ void LoggedTopics::add_estimator_replay_topics()
add_topic("vehicle_magnetometer");
add_topic("vehicle_status");
add_topic("vehicle_visual_odometry");
add_topic("aux_global_position");
add_topic_multi("aux_global_position");
add_topic_multi("distance_sensor");
}
+1
View File
@@ -53,6 +53,7 @@
#include <uORB/topics/vehicle_optical_flow.h>
#include <uORB/topics/vehicle_status.h>
#include <uORB/topics/vehicle_odometry.h>
#include <uORB/topics/aux_global_position.h>
#include "ReplayEkf2.hpp"
@@ -129,17 +129,16 @@ void SensorAgpSim::Run()
CONSTANTS_RADIUS_OF_EARTH);
const double altitude = (double)(_measured_lla.altitude() + (generate_wgn() * 0.5f));
vehicle_global_position_s sample{};
aux_global_position_s sample{};
sample.id = 33;
sample.timestamp_sample = gpos.timestamp_sample;
sample.lat = latitude;
sample.lon = longitude;
sample.alt = altitude;
sample.lat_lon_valid = true;
sample.alt_ellipsoid = altitude;
sample.alt_valid = true;
sample.eph = 20.f;
sample.epv = 5.f;
sample.lat_lon_reset_counter = 0;
sample.timestamp = hrt_absolute_time();
_aux_global_position_pub.publish(sample);
@@ -44,6 +44,7 @@
#include <uORB/SubscriptionInterval.hpp>
#include <uORB/topics/parameter_update.h>
#include <uORB/topics/vehicle_global_position.h>
#include <uORB/topics/aux_global_position.h>
using namespace time_literals;
@@ -83,7 +84,7 @@ private:
uORB::SubscriptionInterval _parameter_update_sub{ORB_ID(parameter_update), 1_s};
uORB::Subscription _vehicle_global_position_sub{ORB_ID(vehicle_global_position_groundtruth)};
uORB::PublicationMulti<vehicle_global_position_s> _aux_global_position_pub{ORB_ID(aux_global_position)};
uORB::PublicationMulti<aux_global_position_s> _aux_global_position_pub{ORB_ID(aux_global_position)};
perf_counter_t _loop_perf{perf_alloc(PC_ELAPSED, MODULE_NAME": cycle")};
+37 -4
View File
@@ -162,8 +162,22 @@ struct RcvTopicsPubs {
uORB::Publication<@(sub['simple_base_type'])_s> @(sub['topic_simple'])_pub{ORB_ID(@(sub['topic_simple']))};
@[ end for]@
@[ for sub in subscriptions_multi]@
uORB::PublicationMulti<@(sub['simple_base_type'])_s> @(sub['topic_simple'])_pub{ORB_ID(@(sub['topic_simple']))};
@# Group subscriptions_multi by topic to create arrays
@{
multi_topic_groups = {}
for sub in subscriptions_multi:
topic_simple = sub['topic_simple']
if topic_simple not in multi_topic_groups:
multi_topic_groups[topic_simple] = []
multi_topic_groups[topic_simple].append(sub)
}@
@[ for topic_simple, subs in multi_topic_groups.items()]@
uORB::PublicationMulti<@(subs[0]['simple_base_type'])_s> @(topic_simple)_pubs[@(len(subs))] {
@[ for idx, sub in enumerate(subs)]@
{ORB_ID(@(topic_simple))}@('' if idx == len(subs)-1 else ',')
@[ end for]@
};
@[ end for]@
uint32_t num_payload_received{};
@@ -179,7 +193,7 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
pubs->num_payload_received += length;
switch (object_id.id) {
@[ for idx, sub in enumerate(subscriptions + subscriptions_multi)]@
@[ for idx, sub in enumerate(subscriptions)]@
case @(idx)+ (65535U / 32U) + 1: {
@(sub['simple_base_type'])_s data;
@@ -190,6 +204,18 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
}
break;
@[ end for]@
@[ for idx, sub in enumerate(subscriptions_multi)]@
case @(idx + len(subscriptions))+ (65535U / 32U) + 1: {
@(sub['simple_base_type'])_s data;
if (ucdr_deserialize_@(sub['simple_base_type'])(*ub, data, time_offset_us)) {
//print_message(ORB_ID(@(sub['simple_base_type'])), data);
pubs->@(sub['topic_simple'])_pubs[@(sub['instance'])].publish(data);
}
}
break;
@[ end for]@
default:
@@ -200,13 +226,20 @@ static void on_topic_update(uxrSession *session, uxrObjectId object_id, uint16_t
bool RcvTopicsPubs::init(uxrSession *session, uxrStreamId reliable_out_stream_id, uxrStreamId reliable_in_stream_id, uxrStreamId best_effort_in_stream_id, uxrObjectId participant_id, const char *client_namespace)
{
@[ for idx, sub in enumerate(subscriptions + subscriptions_multi)]@
@[ for idx, sub in enumerate(subscriptions)]@
{
uint16_t queue_depth = orb_get_queue_size(ORB_ID(@(sub['simple_base_type']))) * 2; // use a bit larger queue size than internal
uint32_t message_version = get_message_version<@(sub['simple_base_type'])_s>();
create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx), client_namespace, "@(sub['topic'])", message_version, "@(sub['dds_type'])", queue_depth);
}
@[ end for]@
@[ for idx, sub in enumerate(subscriptions_multi)]@
{
uint16_t queue_depth = orb_get_queue_size(ORB_ID(@(sub['topic_simple']))) * 2; // use a bit larger queue size than internal
uint32_t message_version = get_message_version<@(sub['simple_base_type'])_s>();
create_data_reader(session, reliable_out_stream_id, best_effort_in_stream_id, participant_id, @(idx + len(subscriptions)), client_namespace, "@(sub['topic'])", message_version, "@(sub['dds_type'])", queue_depth);
}
@[ end for]@
uxr_set_topic_callback(session, on_topic_update, this);
+9 -2
View File
@@ -52,7 +52,7 @@ publications:
- topic: /fmu/out/transponder_report
type: px4_msgs::msg::TransponderReport
# - topic: /fmu/out/vehicle_angular_velocity
# type: px4_msgs::msg::VehicleAngularVelocity
@@ -191,7 +191,7 @@ subscriptions:
type: px4_msgs::msg::ActuatorServos
- topic: /fmu/in/aux_global_position
type: px4_msgs::msg::VehicleGlobalPosition
type: px4_msgs::msg::AuxGlobalPosition
- topic: /fmu/in/fixed_wing_longitudinal_setpoint
type: px4_msgs::msg::FixedWingLongitudinalSetpoint
@@ -228,3 +228,10 @@ subscriptions:
# Create uORB::PublicationMulti
subscriptions_multi:
- type: px4_msgs::msg::AuxGlobalPosition
uorb_topic: aux_global_position
ros_topics:
- /fmu/in/aux_global_position_A
- /fmu/in/aux_global_position_B
- /fmu/in/aux_global_position_C
- /fmu/in/aux_global_position_D
@@ -131,11 +131,30 @@ if subs_not_empty:
merged_em_globals['subscriptions'] = msg_map['subscriptions'] if subs_not_empty else []
subs_multi_not_empty = msg_map['subscriptions_multi'] is not None
expanded_subs_multi = []
if subs_multi_not_empty:
for sm in msg_map['subscriptions_multi']:
process_message_type(sm)
if 'ros_topics' in sm and 'uorb_topic' in sm:
# Expand each DDS topic into a separate entry
# All entries for the same uorb_topic will use the same array
uorb_topic_name = sm['uorb_topic']
for idx, dds_topic in enumerate(sm['ros_topics']):
expanded_entry = {
'type': sm['type'],
'topic': dds_topic,
'uorb_topic': uorb_topic_name,
'instance': idx
}
process_message_type(expanded_entry)
expanded_entry['topic_simple'] = uorb_topic_name
expanded_subs_multi.append(expanded_entry)
else:
# Fallback for old-style single topic entries
process_message_type(sm)
sm['instance'] = 0
expanded_subs_multi.append(sm)
merged_em_globals['subscriptions_multi'] = msg_map['subscriptions_multi'] if subs_multi_not_empty else []
merged_em_globals['subscriptions_multi'] = expanded_subs_multi
merged_em_globals['type_includes'] = sorted(set(all_type_includes))