-/* $Id: thread_pool_observer.cpp,v 1.8 2005-11-04 10:27:51 adam Exp $
+/* $Id: thread_pool_observer.cpp,v 1.9 2005-11-04 11:06:52 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
#include <winsock.h>
#endif
+#include <boost/thread/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition.hpp>
+
#include <ctype.h>
#include <stdio.h>
#include "thread_pool_observer.hpp"
+namespace yp2 {
+ class ThreadPoolSocketObserver::Worker {
+ public:
+ Worker(ThreadPoolSocketObserver *s) : m_s(s) {};
+ ThreadPoolSocketObserver *m_s;
+ void operator() (void) {
+ m_s->run(0);
+ }
+ };
+
+ class ThreadPoolSocketObserver::Rep : public boost::noncopyable {
+ friend class ThreadPoolSocketObserver;
+ public:
+ Rep(yazpp_1::ISocketObservable *obs);
+ ~Rep();
+ private:
+ yazpp_1::ISocketObservable *m_socketObservable;
+ int m_fd[2];
+ boost::thread_group m_thrds;
+ boost::mutex m_mutex_input_data;
+ boost::condition m_cond_input_data;
+ boost::mutex m_mutex_output_data;
+ std::deque<IThreadPoolMsg *> m_input;
+ std::deque<IThreadPoolMsg *> m_output;
+ bool m_stop_flag;
+ int m_no_threads;
+ };
+}
+
+
using namespace yazpp_1;
using namespace yp2;
+ThreadPoolSocketObserver::Rep::Rep(ISocketObservable *obs)
+ : m_socketObservable(obs)
+{
+}
+
+ThreadPoolSocketObserver::Rep::~Rep()
+{
+}
+
IThreadPoolMsg::~IThreadPoolMsg()
{
}
-ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int no_threads)
- : m_SocketObservable(obs)
+ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs,
+ int no_threads)
+ : m_p(new Rep(obs))
{
- pipe(m_fd);
- obs->addObserver(m_fd[0], this);
+ pipe(m_p->m_fd);
+ obs->addObserver(m_p->m_fd[0], this);
obs->maskObserver(this, SOCKET_OBSERVE_READ);
- m_stop_flag = false;
- m_no_threads = no_threads;
+ m_p->m_stop_flag = false;
+ m_p->m_no_threads = no_threads;
int i;
for (i = 0; i<no_threads; i++)
{
Worker w(this);
- m_thrds.add_thread(new boost::thread(w));
+ m_p->m_thrds.add_thread(new boost::thread(w));
}
}
ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
{
{
- boost::mutex::scoped_lock input_lock(m_mutex_input_data);
- m_stop_flag = true;
- m_cond_input_data.notify_all();
+ boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+ m_p->m_stop_flag = true;
+ m_p->m_cond_input_data.notify_all();
}
- m_thrds.join_all();
+ m_p->m_thrds.join_all();
- m_SocketObservable->deleteObserver(this);
+ m_p->m_socketObservable->deleteObserver(this);
- close(m_fd[0]);
- close(m_fd[1]);
+ close(m_p->m_fd[0]);
+ close(m_p->m_fd[1]);
}
void ThreadPoolSocketObserver::socketNotify(int event)
if (event & SOCKET_OBSERVE_READ)
{
char buf[2];
- read(m_fd[0], buf, 1);
+ read(m_p->m_fd[0], buf, 1);
IThreadPoolMsg *out;
{
- boost::mutex::scoped_lock output_lock(m_mutex_output_data);
- out = m_output.front();
- m_output.pop_front();
+ boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
+ out = m_p->m_output.front();
+ m_p->m_output.pop_front();
}
if (out)
out->result();
{
IThreadPoolMsg *in = 0;
{
- boost::mutex::scoped_lock input_lock(m_mutex_input_data);
- while (!m_stop_flag && m_input.size() == 0)
- m_cond_input_data.wait(input_lock);
- if (m_stop_flag)
+ boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+ while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
+ m_p->m_cond_input_data.wait(input_lock);
+ if (m_p->m_stop_flag)
break;
- in = m_input.front();
- m_input.pop_front();
+ in = m_p->m_input.front();
+ m_p->m_input.pop_front();
}
IThreadPoolMsg *out = in->handle();
{
- boost::mutex::scoped_lock output_lock(m_mutex_output_data);
- m_output.push_back(out);
- write(m_fd[1], "", 1);
+ boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
+ m_p->m_output.push_back(out);
+ write(m_p->m_fd[1], "", 1);
}
}
}
void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
{
- boost::mutex::scoped_lock input_lock(m_mutex_input_data);
- m_input.push_back(m);
- m_cond_input_data.notify_one();
+ boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+ m_p->m_input.push_back(m);
+ m_p->m_cond_input_data.notify_one();
}
/*
* Local variables:
-/* $Id: thread_pool_observer.hpp,v 1.5 2005-11-04 10:27:51 adam Exp $
+/* $Id: thread_pool_observer.hpp,v 1.6 2005-11-04 11:06:52 adam Exp $
Copyright (c) 2005, Index Data.
%LICENSE%
#ifndef YP2_THREAD_POOL_OBSERVER_HPP
#define YP2_THREAD_POOL_OBSERVER_HPP
-#include <boost/thread/thread.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition.hpp>
+#include <boost/scoped_ptr.hpp>
#include <ctype.h>
};
class ThreadPoolSocketObserver : public yazpp_1::ISocketObserver {
- private:
- class Worker {
- public:
- Worker(ThreadPoolSocketObserver *s) : m_s(s) {};
- ThreadPoolSocketObserver *m_s;
- void operator() (void) {
- m_s->run(0);
- }
- };
+ class Rep;
+ class Worker;
public:
ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs,
int no_threads);
virtual ~ThreadPoolSocketObserver();
- void socketNotify(int event);
void put(IThreadPoolMsg *m);
IThreadPoolMsg *get();
void run(void *p);
- int m_fd[2];
private:
- yazpp_1::ISocketObservable *m_SocketObservable;
- int m_no_threads;
- boost::thread_group m_thrds;
-
- std::deque<IThreadPoolMsg *> m_input;
- std::deque<IThreadPoolMsg *> m_output;
-
- boost::mutex m_mutex_input_data;
- boost::condition m_cond_input_data;
- boost::mutex m_mutex_output_data;
- bool m_stop_flag;
+ void socketNotify(int event);
+ boost::scoped_ptr<Rep> m_p;
-
};
}
#endif