dataman: Move to semaphore abstraction

This commit is contained in:
Lorenz Meier 2015-09-20 00:28:39 +02:00
parent aba2d007df
commit fa27e59ac4

View File

@ -42,6 +42,7 @@
#include <px4_config.h>
#include <px4_defines.h>
#include <px4_posix.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
@ -82,7 +83,7 @@ typedef enum {
/** Work task work item */
typedef struct {
sq_entry_t link; /**< list linkage */
sem_t wait_sem;
px4_sem_t wait_sem;
unsigned char first;
unsigned char func;
ssize_t result;
@ -128,8 +129,8 @@ static const unsigned g_per_item_max_index[DM_KEY_NUM_KEYS] = {
static unsigned int g_key_offsets[DM_KEY_NUM_KEYS];
/* Item type lock mutexes */
static sem_t *g_item_locks[DM_KEY_NUM_KEYS];
static sem_t g_sys_state_mutex;
static px4_sem_t *g_item_locks[DM_KEY_NUM_KEYS];
static px4_sem_t g_sys_state_mutex;
/* The data manager store file handle and file name */
static int g_fd = -1, g_task_fd = -1;
@ -140,7 +141,7 @@ static char *k_data_manager_device_path = NULL;
typedef struct {
sq_queue_t q; /* Nuttx queue */
sem_t mutex; /* Mutual exclusion on work queue adds and deletes */
px4_sem_t mutex; /* Mutual exclusion on work queue adds and deletes */
unsigned size; /* Current size of queue */
unsigned max_size; /* Maximum queue size reached */
} work_q_t;
@ -148,8 +149,8 @@ typedef struct {
static work_q_t g_free_q; /* queue of free work items. So that we don't always need to call malloc and free*/
static work_q_t g_work_q; /* pending work items. To be consumed by worker thread */
sem_t g_work_queued_sema; /* To notify worker thread a work item has been queued */
sem_t g_init_sema;
px4_sem_t g_work_queued_sema; /* To notify worker thread a work item has been queued */
px4_sem_t g_init_sema;
static bool g_task_should_exit; /**< if true, dataman task should exit */
@ -159,26 +160,26 @@ static const unsigned k_sector_size = DM_MAX_DATA_SIZE + DM_SECTOR_HDR_SIZE; /*
static void init_q(work_q_t *q)
{
sq_init(&(q->q)); /* Initialize the NuttX queue structure */
sem_init(&(q->mutex), 1, 1); /* Queue is initially unlocked */
px4_sem_init(&(q->mutex), 1, 1); /* Queue is initially unlocked */
q->size = q->max_size = 0; /* Queue is initially empty */
}
static inline void
destroy_q(work_q_t *q)
{
sem_destroy(&(q->mutex)); /* Destroy the queue lock */
px4_sem_destroy(&(q->mutex)); /* Destroy the queue lock */
}
static inline void
lock_queue(work_q_t *q)
{
sem_wait(&(q->mutex)); /* Acquire the queue lock */
px4_sem_wait(&(q->mutex)); /* Acquire the queue lock */
}
static inline void
unlock_queue(work_q_t *q)
{
sem_post(&(q->mutex)); /* Release the queue lock */
px4_sem_post(&(q->mutex)); /* Release the queue lock */
}
static work_q_item_t *
@ -221,7 +222,7 @@ create_work_item(void)
/* If we got one then lock the item*/
if (item) {
sem_init(&item->wait_sem, 1, 0); /* Caller will wait on this... initially locked */
px4_sem_init(&item->wait_sem, 1, 0); /* Caller will wait on this... initially locked */
}
/* return the item pointer, or NULL if all failed */
@ -233,7 +234,7 @@ create_work_item(void)
static inline void
destroy_work_item(work_q_item_t *item)
{
sem_destroy(&item->wait_sem); /* Destroy the item lock */
px4_sem_destroy(&item->wait_sem); /* Destroy the item lock */
/* Return the item to the free item queue for later reuse */
lock_queue(&g_free_q);
sq_addfirst(&item->link, &(g_free_q.q));
@ -277,10 +278,10 @@ enqueue_work_item_and_wait_for_result(work_q_item_t *item)
unlock_queue(&g_work_q);
/* tell the work thread that work is available */
sem_post(&g_work_queued_sema);
px4_sem_post(&g_work_queued_sema);
/* wait for the result */
sem_wait(&item->wait_sem);
px4_sem_wait(&item->wait_sem);
int result = item->result;
@ -628,7 +629,7 @@ dm_lock(dm_item_t item)
}
if (g_item_locks[item]) {
sem_wait(g_item_locks[item]);
px4_sem_wait(g_item_locks[item]);
}
}
@ -645,7 +646,7 @@ dm_unlock(dm_item_t item)
}
if (g_item_locks[item]) {
sem_post(g_item_locks[item]);
px4_sem_post(g_item_locks[item]);
}
}
@ -691,7 +692,7 @@ task_main(int argc, char *argv[])
}
/* Initialize the item type locks, for now only DM_KEY_MISSION_STATE supports locking */
sem_init(&g_sys_state_mutex, 1, 1); /* Initially unlocked */
px4_sem_init(&g_sys_state_mutex, 1, 1); /* Initially unlocked */
for (unsigned i = 0; i < DM_KEY_NUM_KEYS; i++) {
g_item_locks[i] = NULL;
@ -704,7 +705,7 @@ task_main(int argc, char *argv[])
init_q(&g_work_q);
init_q(&g_free_q);
sem_init(&g_work_queued_sema, 1, 0);
px4_sem_init(&g_work_queued_sema, 1, 0);
/* See if the data manage file exists and is a multiple of the sector size */
g_task_fd = open(k_data_manager_device_path, O_RDONLY | O_BINARY);
@ -729,14 +730,14 @@ task_main(int argc, char *argv[])
if (g_task_fd < 0) {
warnx("Could not open data manager file %s", k_data_manager_device_path);
sem_post(&g_init_sema); /* Don't want to hang startup */
px4_sem_post(&g_init_sema); /* Don't want to hang startup */
return -1;
}
if ((unsigned)lseek(g_task_fd, max_offset, SEEK_SET) != max_offset) {
close(g_task_fd);
warnx("Could not seek data manager file %s", k_data_manager_device_path);
sem_post(&g_init_sema); /* Don't want to hang startup */
px4_sem_post(&g_init_sema); /* Don't want to hang startup */
return -1;
}
@ -771,7 +772,7 @@ task_main(int argc, char *argv[])
printf(", data manager file '%s' size is %d bytes\n", k_data_manager_device_path, max_offset);
/* Tell startup that the worker thread has completed its initialization */
sem_post(&g_init_sema);
px4_sem_post(&g_init_sema);
/* Start the endless loop, waiting for then processing work requests */
while (true) {
@ -784,7 +785,7 @@ task_main(int argc, char *argv[])
if (!g_task_should_exit) {
/* wait for work */
sem_wait(&g_work_queued_sema);
px4_sem_wait(&g_work_queued_sema);
}
/* Empty the work queue */
@ -821,7 +822,7 @@ task_main(int argc, char *argv[])
}
/* Inform the caller that work is done */
sem_post(&work->wait_sem);
px4_sem_post(&work->wait_sem);
}
/* time to go???? */
@ -846,8 +847,8 @@ task_main(int argc, char *argv[])
destroy_q(&g_work_q);
destroy_q(&g_free_q);
sem_destroy(&g_work_queued_sema);
sem_destroy(&g_sys_state_mutex);
px4_sem_destroy(&g_work_queued_sema);
px4_sem_destroy(&g_sys_state_mutex);
return 0;
}
@ -857,7 +858,7 @@ start(void)
{
int task;
sem_init(&g_init_sema, 1, 0);
px4_sem_init(&g_init_sema, 1, 0);
/* start the worker thread */
if ((task = px4_task_spawn_cmd("dataman", SCHED_DEFAULT, SCHED_PRIORITY_DEFAULT, 1500, task_main, NULL)) <= 0) {
@ -866,8 +867,8 @@ start(void)
}
/* wait for the thread to actually initialize */
sem_wait(&g_init_sema);
sem_destroy(&g_init_sema);
px4_sem_wait(&g_init_sema);
px4_sem_destroy(&g_init_sema);
return 0;
}
@ -888,7 +889,7 @@ stop(void)
{
/* Tell the worker task to shut down */
g_task_should_exit = true;
sem_post(&g_work_queued_sema);
px4_sem_post(&g_work_queued_sema);
}
static void