-/* $Id: msg-thread.cpp,v 1.12 2006-03-30 10:31:17 adam Exp $
+/* $Id: msg-thread.cpp,v 1.13 2006-03-30 13:29:23 adam Exp $
Copyright (c) 1998-2006, Index Data.
This file is part of the yazproxy.
Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
02111-1307, USA.
*/
+
+#if YAZ_POSIX_THREADS
#include <pthread.h>
+#endif
+
#include <unistd.h>
#include <ctype.h>
#include <stdio.h>
using namespace yazpp_1;
+struct Msg_Thread::Private {
+public:
+ int m_no_threads;
+ Msg_Thread_Queue m_input;
+ Msg_Thread_Queue m_output;
+#if YAZ_POSIX_THREADS
+ int m_fd[2];
+ yazpp_1::ISocketObservable *m_SocketObservable;
+ pthread_t *m_thread_id;
+ pthread_mutex_t m_mutex_input_data;
+ pthread_cond_t m_cond_input_data;
+ pthread_mutex_t m_mutex_output_data;
+ bool m_stop_flag;
+#endif
+};
+
IMsg_Thread::~IMsg_Thread()
{
return m;
}
+#if YAZ_POSIX_THREADS
static void *tfunc(void *p)
{
Msg_Thread *pt = (Msg_Thread *) p;
pt->run(0);
return 0;
}
-
+#endif
Msg_Thread::Msg_Thread(ISocketObservable *obs, int no_threads)
- : m_SocketObservable(obs)
{
- pipe(m_fd);
- obs->addObserver(m_fd[0], this);
+ m_p = new Private;
+
+#if YAZ_POSIX_THREADS
+ m_p->m_SocketObservable = obs;
+
+ pipe(m_p->m_fd);
+ obs->addObserver(m_p->m_fd[0], this);
obs->maskObserver(this, SOCKET_OBSERVE_READ);
- m_stop_flag = false;
- pthread_mutex_init(&m_mutex_input_data, 0);
- pthread_cond_init(&m_cond_input_data, 0);
- pthread_mutex_init(&m_mutex_output_data, 0);
+ m_p->m_stop_flag = false;
+ pthread_mutex_init(&m_p->m_mutex_input_data, 0);
+ pthread_cond_init(&m_p->m_cond_input_data, 0);
+ pthread_mutex_init(&m_p->m_mutex_output_data, 0);
- m_no_threads = no_threads;
- m_thread_id = new pthread_t[no_threads];
+ m_p->m_no_threads = no_threads;
+ m_p->m_thread_id = new pthread_t[no_threads];
int i;
- for (i = 0; i<m_no_threads; i++)
- pthread_create(&m_thread_id[i], 0, tfunc, this);
+ for (i = 0; i<m_p->m_no_threads; i++)
+ pthread_create(&m_p->m_thread_id[i], 0, tfunc, this);
+#endif
}
Msg_Thread::~Msg_Thread()
{
- pthread_mutex_lock(&m_mutex_input_data);
- m_stop_flag = true;
- pthread_cond_broadcast(&m_cond_input_data);
- pthread_mutex_unlock(&m_mutex_input_data);
+#if YAZ_POSIX_THREADS
+ pthread_mutex_lock(&m_p->m_mutex_input_data);
+ m_p->m_stop_flag = true;
+ pthread_cond_broadcast(&m_p->m_cond_input_data);
+ pthread_mutex_unlock(&m_p->m_mutex_input_data);
int i;
- for (i = 0; i<m_no_threads; i++)
- pthread_join(m_thread_id[i], 0);
- delete [] m_thread_id;
+ for (i = 0; i<m_p->m_no_threads; i++)
+ pthread_join(m_p->m_thread_id[i], 0);
+ delete [] m_p->m_thread_id;
+
+ m_p->m_SocketObservable->deleteObserver(this);
- m_SocketObservable->deleteObserver(this);
+ pthread_cond_destroy(&m_p->m_cond_input_data);
+ pthread_mutex_destroy(&m_p->m_mutex_input_data);
+ pthread_mutex_destroy(&m_p->m_mutex_output_data);
+ close(m_p->m_fd[0]);
+ close(m_p->m_fd[1]);
+#endif
- pthread_cond_destroy(&m_cond_input_data);
- pthread_mutex_destroy(&m_mutex_input_data);
- pthread_mutex_destroy(&m_mutex_output_data);
- close(m_fd[0]);
- close(m_fd[1]);
+ delete m_p;
}
void Msg_Thread::socketNotify(int event)
{
+#if HAVE_POSIX_THREADS
if (event & SOCKET_OBSERVE_READ)
{
char buf[2];
- read(m_fd[0], buf, 1);
- pthread_mutex_lock(&m_mutex_output_data);
- IMsg_Thread *out = m_output.dequeue();
- pthread_mutex_unlock(&m_mutex_output_data);
+ read(m_p->m_fd[0], buf, 1);
+ pthread_mutex_lock(&m_p->m_mutex_output_data);
+ IMsg_Thread *out = m_p->m_output.dequeue();
+ pthread_mutex_unlock(&m_p->m_mutex_output_data);
if (out)
out->result();
}
+#endif
}
+#if YAZ_POSIX_THREADS
void Msg_Thread::run(void *p)
{
while(1)
{
- pthread_mutex_lock(&m_mutex_input_data);
- while (!m_stop_flag && m_input.size() == 0)
- pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data);
- if (m_stop_flag)
+ pthread_mutex_lock(&m_p->m_mutex_input_data);
+ while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
+ pthread_cond_wait(&m_p->m_cond_input_data, &m_p->m_mutex_input_data);
+ if (m_p->m_stop_flag)
{
- pthread_mutex_unlock(&m_mutex_input_data);
+ pthread_mutex_unlock(&m_p->m_mutex_input_data);
break;
}
- IMsg_Thread *in = m_input.dequeue();
- pthread_mutex_unlock(&m_mutex_input_data);
+ IMsg_Thread *in = m_p->m_input.dequeue();
+ pthread_mutex_unlock(&m_p->m_mutex_input_data);
IMsg_Thread *out = in->handle();
- pthread_mutex_lock(&m_mutex_output_data);
- m_output.enqueue(out);
+ pthread_mutex_lock(&m_p->m_mutex_output_data);
+ m_p->m_output.enqueue(out);
- write(m_fd[1], "", 1);
- pthread_mutex_unlock(&m_mutex_output_data);
+ write(m_p->m_fd[1], "", 1);
+ pthread_mutex_unlock(&m_p->m_mutex_output_data);
}
}
+#endif
void Msg_Thread::put(IMsg_Thread *m)
{
- pthread_mutex_lock(&m_mutex_input_data);
- m_input.enqueue(m);
- pthread_cond_signal(&m_cond_input_data);
- pthread_mutex_unlock(&m_mutex_input_data);
+#if YAZ_POSIX_THREADS
+ pthread_mutex_lock(&m_p->m_mutex_input_data);
+ m_p->m_input.enqueue(m);
+ pthread_cond_signal(&m_p->m_cond_input_data);
+ pthread_mutex_unlock(&m_p->m_mutex_input_data);
+#else
+ IMsg_Thread *out = m->handle();
+ if (out)
+ out->result();
+#endif
}
/*
* Local variables: