Simplified process interface. Private sub class Worker.
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
index ed9a475..fbc48a0 100644 (file)
@@ -20,7 +20,7 @@ class P2_Session : public yazpp_1::Z_Assoc {
 public:
     ~P2_Session();
     P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable,
-              ThreadPoolSocketObserver *m_my_thread,
+              ThreadPoolSocketObserver *m_thread_pool_observer,
               const Package *package);
     int m_no_requests;
 private:
@@ -33,7 +33,7 @@ private:
     void timeoutNotify();
     void connectNotify();
 private:
-    ThreadPoolSocketObserver *m_my_thread;
+    ThreadPoolSocketObserver *m_thread_pool_observer;
     Session m_session;
     Origin m_origin;
     bool m_delete_flag;
@@ -70,6 +70,9 @@ void ThreadPoolPackage::result()
        int len;
        m_session->send_GDU(gdu->get(), &len);
     }
+    if (m_session->m_no_requests == 0 && m_package->session().is_closed())
+       delete m_session;
+    delete this;
 }
 
 IThreadPoolMsg *ThreadPoolPackage::handle() 
@@ -84,7 +87,7 @@ P2_Session::P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable,
                       const Package *package)
     :  Z_Assoc(the_PDU_Observable)
 {
-    m_my_thread = my_thread_pool;
+    m_thread_pool_observer = my_thread_pool;
     m_no_requests = 0;
     m_delete_flag = false;
     m_package = package;
@@ -107,27 +110,31 @@ void P2_Session::recv_GDU(Z_GDU *z_pdu, int len)
 
     Package *p = new Package(m_session, m_origin);
 
-    ThreadPoolPackage *m = new ThreadPoolPackage(p, this);
+    ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
     p->copy_filter(*m_package);
     p->request() = yazpp_1::GDU(z_pdu);
-    m_my_thread->put(m);  
+    m_thread_pool_observer->put(tp);  
 }
 
 void P2_Session::failNotify()
 {
     // TODO: send Package to signal "close"
-    m_delete_flag = true;
-    if (m_no_requests == 0)
-        delete this;
-    
+    if (m_session.is_closed())
+       return;
+    m_no_requests++;
+
+    m_session.close();
+
+    Package *p = new Package(m_session, m_origin);
+
+    ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
+    p->copy_filter(*m_package);
+    m_thread_pool_observer->put(tp);  
 }
 
 void P2_Session::timeoutNotify()
 {
-    // TODO: send Package to signal "close"
-    m_delete_flag = true;
-    if (m_no_requests == 0)
-        delete this;
+    failNotify();
 }
 
 void P2_Session::connectNotify()
@@ -139,7 +146,7 @@ class P2_Server : public yazpp_1::Z_Assoc {
 public:
     ~P2_Server();
     P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
-              ThreadPoolSocketObserver *m_my_thread,
+              ThreadPoolSocketObserver *m_thread_pool_observer,
              const Package *package);
 private:
     yazpp_1::IPDU_Observer* sessionNotify(
@@ -151,17 +158,17 @@ private:
     void timeoutNotify();
     void connectNotify();
 private:
-    ThreadPoolSocketObserver *m_my_thread;
+    ThreadPoolSocketObserver *m_thread_pool_observer;
     const Package *m_package;
 };
 
 
 P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
-                     ThreadPoolSocketObserver *my_thread,
+                     ThreadPoolSocketObserver *thread_pool_observer,
                     const Package *package)
     :  Z_Assoc(the_PDU_Observable)
 {
-    m_my_thread = my_thread;
+    m_thread_pool_observer = thread_pool_observer;
     m_package = package;
 
 }
@@ -169,7 +176,7 @@ P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
 yazpp_1::IPDU_Observer *P2_Server::sessionNotify(yazpp_1::IPDU_Observable
                                                 *the_PDU_Observable, int fd)
 {
-    P2_Session *my = new P2_Session(the_PDU_Observable, m_my_thread,
+    P2_Session *my = new P2_Session(the_PDU_Observable, m_thread_pool_observer,
                                    m_package);
     return my;
 }
@@ -235,7 +242,7 @@ void My_Timer_Thread::socketNotify(int event)
     close(m_fd[1]);
 }
 
-Package &FilterFrontendNet::process(Package &package) const {
+void FilterFrontendNet::process(Package &package) const {
     yazpp_1::SocketManager mySocketManager;
 
     My_Timer_Thread *tt = 0;
@@ -255,7 +262,7 @@ Package &FilterFrontendNet::process(Package &package) const {
        if (tt && tt->timeout())
            break;
     }
-    return package;
+    delete tt;
 }
 
 std::string &FilterFrontendNet::listen_address()