X-Git-Url: http://sru.miketaylor.org.uk/?a=blobdiff_plain;f=src%2Fthread_pool_observer.cpp;h=0c99422f4349188186fc411df3d805f1e808b598;hb=79100c2ae2dac4bdde4f4d46b69e147562b4ec6c;hp=367c43a8731bc31abfa651c0143097e9cfd4c06f;hpb=747bd27ce6cec1595cb3f7c5620dce794ba55e4b;p=metaproxy-moved-to-github.git diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index 367c43a..0c99422 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -1,6 +1,6 @@ -/* $Id: thread_pool_observer.cpp,v 1.9 2005-11-04 11:06:52 adam Exp $ - Copyright (c) 2005, Index Data. +/* $Id: thread_pool_observer.cpp,v 1.14 2006-03-16 10:40:59 adam Exp $ + Copyright (c) 2005-2006, Index Data. %LICENSE% */ @@ -13,6 +13,10 @@ #include #endif +#if HAVE_SYS_SOCKET_H +#include +#endif + #include #include #include @@ -20,12 +24,17 @@ #include #include +#include + #include #include #include "thread_pool_observer.hpp" +#include "pipe.hpp" -namespace yp2 { +namespace mp = metaproxy_1; + +namespace metaproxy_1 { class ThreadPoolSocketObserver::Worker { public: Worker(ThreadPoolSocketObserver *s) : m_s(s) {}; @@ -42,7 +51,7 @@ namespace yp2 { ~Rep(); private: yazpp_1::ISocketObservable *m_socketObservable; - int m_fd[2]; + Pipe m_pipe; boost::thread_group m_thrds; boost::mutex m_mutex_input_data; boost::condition m_cond_input_data; @@ -56,10 +65,10 @@ namespace yp2 { using namespace yazpp_1; -using namespace yp2; +using namespace mp; ThreadPoolSocketObserver::Rep::Rep(ISocketObservable *obs) - : m_socketObservable(obs) + : m_socketObservable(obs), m_pipe(9123) { } @@ -76,8 +85,7 @@ ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int no_threads) : m_p(new Rep(obs)) { - pipe(m_p->m_fd); - obs->addObserver(m_p->m_fd[0], this); + obs->addObserver(m_p->m_pipe.read_fd(), this); obs->maskObserver(this, SOCKET_OBSERVE_READ); m_p->m_stop_flag = false; @@ -100,9 +108,6 @@ ThreadPoolSocketObserver::~ThreadPoolSocketObserver() m_p->m_thrds.join_all(); m_p->m_socketObservable->deleteObserver(this); - - close(m_p->m_fd[0]); - close(m_p->m_fd[1]); } void ThreadPoolSocketObserver::socketNotify(int event) @@ -110,7 +115,7 @@ void ThreadPoolSocketObserver::socketNotify(int event) if (event & SOCKET_OBSERVE_READ) { char buf[2]; - read(m_p->m_fd[0], buf, 1); + recv(m_p->m_pipe.read_fd(), buf, 1, 0); IThreadPoolMsg *out; { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); @@ -141,7 +146,7 @@ void ThreadPoolSocketObserver::run(void *p) { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); m_p->m_output.push_back(out); - write(m_p->m_fd[1], "", 1); + send(m_p->m_pipe.write_fd(), "", 1, 0); } } }