X-Git-Url: http://sru.miketaylor.org.uk/?a=blobdiff_plain;f=src%2Fthread_pool_observer.cpp;h=ab722553eaa6a6730fb3f654fbcdfe6e1ad683f8;hb=d15eeaaabd7e3f92ffe0215bd930bbc92b380e0f;hp=180f385aae736516706be355ad853c7445fdbe2e;hpb=323af6850b8aba94fb86edb303867dc55dc2eb7f;p=metaproxy-moved-to-github.git diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index 180f385..ab72255 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -1,8 +1,7 @@ +/* $Id: thread_pool_observer.cpp,v 1.19 2007-02-19 12:51:08 adam Exp $ + Copyright (c) 2005-2007, Index Data. -/* $Id: thread_pool_observer.cpp,v 1.10 2005-11-07 12:31:05 adam Exp $ - Copyright (c) 2005, Index Data. - -%LICENSE% + See the LICENSE file for details */ #include "config.hpp" @@ -13,6 +12,10 @@ #include #endif +#if HAVE_SYS_SOCKET_H +#include +#endif + #include #include #include @@ -22,12 +25,13 @@ #include -#include +#include #include #include "thread_pool_observer.hpp" +#include "pipe.hpp" -namespace yp2 { +namespace metaproxy_1 { class ThreadPoolSocketObserver::Worker { public: Worker(ThreadPoolSocketObserver *s) : m_s(s) {}; @@ -44,7 +48,7 @@ namespace yp2 { ~Rep(); private: yazpp_1::ISocketObservable *m_socketObservable; - int m_fd[2]; + Pipe m_pipe; boost::thread_group m_thrds; boost::mutex m_mutex_input_data; boost::condition m_cond_input_data; @@ -58,10 +62,10 @@ namespace yp2 { using namespace yazpp_1; -using namespace yp2; +using namespace metaproxy_1; -ThreadPoolSocketObserver::Rep::Rep(ISocketObservable *obs) - : m_socketObservable(obs) +ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs) + : m_socketObservable(obs), m_pipe(9123) { } @@ -74,12 +78,11 @@ IThreadPoolMsg::~IThreadPoolMsg() } -ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, - int no_threads) +ThreadPoolSocketObserver::ThreadPoolSocketObserver( + yazpp_1::ISocketObservable *obs, int no_threads) : m_p(new Rep(obs)) { - pipe(m_p->m_fd); - obs->addObserver(m_p->m_fd[0], this); + obs->addObserver(m_p->m_pipe.read_fd(), this); obs->maskObserver(this, SOCKET_OBSERVE_READ); m_p->m_stop_flag = false; @@ -102,9 +105,6 @@ ThreadPoolSocketObserver::~ThreadPoolSocketObserver() m_p->m_thrds.join_all(); m_p->m_socketObservable->deleteObserver(this); - - close(m_p->m_fd[0]); - close(m_p->m_fd[1]); } void ThreadPoolSocketObserver::socketNotify(int event) @@ -112,7 +112,11 @@ void ThreadPoolSocketObserver::socketNotify(int event) if (event & SOCKET_OBSERVE_READ) { char buf[2]; - read(m_p->m_fd[0], buf, 1); +#ifdef WIN32 + recv(m_p->m_pipe.read_fd(), buf, 1, 0); +#else + read(m_p->m_pipe.read_fd(), buf, 1); +#endif IThreadPoolMsg *out; { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); @@ -143,7 +147,11 @@ void ThreadPoolSocketObserver::run(void *p) { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); m_p->m_output.push_back(out); - write(m_p->m_fd[1], "", 1); +#ifdef WIN32 + send(m_p->m_pipe.write_fd(), "", 1, 0); +#else + write(m_p->m_pipe.write_fd(), "", 1); +#endif } } }