From 705875bc5185fa585f54d5387323db10d898cea1 Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Mon, 28 Sep 2015 19:41:33 +0200 Subject: [PATCH] Allow stack-size to be set for ThreadPoolSocketObserver --- src/filter_frontend_net.cpp | 16 ++++++++++++++-- src/test_thread_pool_observer.cpp | 32 +++++++++++++------------------- src/thread_pool_observer.cpp | 21 ++++++++++++++++++--- src/thread_pool_observer.hpp | 3 ++- xml/schema/filter_frontend_net.rnc | 1 + 5 files changed, 48 insertions(+), 25 deletions(-) diff --git a/src/filter_frontend_net.cpp b/src/filter_frontend_net.cpp index 3df0332..7d6b422 100644 --- a/src/filter_frontend_net.cpp +++ b/src/filter_frontend_net.cpp @@ -56,6 +56,7 @@ namespace metaproxy_1 { int m_no_threads; int m_max_threads; + int m_stack_size; std::vector m_ports; int m_listen_duration; int m_session_timeout; @@ -526,6 +527,7 @@ yf::FrontendNet::FrontendNet() : m_p(new Rep) yf::FrontendNet::Rep::Rep() { m_max_threads = m_no_threads = 5; + m_stack_size = 0; m_listen_duration = 0; m_session_timeout = 300; // 5 minutes m_connect_max = 0; @@ -616,7 +618,8 @@ void yf::FrontendNet::process(mp::Package &package) const m_p->m_listen_duration); ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads, - m_p->m_max_threads); + m_p->m_max_threads, + m_p->m_stack_size); for (i = 0; im_ports.size(); i++) { @@ -710,10 +713,19 @@ void yf::FrontendNet::configure(const xmlNode * ptr, bool test_only, std::string threads_str = mp::xml::get_text(ptr); int threads = atoi(threads_str.c_str()); if (threads < 1) - throw yf::FilterException("Bad value for threads: " + throw yf::FilterException("Bad value for max-threads: " + threads_str); m_p->m_max_threads = threads; } + else if (!strcmp((const char *) ptr->name, "stack-size")) + { + std::string sz_str = mp::xml::get_text(ptr); + int sz = atoi(sz_str.c_str()); + if (sz < 0) + throw yf::FilterException("Bad value for stack-size: " + + sz_str); + m_p->m_stack_size = sz * 1024; + } else if (!strcmp((const char *) ptr->name, "timeout")) { std::string timeout_str = mp::xml::get_text(ptr); diff --git a/src/test_thread_pool_observer.cpp b/src/test_thread_pool_observer.cpp index 3b2fe9a..8df5b22 100644 --- a/src/test_thread_pool_observer.cpp +++ b/src/test_thread_pool_observer.cpp @@ -61,14 +61,9 @@ public: mp::IThreadPoolMsg *My_Msg::handle() { - My_Msg *res = new My_Msg; - if (m_val == 7) sleep(1); - - res->m_val = m_val; - res->m_timer = m_timer; - return res; + return this; } bool My_Msg::cleanup(void *info) @@ -80,6 +75,7 @@ void My_Msg::result(const char *t_info) { m_timer->m_sum += m_val; m_timer->m_responses++; + delete this; } My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs, @@ -97,26 +93,24 @@ My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs, void My_Timer_Thread::socketNotify(int event) { - My_Msg *m = new My_Msg; - m->m_val = m_requests++; - m->m_timer = this; - m_t->put(m); -#if 0 - // prevent input queue from being filled up.. - // bug #1064: Test test_thread_pool_observer hangs - // fortunately we don't need this hack. because put (ebove) - // will block itself if needed - if (m->m_val == 30) + if (m_requests == 30) m_obs->deleteObserver(this); -#endif + else + { + My_Msg *m = new My_Msg; + m->m_val = m_requests++; + m->m_timer = this; + m_t->put(m); + } } BOOST_AUTO_TEST_CASE( thread_pool_observer1 ) { SocketManager mySocketManager; - mp::ThreadPoolSocketObserver m(&mySocketManager, 3); - My_Timer_Thread t(&mySocketManager, &m) ; + mp::ThreadPoolSocketObserver m(&mySocketManager, 3, 3, 16*1024); + My_Timer_Thread t(&mySocketManager, &m); + while (t.m_responses < 30 && mySocketManager.processEvent() > 0) ; BOOST_CHECK_EQUAL(t.m_responses, 30); diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index bf60e73..f7d3e63 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -68,6 +68,7 @@ namespace metaproxy_1 { std::deque m_input; std::deque m_output; bool m_stop_flag; + unsigned m_stack_size; unsigned m_no_threads; unsigned m_min_threads; unsigned m_max_threads; @@ -97,7 +98,8 @@ IThreadPoolMsg::~IThreadPoolMsg() ThreadPoolSocketObserver::ThreadPoolSocketObserver( yazpp_1::ISocketObservable *obs, - unsigned min_threads, unsigned max_threads) + unsigned min_threads, unsigned max_threads, + unsigned stack_size) : m_p(new Rep(obs)) { obs->addObserver(m_p->m_pipe.read_fd(), this); @@ -107,11 +109,18 @@ ThreadPoolSocketObserver::ThreadPoolSocketObserver( m_p->m_min_threads = m_p->m_no_threads = min_threads; m_p->m_max_threads = max_threads; m_p->m_waiting_threads = 0; + m_p->m_stack_size = stack_size; unsigned i; for (i = 0; i < m_p->m_no_threads; i++) { Worker w(this); - m_p->m_thrds.add_thread(new boost::thread(w)); + boost::thread::attributes attrs; + if (m_p->m_stack_size) + attrs.set_stack_size(m_p->m_stack_size); + + boost::thread *x = new boost::thread(attrs, w); + + m_p->m_thrds.add_thread(x); } } @@ -238,7 +247,13 @@ void ThreadPoolSocketObserver::put(IThreadPoolMsg *m) { m_p->m_no_threads++; Worker w(this); - m_p->m_thrds.add_thread(new boost::thread(w)); + + boost::thread::attributes attrs; + if (m_p->m_stack_size) + attrs.set_stack_size(m_p->m_stack_size); + boost::thread *x = new boost::thread(attrs, w); + + m_p->m_thrds.add_thread(x); } while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread) m_p->m_cond_input_full.wait(input_lock); diff --git a/src/thread_pool_observer.hpp b/src/thread_pool_observer.hpp index 88c3a0b..8d59897 100644 --- a/src/thread_pool_observer.hpp +++ b/src/thread_pool_observer.hpp @@ -38,7 +38,8 @@ namespace metaproxy_1 { class Worker; public: ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, - unsigned min_threads, unsigned max_threads); + unsigned min_threads, unsigned max_threads, + unsigned stack_size); virtual ~ThreadPoolSocketObserver(); void put(IThreadPoolMsg *m); void cleanup(IThreadPoolMsg *m, void *info); diff --git a/xml/schema/filter_frontend_net.rnc b/xml/schema/filter_frontend_net.rnc index 6f94f6b..4e32ccd 100644 --- a/xml/schema/filter_frontend_net.rnc +++ b/xml/schema/filter_frontend_net.rnc @@ -8,6 +8,7 @@ filter_frontend_net = attribute name { xsd:NCName }?, element mp:threads { xsd:integer }?, element mp:max-threads { xsd:integer }?, + element mp:stack-size { xsd:integer }?, element mp:port { attribute route { xsd:NCName }?, attribute max_recv_bytes { xsd:integer }?, -- 1.7.10.4