}
}
+void ThreadPoolSocketObserver::get_thread_info(int &tbusy, int &total)
+{
+ tbusy = m_p->m_no_threads - m_p->m_no_threads_waiting;
+ total = m_p->m_no_threads;
+}
+
void ThreadPoolSocketObserver::run(void *p)
{
while(1)
}
}
+void ThreadPoolSocketObserver::cleanup(IThreadPoolMsg *m, void *info)
+{
+ boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+
+ std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
+ while (it != m_p->m_input.end())
+ {
+ if ((*it)->cleanup(info))
+ it = m_p->m_input.erase(it);
+ else
+ it++;
+ }
+}
+
void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+
while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
m_p->m_cond_input_full.wait(input_lock);
m_p->m_input.push_back(m);
m_p->m_cond_input_data.notify_one();
}
+
/*
* Local variables:
* c-basic-offset: 4