Logger: dynamic number of subscriptions

Keep a fixed-size array of 250 requested topics on the stack, then allocate
an array with LoggerSubscription with the exact required size.
This commit is contained in:
Beat Küng
2019-12-13 16:51:09 +01:00
committed by Daniel Agar
parent 2757647897
commit 738ceab0ee
2 changed files with 115 additions and 58 deletions
+91 -52
View File
@@ -174,6 +174,8 @@ int Logger::task_spawn(int argc, char *argv[])
int Logger::print_status()
{
PX4_INFO("Running in mode: %s", configured_backend_mode());
PX4_INFO("Number of subscriptions: %i (%i bytes)", _num_subscriptions,
(int)(_num_subscriptions * sizeof(LoggerSubscription)));
bool is_logging = false;
@@ -394,9 +396,8 @@ Logger::~Logger()
free(_replay_file_name);
}
if (_msg_buffer) {
delete[](_msg_buffer);
}
delete[](_msg_buffer);
delete[](_subscriptions);
}
bool Logger::request_stop_static()
@@ -409,69 +410,75 @@ bool Logger::request_stop_static()
return true;
}
LoggerSubscription *Logger::add_topic(const orb_metadata *topic, uint32_t interval_ms, uint8_t instance)
bool Logger::add_topic(const orb_metadata *topic, uint16_t interval_ms, uint8_t instance)
{
LoggerSubscription *subscription = nullptr;
size_t fields_len = strlen(topic->o_fields) + strlen(topic->o_name) + 1; //1 for ':'
if (fields_len > sizeof(ulog_message_format_s::format)) {
PX4_WARN("skip topic %s, format string is too large: %zu (max is %zu)", topic->o_name, fields_len,
sizeof(ulog_message_format_s::format));
return nullptr;
return false;
}
if (_subscriptions.push_back(LoggerSubscription{topic, interval_ms, instance})) {
subscription = &_subscriptions[_subscriptions.size() - 1];
} else {
if (_requested_subscriptions->count >= MAX_TOPICS_NUM) {
PX4_WARN("Too many subscriptions, failed to add: %s %d", topic->o_name, instance);
return false;
}
return subscription;
RequestedSubscription &sub = _requested_subscriptions->sub[_requested_subscriptions->count++];
sub.topic = topic;
sub.interval_ms = interval_ms;
sub.instance = instance;
return true;
}
bool Logger::add_topic(const char *name, uint32_t interval_ms, uint8_t instance)
bool Logger::add_topic(const char *name, uint16_t interval_ms, uint8_t instance)
{
// if we poll on a topic, we don't use the interval and let the polled topic define the maximum interval
if (_polling_topic_meta) {
interval_ms = 0;
}
if (!_requested_subscriptions) {
PX4_ERR("logic error"); // sanity-check: it is a bug if we land here
return false;
}
const orb_metadata *const *topics = orb_get_topics();
LoggerSubscription *subscription = nullptr;
bool success = false;
for (size_t i = 0; i < orb_topics_count(); i++) {
if (strcmp(name, topics[i]->o_name) == 0) {
bool already_added = false;
// check if already added: if so, only update the interval
for (size_t j = 0; j < _subscriptions.size(); ++j) {
if ((_subscriptions[j].get_topic() == topics[i]) && (_subscriptions[j].get_instance() == instance)) {
for (int j = 0; j < _requested_subscriptions->count; ++j) {
if (_requested_subscriptions->sub[j].topic == topics[i] &&
_requested_subscriptions->sub[j].instance == instance) {
PX4_DEBUG("logging topic %s(%d), interval: %i, already added, only setting interval",
topics[i]->o_name, instance, interval_ms);
_subscriptions[j].set_interval_ms(interval_ms);
subscription = &_subscriptions[j];
_requested_subscriptions->sub[j].interval_ms = interval_ms;
success = true;
already_added = true;
break;
}
}
if (!already_added) {
subscription = add_topic(topics[i], interval_ms, instance);
success = add_topic(topics[i], interval_ms, instance);
PX4_DEBUG("logging topic: %s(%d), interval: %i", topics[i]->o_name, instance, interval_ms);
break;
}
}
}
return (subscription != nullptr);
return success;
}
bool Logger::add_topic_multi(const char *name, uint32_t interval_ms)
bool Logger::add_topic_multi(const char *name, uint16_t interval_ms)
{
// add all possible instances
for (uint8_t instance = 0; instance < ORB_MULTI_MAX_INSTANCES; instance++) {
@@ -737,6 +744,56 @@ void Logger::initialize_mission_topics(MissionLogType type)
}
}
bool Logger::initialize_topics(MissionLogType mission_log_mode)
{
RequestedSubscriptionArray requested_subscriptions;
_requested_subscriptions = &requested_subscriptions;
// mission log topics if enabled (must be added first)
if (mission_log_mode != MissionLogType::Disabled) {
initialize_mission_topics((MissionLogType)mission_log_mode);
if (_num_mission_subs > 0) {
int mkdir_ret = mkdir(LOG_ROOT[(int)LogType::Mission], S_IRWXU | S_IRWXG | S_IRWXO);
if (mkdir_ret != 0 && errno != EEXIST) {
PX4_ERR("failed creating log root dir: %s (%i)", LOG_ROOT[(int)LogType::Mission], errno);
}
}
}
int ntopics = add_topics_from_file(PX4_STORAGEDIR "/etc/logging/logger_topics.txt");
if (ntopics > 0) {
PX4_INFO("logging %d topics from logger_topics.txt", ntopics);
} else {
initialize_configured_topics();
}
delete[](_subscriptions);
_subscriptions = nullptr;
// now that we know how many subscriptions we need, move _requested_subscriptions into _subscriptions
if (requested_subscriptions.count > 0) {
_subscriptions = new LoggerSubscription[requested_subscriptions.count];
if (!_subscriptions) {
PX4_ERR("alloc failed");
return false;
}
for (int i = 0; i < requested_subscriptions.count; ++i) {
const RequestedSubscription &sub = requested_subscriptions.sub[i];
_subscriptions[i] = LoggerSubscription(sub.topic, sub.interval_ms, sub.instance);
}
}
_num_subscriptions = requested_subscriptions.count;
_requested_subscriptions = nullptr;
return true;
}
void Logger::initialize_configured_topics()
{
// get the logging profile
@@ -833,38 +890,23 @@ void Logger::run()
uORB::Subscription parameter_update_sub(ORB_ID(parameter_update));
// mission log topics if enabled (must be added first)
int32_t mission_log_mode = 0;
if (_mission_log != PARAM_INVALID) {
param_get(_mission_log, &mission_log_mode);
initialize_mission_topics((MissionLogType)mission_log_mode);
if (_num_mission_subs > 0) {
int mkdir_ret = mkdir(LOG_ROOT[(int)LogType::Mission], S_IRWXU | S_IRWXG | S_IRWXO);
if (mkdir_ret != 0 && errno != EEXIST) {
PX4_ERR("failed creating log root dir: %s (%i)", LOG_ROOT[(int)LogType::Mission], errno);
}
}
}
int ntopics = add_topics_from_file(PX4_STORAGEDIR "/etc/logging/logger_topics.txt");
if (ntopics > 0) {
PX4_INFO("logging %d topics from logger_topics.txt", ntopics);
} else {
initialize_configured_topics();
if (!initialize_topics((MissionLogType)mission_log_mode)) {
return;
}
//all topics added. Get required message buffer size
int max_msg_size = 0;
for (const auto &subscription : _subscriptions) {
for (int sub = 0; sub < _num_subscriptions; ++sub) {
//use o_size, because that's what orb_copy will use
if (subscription.get_topic()->o_size > max_msg_size) {
max_msg_size = subscription.get_topic()->o_size;
if (_subscriptions[sub].get_topic()->o_size > max_msg_size) {
max_msg_size = _subscriptions[sub].get_topic()->o_size;
}
}
@@ -994,9 +1036,8 @@ void Logger::run()
/* wait for lock on log buffer */
_writer.lock();
int sub_idx = 0;
for (LoggerSubscription &sub : _subscriptions) {
for (int sub_idx = 0; sub_idx < _num_subscriptions; ++sub_idx) {
LoggerSubscription &sub = _subscriptions[sub_idx];
/* if this topic has been updated, copy the new data into the message buffer
* and write a message to the log
*/
@@ -1040,8 +1081,6 @@ void Logger::run()
}
}
}
++sub_idx;
}
// check for new logging message(s)
@@ -1101,7 +1140,7 @@ void Logger::run()
/* subscription update */
if (next_subscribe_topic_index != -1) {
if (++next_subscribe_topic_index >= (int)_subscriptions.size()) {
if (++next_subscribe_topic_index >= _num_subscriptions) {
next_subscribe_topic_index = -1;
next_subscribe_check = loop_time + TRY_SUBSCRIBE_INTERVAL;
}
@@ -1122,7 +1161,7 @@ void Logger::run()
_subscriptions[next_subscribe_topic_index].subscribe();
}
if (++next_subscribe_topic_index >= (int)_subscriptions.size()) {
if (++next_subscribe_topic_index >= _num_subscriptions) {
next_subscribe_topic_index = -1;
next_subscribe_check = loop_time + TRY_SUBSCRIBE_INTERVAL;
}
@@ -1812,13 +1851,13 @@ void Logger::write_formats(LogType type)
WrittenFormats written_formats;
// write all subscribed formats
size_t sub_count = _subscriptions.size();
int sub_count = _num_subscriptions;
if (type == LogType::Mission) {
sub_count = _num_mission_subs;
}
for (size_t i = 0; i < sub_count; ++i) {
for (int i = 0; i < sub_count; ++i) {
const LoggerSubscription &sub = _subscriptions[i];
write_format(type, *sub.get_topic(), written_formats, msg, i);
}
@@ -1830,13 +1869,13 @@ void Logger::write_all_add_logged_msg(LogType type)
{
_writer.lock();
size_t sub_count = _subscriptions.size();
int sub_count = _num_subscriptions;
if (type == LogType::Mission) {
sub_count = _num_mission_subs;
}
for (size_t i = 0; i < sub_count; ++i) {
for (int i = 0; i < sub_count; ++i) {
LoggerSubscription &sub = _subscriptions[i];
if (sub.valid()) {