uORB simplify handling of subscriptions with configured intervals

This commit is contained in:
Daniel Agar 2019-01-20 10:40:42 -05:00
parent 8e6708bdcb
commit c4c3bbbfb0
2 changed files with 39 additions and 214 deletions

View File

@ -115,7 +115,8 @@ uORB::DeviceNode::open(cdev::file_t *filp)
}
/* If there were any previous publications, allow the subscriber to read them */
sd->generation = _generation - (_queue_size < _generation ? _queue_size : _generation);
const unsigned gen = published_message_count();
sd->generation = gen - (_queue_size < gen ? _queue_size : gen);
filp->f_priv = (void *)sd;
@ -150,10 +151,6 @@ uORB::DeviceNode::close(cdev::file_t *filp)
SubscriberData *sd = filp_to_sd(filp);
if (sd != nullptr) {
if (sd->update_interval) {
hrt_cancel(&sd->update_interval->update_call);
}
remove_internal_subscriber();
delete sd;
@ -167,8 +164,6 @@ uORB::DeviceNode::close(cdev::file_t *filp)
ssize_t
uORB::DeviceNode::read(cdev::file_t *filp, char *buffer, size_t buflen)
{
SubscriberData *sd = (SubscriberData *)filp_to_sd(filp);
/* if the object has not been written yet, return zero */
if (_data == nullptr) {
return 0;
@ -179,18 +174,22 @@ uORB::DeviceNode::read(cdev::file_t *filp, char *buffer, size_t buflen)
return -EIO;
}
SubscriberData *sd = (SubscriberData *)filp_to_sd(filp);
/*
* Perform an atomic copy & state update
*/
ATOMIC_ENTER;
if (_generation > sd->generation + _queue_size) {
const unsigned gen = published_message_count();
if (gen > sd->generation + _queue_size) {
/* Reader is too far behind: some messages are lost */
_lost_messages += _generation - (sd->generation + _queue_size);
sd->generation = _generation - _queue_size;
_lost_messages += gen - (sd->generation + _queue_size);
sd->generation = gen - _queue_size;
}
if (_generation == sd->generation && sd->generation > 0) {
if (gen == sd->generation && sd->generation > 0) {
/* The subscriber already read the latest message, but nothing new was published yet.
* Return the previous message
*/
@ -202,15 +201,14 @@ uORB::DeviceNode::read(cdev::file_t *filp, char *buffer, size_t buflen)
memcpy(buffer, _data + (_meta->o_size * (sd->generation % _queue_size)), _meta->o_size);
}
if (sd->generation < _generation) {
if (sd->generation < gen) {
++sd->generation;
}
/*
* Clear the flag that indicates that an update has been reported, as
* we have just collected it.
*/
sd->set_update_reported(false);
// if subscriber has an interval track the last update time
if (sd->update_interval) {
sd->update_interval->last_update = _last_update;
}
ATOMIC_LEAVE;
@ -230,9 +228,11 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
* Note that filp will usually be NULL.
*/
if (nullptr == _data) {
#ifdef __PX4_NUTTX
if (!up_interrupt_context()) {
#endif /* __PX4_NUTTX */
lock();
@ -242,18 +242,11 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
}
unlock();
#ifdef __PX4_NUTTX
}
#else
lock();
/* re-check size */
if (nullptr == _data) {
_data = new uint8_t[_meta->o_size * _queue_size];
}
unlock();
#endif
#endif /* __PX4_NUTTX */
/* failed or could not allocate */
if (nullptr == _data) {
@ -298,15 +291,12 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg)
return PX4_OK;
}
case ORBIOCUPDATED:
#ifndef __PX4_NUTTX
lock();
#endif
*(bool *)arg = appears_updated(sd);
#ifndef __PX4_NUTTX
unlock();
#endif
return PX4_OK;
case ORBIOCUPDATED: {
ATOMIC_ENTER;
*(bool *)arg = appears_updated(sd);
ATOMIC_LEAVE;
return PX4_OK;
}
case ORBIOCSETINTERVAL: {
int ret = PX4_OK;
@ -321,19 +311,12 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg)
} else {
if (sd->update_interval) {
sd->update_interval->interval = arg;
#ifndef __PX4_NUTTX
sd->update_interval->last_update = hrt_absolute_time();
#endif
} else {
sd->update_interval = new UpdateIntervalData();
if (sd->update_interval) {
memset(&sd->update_interval->update_call, 0, sizeof(hrt_call));
sd->update_interval->interval = arg;
#ifndef __PX4_NUTTX
sd->update_interval->last_update = hrt_absolute_time();
#endif
} else {
ret = -ENOMEM;
@ -505,162 +488,23 @@ uORB::DeviceNode::poll_notify_one(px4_pollfd_struct_t *fds, pollevent_t events)
}
}
#ifdef __PX4_NUTTX
bool
uORB::DeviceNode::appears_updated(SubscriberData *sd)
{
/* assume it doesn't look updated */
bool ret = false;
/* avoid racing between interrupt and non-interrupt context calls */
irqstate_t state = px4_enter_critical_section();
/* check if this topic has been published yet, if not bail out */
if (_data == nullptr) {
ret = false;
goto out;
}
/*
* If the subscriber's generation count matches the update generation
* count, there has been no update from their perspective; if they
* don't match then we might have a visible update.
*/
while (sd->generation != _generation) {
/*
* Handle non-rate-limited subscribers.
*/
if (sd->update_interval == nullptr) {
ret = true;
break;
}
/*
* If we have previously told the subscriber that there is data,
* and they have not yet collected it, continue to tell them
* that there has been an update. This mimics the non-rate-limited
* behaviour where checking / polling continues to report an update
* until the topic is read.
*/
if (sd->update_reported()) {
ret = true;
break;
}
/*
* If the interval timer is still running, the topic should not
* appear updated, even though at this point we know that it has.
* We have previously been through here, so the subscriber
* must have collected the update we reported, otherwise
* update_reported would still be true.
*/
if (!hrt_called(&sd->update_interval->update_call)) {
break;
}
/*
* Make sure that we don't consider the topic to be updated again
* until the interval has passed once more by restarting the interval
* timer and thereby re-scheduling a poll notification at that time.
*/
hrt_call_after(&sd->update_interval->update_call,
sd->update_interval->interval,
&uORB::DeviceNode::update_deferred_trampoline,
(void *)this);
/*
* Remember that we have told the subscriber that there is data.
*/
sd->set_update_reported(true);
ret = true;
break;
}
out:
px4_leave_critical_section(state);
/* consider it updated */
return ret;
}
#else
bool
uORB::DeviceNode::appears_updated(SubscriberData *sd)
{
/* assume it doesn't look updated */
bool ret = false;
/* check if this topic has been published yet, if not bail out */
// check if this topic has been published yet, if not bail out
if (_data == nullptr) {
return false;
}
/*
* If the subscriber's generation count matches the update generation
* count, there has been no update from their perspective; if they
* don't match then we might have a visible update.
*/
while (sd->generation != _generation) {
/*
* Handle non-rate-limited subscribers.
*/
if (sd->update_interval == nullptr) {
ret = true;
break;
// if subscriber has interval check time since last update
if (sd->update_interval != nullptr) {
if (hrt_elapsed_time(&sd->update_interval->last_update) < sd->update_interval->interval) {
return false;
}
/*
* If we have previously told the subscriber that there is data,
* and they have not yet collected it, continue to tell them
* that there has been an update. This mimics the non-rate-limited
* behaviour where checking / polling continues to report an update
* until the topic is read.
*/
if (sd->update_reported()) {
ret = true;
break;
}
// If we have not yet reached the deadline, then assume that we can ignore any
// newly received data.
if (sd->update_interval->last_update + sd->update_interval->interval > hrt_absolute_time()) {
break;
}
/*
* Remember that we have told the subscriber that there is data.
*/
sd->set_update_reported(true);
sd->update_interval->last_update = hrt_absolute_time();
ret = true;
break;
}
return ret;
}
#endif /* ifdef __PX4_NUTTX */
void
uORB::DeviceNode::update_deferred()
{
/*
* Instigate a poll notification; any subscribers whose intervals have
* expired will be woken.
*/
poll_notify(POLLIN);
}
void
uORB::DeviceNode::update_deferred_trampoline(void *arg)
{
uORB::DeviceNode *node = (uORB::DeviceNode *)arg;
node->update_deferred();
// finally, compare the generation
return (sd->generation != published_message_count());
}
bool

View File

@ -202,24 +202,17 @@ protected:
void poll_notify_one(px4_pollfd_struct_t *fds, pollevent_t events) override;
private:
struct UpdateIntervalData {
struct hrt_call update_call; /**< deferred wakeup call if update_period is nonzero */
#ifndef __PX4_NUTTX
uint64_t last_update; /**< time at which the last update was provided, used when update_interval is nonzero */
#endif
unsigned interval; /**< if nonzero minimum interval between updates */
bool update_reported;
uint64_t last_update{0}; /**< time at which the last update was provided, used when update_interval is nonzero */
unsigned interval{0}; /**< if nonzero minimum interval between updates */
};
struct SubscriberData {
~SubscriberData() { if (update_interval) { delete (update_interval); } }
unsigned generation; /**< last generation the subscriber has seen */
UpdateIntervalData *update_interval; /**< if null, no update interval */
// these flags are only used if update_interval != null
bool update_reported() const { return update_interval ? update_interval->update_reported : false; }
void set_update_reported(bool update_reported_flag)
{ if (update_interval) { update_interval->update_reported = update_reported_flag; } }
unsigned generation{0}; /**< last generation the subscriber has seen */
UpdateIntervalData *update_interval{nullptr}; /**< if null, no update interval */
};
const orb_metadata *_meta; /**< object metadata information */
@ -241,18 +234,6 @@ private:
inline static SubscriberData *filp_to_sd(cdev::file_t *filp);
/**
* Perform a deferred update for a rate-limited subscriber.
*/
void update_deferred();
/**
* Bridge from hrt_call to update_deferred
*
* void *arg ORBDevNode pointer for which the deferred update is performed.
*/
static void update_deferred_trampoline(void *arg);
/**
* Check whether a topic appears updated to a subscriber.
*