orb: add unit tests for queuing, including tests with poll & notify interface

Both succeed under Posix & NuttX (Pixracer)
This commit is contained in:
Beat Küng
2016-05-20 15:29:32 +02:00
committed by Lorenz Meier
parent 5b1273e334
commit 79b3766544
2 changed files with 251 additions and 4 deletions
@@ -36,6 +36,8 @@
#include <px4_config.h>
#include <px4_time.h>
#include <stdio.h>
#include <errno.h>
#include <poll.h>
uORBTest::UnitTest &uORBTest::UnitTest::instance()
{
@@ -164,7 +166,19 @@ int uORBTest::UnitTest::test()
return ret;
}
return test_multi2();
ret = test_multi2();
if (ret != OK) {
return ret;
}
ret = test_queue();
if (ret != OK) {
return ret;
}
return test_queue_poll_notify();
}
int uORBTest::UnitTest::test_unadvertise()
@@ -557,6 +571,226 @@ int uORBTest::UnitTest::test_multi_reversed()
return test_note("PASS multi-topic reversed");
}
int uORBTest::UnitTest::test_queue()
{
test_note("Testing orb queuing");
struct orb_test_medium t, u;
int sfd;
orb_advert_t ptopic;
bool updated;
sfd = orb_subscribe(ORB_ID(orb_test_medium_queue));
if (sfd < 0) {
return test_fail("subscribe failed: %d", errno);
}
const int queue_size = 11;
t.val = 0;
ptopic = orb_advertise_queue(ORB_ID(orb_test_medium_queue), &t, queue_size);
if (ptopic == nullptr) {
return test_fail("advertise failed: %d", errno);
}
orb_check(sfd, &updated);
if (!updated) {
return test_fail("update flag not set");
}
if (PX4_OK != orb_copy(ORB_ID(orb_test_medium_queue), sfd, &u)) {
return test_fail("copy(1) failed: %d", errno);
}
if (u.val != t.val) {
return test_fail("copy(1) mismatch: %d expected %d", u.val, t.val);
}
orb_check(sfd, &updated);
if (updated) {
return test_fail("spurious updated flag");
}
#define CHECK_UPDATED(element) \
orb_check(sfd, &updated); \
if (!updated) { \
return test_fail("update flag not set, element %i", element); \
}
#define CHECK_NOT_UPDATED(element) \
orb_check(sfd, &updated); \
if (updated) { \
return test_fail("update flag set, element %i", element); \
}
#define CHECK_COPY(i_got, i_correct) \
orb_copy(ORB_ID(orb_test_medium_queue), sfd, &u); \
if (i_got != i_correct) { \
return test_fail("got wrong element from the queue (got %i, should be %i)", i_got, i_correct); \
}
//no messages in the queue anymore
test_note(" Testing to write some elements...");
for (int i = 0; i < queue_size - 2; ++i) {
t.val = i;
orb_publish(ORB_ID(orb_test_medium_queue), ptopic, &t);
}
for (int i = 0; i < queue_size - 2; ++i) {
CHECK_UPDATED(i);
CHECK_COPY(u.val, i);
}
CHECK_NOT_UPDATED(queue_size);
test_note(" Testing overflow...");
int overflow_by = 3;
for (int i = 0; i < queue_size + overflow_by; ++i) {
t.val = i;
orb_publish(ORB_ID(orb_test_medium_queue), ptopic, &t);
}
for (int i = 0; i < queue_size; ++i) {
CHECK_UPDATED(i);
CHECK_COPY(u.val, i + overflow_by);
}
CHECK_NOT_UPDATED(queue_size);
test_note(" Testing underflow...");
for (int i = 0; i < queue_size; ++i) {
CHECK_NOT_UPDATED(i);
CHECK_COPY(u.val, queue_size + overflow_by - 1);
}
t.val = 943;
orb_publish(ORB_ID(orb_test_medium_queue), ptopic, &t);
CHECK_UPDATED(-1);
CHECK_COPY(u.val, t.val);
#undef CHECK_COPY
#undef CHECK_UPDATED
#undef CHECK_NOT_UPDATED
orb_unadvertise(ptopic);
return test_note("PASS orb queuing");
}
int uORBTest::UnitTest::pub_test_queue_entry(char *const argv[])
{
uORBTest::UnitTest &t = uORBTest::UnitTest::instance();
return t.pub_test_queue_main();
}
int uORBTest::UnitTest::pub_test_queue_main()
{
struct orb_test_medium t;
orb_advert_t ptopic;
const int queue_size = 50;
t.val = 0;
if ((ptopic = orb_advertise_queue(ORB_ID(orb_test_medium_queue_poll), &t, queue_size)) == nullptr) {
_thread_should_exit = true;
return test_fail("advertise failed: %d", errno);
}
int message_counter = 0, num_messages = 20 * queue_size;
++t.val;
while (message_counter < num_messages) {
//simulate burst
int burst_counter = 0;
while (burst_counter++ < queue_size / 2 + 7) { //make interval non-boundary aligned
orb_publish(ORB_ID(orb_test_medium_queue_poll), ptopic, &t);
++t.val;
}
message_counter += burst_counter;
usleep(20 * 1000); //give subscriber a chance to catch up
}
_num_messages_sent = t.val;
usleep(100 * 1000);
_thread_should_exit = true;
orb_unadvertise(ptopic);
return 0;
}
int uORBTest::UnitTest::test_queue_poll_notify()
{
test_note("Testing orb queuing (poll & notify)");
struct orb_test_medium t;
int sfd;
if ((sfd = orb_subscribe(ORB_ID(orb_test_medium_queue_poll))) < 0) {
return test_fail("subscribe failed: %d", errno);
}
_thread_should_exit = false;
char *const args[1] = { NULL };
int pubsub_task = px4_task_spawn_cmd("uorb_test_queue",
SCHED_DEFAULT,
SCHED_PRIORITY_MIN + 5,
1500,
(px4_main_t)&uORBTest::UnitTest::pub_test_queue_entry,
args);
if (pubsub_task < 0) {
return test_fail("failed launching task");
}
int next_expected_val = 0;
px4_pollfd_struct_t fds[1];
fds[0].fd = sfd;
fds[0].events = POLLIN;
while (!_thread_should_exit) {
int poll_ret = px4_poll(fds, 1, 500);
if (poll_ret == 0) {
if (_thread_should_exit) {
break;
}
return test_fail("poll timeout");
} else if (poll_ret < 0) {
return test_fail("poll error (%d, %d)", poll_ret, errno);
}
if (fds[0].revents & POLLIN) {
orb_copy(ORB_ID(orb_test_medium_queue_poll), sfd, &t);
if (next_expected_val != t.val) {
return test_fail("copy mismatch: %d expected %d", t.val, next_expected_val);
}
++next_expected_val;
}
}
if (_num_messages_sent != next_expected_val) {
return test_fail("number of sent and received messages mismatch (sent: %i, received: %i)",
_num_messages_sent, next_expected_val);
}
return test_note("PASS orb queuing (poll & notify), got %i messages", next_expected_val);
}
int uORBTest::UnitTest::test_fail(const char *fmt, ...)
{
va_list ap;