X-Git-Url: http://sru.miketaylor.org.uk/?a=blobdiff_plain;f=src%2Ffilter_frontend_net.cpp;h=fbc48a03d4c33b90e251491a838bfde38d0320ee;hb=cea1fb12604fd1ddbac6804b95c4aff078d30409;hp=ed9a4759025b9ed14b6482e13a5582bf8384ec63;hpb=57c1c4822f1de4d983c1aee3bc6059ad2600d991;p=metaproxy-moved-to-github.git diff --git a/src/filter_frontend_net.cpp b/src/filter_frontend_net.cpp index ed9a475..fbc48a0 100644 --- a/src/filter_frontend_net.cpp +++ b/src/filter_frontend_net.cpp @@ -20,7 +20,7 @@ class P2_Session : public yazpp_1::Z_Assoc { public: ~P2_Session(); P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable, - ThreadPoolSocketObserver *m_my_thread, + ThreadPoolSocketObserver *m_thread_pool_observer, const Package *package); int m_no_requests; private: @@ -33,7 +33,7 @@ private: void timeoutNotify(); void connectNotify(); private: - ThreadPoolSocketObserver *m_my_thread; + ThreadPoolSocketObserver *m_thread_pool_observer; Session m_session; Origin m_origin; bool m_delete_flag; @@ -70,6 +70,9 @@ void ThreadPoolPackage::result() int len; m_session->send_GDU(gdu->get(), &len); } + if (m_session->m_no_requests == 0 && m_package->session().is_closed()) + delete m_session; + delete this; } IThreadPoolMsg *ThreadPoolPackage::handle() @@ -84,7 +87,7 @@ P2_Session::P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable, const Package *package) : Z_Assoc(the_PDU_Observable) { - m_my_thread = my_thread_pool; + m_thread_pool_observer = my_thread_pool; m_no_requests = 0; m_delete_flag = false; m_package = package; @@ -107,27 +110,31 @@ void P2_Session::recv_GDU(Z_GDU *z_pdu, int len) Package *p = new Package(m_session, m_origin); - ThreadPoolPackage *m = new ThreadPoolPackage(p, this); + ThreadPoolPackage *tp = new ThreadPoolPackage(p, this); p->copy_filter(*m_package); p->request() = yazpp_1::GDU(z_pdu); - m_my_thread->put(m); + m_thread_pool_observer->put(tp); } void P2_Session::failNotify() { // TODO: send Package to signal "close" - m_delete_flag = true; - if (m_no_requests == 0) - delete this; - + if (m_session.is_closed()) + return; + m_no_requests++; + + m_session.close(); + + Package *p = new Package(m_session, m_origin); + + ThreadPoolPackage *tp = new ThreadPoolPackage(p, this); + p->copy_filter(*m_package); + m_thread_pool_observer->put(tp); } void P2_Session::timeoutNotify() { - // TODO: send Package to signal "close" - m_delete_flag = true; - if (m_no_requests == 0) - delete this; + failNotify(); } void P2_Session::connectNotify() @@ -139,7 +146,7 @@ class P2_Server : public yazpp_1::Z_Assoc { public: ~P2_Server(); P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable, - ThreadPoolSocketObserver *m_my_thread, + ThreadPoolSocketObserver *m_thread_pool_observer, const Package *package); private: yazpp_1::IPDU_Observer* sessionNotify( @@ -151,17 +158,17 @@ private: void timeoutNotify(); void connectNotify(); private: - ThreadPoolSocketObserver *m_my_thread; + ThreadPoolSocketObserver *m_thread_pool_observer; const Package *m_package; }; P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable, - ThreadPoolSocketObserver *my_thread, + ThreadPoolSocketObserver *thread_pool_observer, const Package *package) : Z_Assoc(the_PDU_Observable) { - m_my_thread = my_thread; + m_thread_pool_observer = thread_pool_observer; m_package = package; } @@ -169,7 +176,7 @@ P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable, yazpp_1::IPDU_Observer *P2_Server::sessionNotify(yazpp_1::IPDU_Observable *the_PDU_Observable, int fd) { - P2_Session *my = new P2_Session(the_PDU_Observable, m_my_thread, + P2_Session *my = new P2_Session(the_PDU_Observable, m_thread_pool_observer, m_package); return my; } @@ -235,7 +242,7 @@ void My_Timer_Thread::socketNotify(int event) close(m_fd[1]); } -Package &FilterFrontendNet::process(Package &package) const { +void FilterFrontendNet::process(Package &package) const { yazpp_1::SocketManager mySocketManager; My_Timer_Thread *tt = 0; @@ -255,7 +262,7 @@ Package &FilterFrontendNet::process(Package &package) const { if (tt && tt->timeout()) break; } - return package; + delete tt; } std::string &FilterFrontendNet::listen_address()