From: Adam Dickmeiss Date: Wed, 19 May 2010 13:21:42 +0000 (+0200) Subject: Use threading utilities X-Git-Tag: v1.4.1~15 X-Git-Url: http://sru.miketaylor.org.uk/?a=commitdiff_plain;h=5775b27098fab0dac8ddcbc719a8b35c67391424;p=pazpar2-moved-to-github.git Use threading utilities --- diff --git a/src/sel_thread.c b/src/sel_thread.c index 0677761..1e13d24 100644 --- a/src/sel_thread.c +++ b/src/sel_thread.c @@ -26,7 +26,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include #include #include -#include +#include +#include #include struct work_item { @@ -57,9 +58,9 @@ static void queue_trav(struct work_item *q, void (*f)(void *data)) struct sel_thread { int fd[2]; NMEM nmem; - pthread_t *thread_id; - pthread_mutex_t mutex; - pthread_cond_t input_data; + yaz_thread_t *thread_id; + YAZ_MUTEX mutex; + YAZ_COND input_data; int stop_flag; int no_threads; struct work_item *input_queue; @@ -79,9 +80,9 @@ static void *sel_thread_handler(void *vp) { struct work_item *work_this = 0; /* wait for some work */ - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); while (!p->stop_flag && !p->input_queue) - pthread_cond_wait(&p->input_data, &p->mutex); + yaz_cond_wait(p->input_data, p->mutex, 0); /* see if we were waken up because we're shutting down */ if (p->stop_flag) break; @@ -93,21 +94,21 @@ static void *sel_thread_handler(void *vp) yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length); assert(work_this); - pthread_mutex_unlock(&p->mutex); + yaz_mutex_leave(p->mutex); /* work on this item */ p->work_handler(work_this->data); /* put it back into output queue */ - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); work_this->next = p->output_queue; p->output_queue = work_this; - pthread_mutex_unlock(&p->mutex); + yaz_mutex_leave(p->mutex); /* wake up select/poll with a single byte */ (void) write(p->fd[1], "", 1); } - pthread_mutex_unlock(&p->mutex); + yaz_mutex_leave(p->mutex); return 0; } @@ -136,28 +137,34 @@ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data), p->free_queue = 0; p->work_handler = work_handler; p->work_destroy = work_destroy; - + p->no_threads = 0; /* we if need to destroy */ p->stop_flag = 0; - p->no_threads = no_of_threads; - pthread_mutex_init(&p->mutex, 0); - pthread_cond_init(&p->input_data, 0); + p->mutex = 0; + yaz_mutex_create(&p->mutex); + yaz_cond_create(&p->input_data); + if (p->input_data == 0) /* condition variable could not be created? */ + { + sel_thread_destroy(p); + return 0; + } + p->no_threads = no_of_threads; p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads); for (i = 0; i < p->no_threads; i++) - pthread_create(p->thread_id + i, 0, sel_thread_handler, p); + p->thread_id[i] = yaz_thread_create(sel_thread_handler, p); return p; } void sel_thread_destroy(sel_thread_t p) { int i; - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); p->stop_flag = 1; - pthread_cond_broadcast(&p->input_data); - pthread_mutex_unlock(&p->mutex); + yaz_cond_broadcast(p->input_data); + yaz_mutex_leave(p->mutex); for (i = 0; i< p->no_threads; i++) - pthread_join(p->thread_id[i], 0); + yaz_thread_join(&p->thread_id[i], 0); if (p->work_destroy) { @@ -167,8 +174,8 @@ void sel_thread_destroy(sel_thread_t p) close(p->fd[0]); close(p->fd[1]); - pthread_cond_destroy(&p->input_data); - pthread_mutex_destroy(&p->mutex); + yaz_cond_destroy(&p->input_data); + yaz_mutex_destroy(&p->mutex); nmem_destroy(p->nmem); } @@ -176,7 +183,7 @@ void sel_thread_add(sel_thread_t p, void *data) { struct work_item *work_p; - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); if (p->free_queue) { @@ -191,8 +198,8 @@ void sel_thread_add(sel_thread_t p, void *data) p->input_queue = work_p; input_queue_length++; yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length); - pthread_cond_signal(&p->input_data); - pthread_mutex_unlock(&p->mutex); + yaz_cond_signal(p->input_data); + yaz_mutex_leave(p->mutex); } void *sel_thread_result(sel_thread_t p) @@ -201,7 +208,7 @@ void *sel_thread_result(sel_thread_t p) void *data = 0; char read_buf[1]; - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); /* got something. Take the last one out of output_queue */ work_this = queue_remove_last(&p->output_queue); @@ -214,7 +221,7 @@ void *sel_thread_result(sel_thread_t p) data = work_this->data; (void) read(p->fd[0], read_buf, 1); } - pthread_mutex_unlock(&p->mutex); + yaz_mutex_leave(p->mutex); return data; }