From ce193eb0444478dc61e832497ab7f4a7ae73c0c6 Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Thu, 30 Mar 2006 13:29:23 +0000 Subject: [PATCH] Allos Msg_Thread to operate in non-threaded mode --- src/msg-thread.cpp | 130 ++++++++++++++++++++++++++++++++++------------------ src/msg-thread.h | 19 ++------ 2 files changed, 88 insertions(+), 61 deletions(-) diff --git a/src/msg-thread.cpp b/src/msg-thread.cpp index 865896a..e87231c 100644 --- a/src/msg-thread.cpp +++ b/src/msg-thread.cpp @@ -1,4 +1,4 @@ -/* $Id: msg-thread.cpp,v 1.12 2006-03-30 10:31:17 adam Exp $ +/* $Id: msg-thread.cpp,v 1.13 2006-03-30 13:29:23 adam Exp $ Copyright (c) 1998-2006, Index Data. This file is part of the yazproxy. @@ -18,7 +18,11 @@ along with YAZ proxy; see the file LICENSE. If not, write to the Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ + +#if YAZ_POSIX_THREADS #include +#endif + #include #include #include @@ -30,6 +34,22 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA using namespace yazpp_1; +struct Msg_Thread::Private { +public: + int m_no_threads; + Msg_Thread_Queue m_input; + Msg_Thread_Queue m_output; +#if YAZ_POSIX_THREADS + int m_fd[2]; + yazpp_1::ISocketObservable *m_SocketObservable; + pthread_t *m_thread_id; + pthread_mutex_t m_mutex_input_data; + pthread_cond_t m_cond_input_data; + pthread_mutex_t m_mutex_output_data; + bool m_stop_flag; +#endif +}; + IMsg_Thread::~IMsg_Thread() { @@ -70,98 +90,118 @@ IMsg_Thread *Msg_Thread_Queue::dequeue() return m; } +#if YAZ_POSIX_THREADS static void *tfunc(void *p) { Msg_Thread *pt = (Msg_Thread *) p; pt->run(0); return 0; } - +#endif Msg_Thread::Msg_Thread(ISocketObservable *obs, int no_threads) - : m_SocketObservable(obs) { - pipe(m_fd); - obs->addObserver(m_fd[0], this); + m_p = new Private; + +#if YAZ_POSIX_THREADS + m_p->m_SocketObservable = obs; + + pipe(m_p->m_fd); + obs->addObserver(m_p->m_fd[0], this); obs->maskObserver(this, SOCKET_OBSERVE_READ); - m_stop_flag = false; - pthread_mutex_init(&m_mutex_input_data, 0); - pthread_cond_init(&m_cond_input_data, 0); - pthread_mutex_init(&m_mutex_output_data, 0); + m_p->m_stop_flag = false; + pthread_mutex_init(&m_p->m_mutex_input_data, 0); + pthread_cond_init(&m_p->m_cond_input_data, 0); + pthread_mutex_init(&m_p->m_mutex_output_data, 0); - m_no_threads = no_threads; - m_thread_id = new pthread_t[no_threads]; + m_p->m_no_threads = no_threads; + m_p->m_thread_id = new pthread_t[no_threads]; int i; - for (i = 0; im_no_threads; i++) + pthread_create(&m_p->m_thread_id[i], 0, tfunc, this); +#endif } Msg_Thread::~Msg_Thread() { - pthread_mutex_lock(&m_mutex_input_data); - m_stop_flag = true; - pthread_cond_broadcast(&m_cond_input_data); - pthread_mutex_unlock(&m_mutex_input_data); +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&m_p->m_mutex_input_data); + m_p->m_stop_flag = true; + pthread_cond_broadcast(&m_p->m_cond_input_data); + pthread_mutex_unlock(&m_p->m_mutex_input_data); int i; - for (i = 0; im_no_threads; i++) + pthread_join(m_p->m_thread_id[i], 0); + delete [] m_p->m_thread_id; + + m_p->m_SocketObservable->deleteObserver(this); - m_SocketObservable->deleteObserver(this); + pthread_cond_destroy(&m_p->m_cond_input_data); + pthread_mutex_destroy(&m_p->m_mutex_input_data); + pthread_mutex_destroy(&m_p->m_mutex_output_data); + close(m_p->m_fd[0]); + close(m_p->m_fd[1]); +#endif - pthread_cond_destroy(&m_cond_input_data); - pthread_mutex_destroy(&m_mutex_input_data); - pthread_mutex_destroy(&m_mutex_output_data); - close(m_fd[0]); - close(m_fd[1]); + delete m_p; } void Msg_Thread::socketNotify(int event) { +#if HAVE_POSIX_THREADS if (event & SOCKET_OBSERVE_READ) { char buf[2]; - read(m_fd[0], buf, 1); - pthread_mutex_lock(&m_mutex_output_data); - IMsg_Thread *out = m_output.dequeue(); - pthread_mutex_unlock(&m_mutex_output_data); + read(m_p->m_fd[0], buf, 1); + pthread_mutex_lock(&m_p->m_mutex_output_data); + IMsg_Thread *out = m_p->m_output.dequeue(); + pthread_mutex_unlock(&m_p->m_mutex_output_data); if (out) out->result(); } +#endif } +#if YAZ_POSIX_THREADS void Msg_Thread::run(void *p) { while(1) { - pthread_mutex_lock(&m_mutex_input_data); - while (!m_stop_flag && m_input.size() == 0) - pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data); - if (m_stop_flag) + pthread_mutex_lock(&m_p->m_mutex_input_data); + while (!m_p->m_stop_flag && m_p->m_input.size() == 0) + pthread_cond_wait(&m_p->m_cond_input_data, &m_p->m_mutex_input_data); + if (m_p->m_stop_flag) { - pthread_mutex_unlock(&m_mutex_input_data); + pthread_mutex_unlock(&m_p->m_mutex_input_data); break; } - IMsg_Thread *in = m_input.dequeue(); - pthread_mutex_unlock(&m_mutex_input_data); + IMsg_Thread *in = m_p->m_input.dequeue(); + pthread_mutex_unlock(&m_p->m_mutex_input_data); IMsg_Thread *out = in->handle(); - pthread_mutex_lock(&m_mutex_output_data); - m_output.enqueue(out); + pthread_mutex_lock(&m_p->m_mutex_output_data); + m_p->m_output.enqueue(out); - write(m_fd[1], "", 1); - pthread_mutex_unlock(&m_mutex_output_data); + write(m_p->m_fd[1], "", 1); + pthread_mutex_unlock(&m_p->m_mutex_output_data); } } +#endif void Msg_Thread::put(IMsg_Thread *m) { - pthread_mutex_lock(&m_mutex_input_data); - m_input.enqueue(m); - pthread_cond_signal(&m_cond_input_data); - pthread_mutex_unlock(&m_mutex_input_data); +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&m_p->m_mutex_input_data); + m_p->m_input.enqueue(m); + pthread_cond_signal(&m_p->m_cond_input_data); + pthread_mutex_unlock(&m_p->m_mutex_input_data); +#else + IMsg_Thread *out = m->handle(); + if (out) + out->result(); +#endif } /* * Local variables: diff --git a/src/msg-thread.h b/src/msg-thread.h index 8e8a565..b7e4e57 100644 --- a/src/msg-thread.h +++ b/src/msg-thread.h @@ -1,4 +1,4 @@ -/* $Id: msg-thread.h,v 1.9 2006-03-30 10:31:25 adam Exp $ +/* $Id: msg-thread.h,v 1.10 2006-03-30 13:29:23 adam Exp $ Copyright (c) 1998-2006, Index Data. This file is part of the yazproxy. @@ -19,14 +19,9 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ -#include #include #include -#if HAVE_DLFCN_H -#include -#endif - #include #include @@ -55,6 +50,7 @@ class Msg_Thread_Queue { }; class Msg_Thread : public yazpp_1::ISocketObserver { + class Private; public: Msg_Thread(yazpp_1::ISocketObservable *obs, int no_threads); virtual ~Msg_Thread(); @@ -62,17 +58,8 @@ class Msg_Thread : public yazpp_1::ISocketObserver { void put(IMsg_Thread *m); IMsg_Thread *get(); void run(void *p); - int m_fd[2]; private: - yazpp_1::ISocketObservable *m_SocketObservable; - int m_no_threads; - pthread_t *m_thread_id; - Msg_Thread_Queue m_input; - Msg_Thread_Queue m_output; - pthread_mutex_t m_mutex_input_data; - pthread_cond_t m_cond_input_data; - pthread_mutex_t m_mutex_output_data; - bool m_stop_flag; + class Private *m_p; }; /* -- 1.7.10.4