public:
~P2_Session();
P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable,
- ThreadPoolSocketObserver *m_my_thread,
+ ThreadPoolSocketObserver *m_thread_pool_observer,
const Package *package);
int m_no_requests;
private:
void timeoutNotify();
void connectNotify();
private:
- ThreadPoolSocketObserver *m_my_thread;
+ ThreadPoolSocketObserver *m_thread_pool_observer;
Session m_session;
Origin m_origin;
bool m_delete_flag;
int len;
m_session->send_GDU(gdu->get(), &len);
}
+ if (m_session->m_no_requests == 0 && m_package->session().is_closed())
+ delete m_session;
+ delete this;
}
IThreadPoolMsg *ThreadPoolPackage::handle()
const Package *package)
: Z_Assoc(the_PDU_Observable)
{
- m_my_thread = my_thread_pool;
+ m_thread_pool_observer = my_thread_pool;
m_no_requests = 0;
m_delete_flag = false;
m_package = package;
Package *p = new Package(m_session, m_origin);
- ThreadPoolPackage *m = new ThreadPoolPackage(p, this);
+ ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
p->copy_filter(*m_package);
p->request() = yazpp_1::GDU(z_pdu);
- m_my_thread->put(m);
+ m_thread_pool_observer->put(tp);
}
void P2_Session::failNotify()
{
// TODO: send Package to signal "close"
- m_delete_flag = true;
- if (m_no_requests == 0)
- delete this;
-
+ if (m_session.is_closed())
+ return;
+ m_no_requests++;
+
+ m_session.close();
+
+ Package *p = new Package(m_session, m_origin);
+
+ ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
+ p->copy_filter(*m_package);
+ m_thread_pool_observer->put(tp);
}
void P2_Session::timeoutNotify()
{
- // TODO: send Package to signal "close"
- m_delete_flag = true;
- if (m_no_requests == 0)
- delete this;
+ failNotify();
}
void P2_Session::connectNotify()
public:
~P2_Server();
P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
- ThreadPoolSocketObserver *m_my_thread,
+ ThreadPoolSocketObserver *m_thread_pool_observer,
const Package *package);
private:
yazpp_1::IPDU_Observer* sessionNotify(
void timeoutNotify();
void connectNotify();
private:
- ThreadPoolSocketObserver *m_my_thread;
+ ThreadPoolSocketObserver *m_thread_pool_observer;
const Package *m_package;
};
P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
- ThreadPoolSocketObserver *my_thread,
+ ThreadPoolSocketObserver *thread_pool_observer,
const Package *package)
: Z_Assoc(the_PDU_Observable)
{
- m_my_thread = my_thread;
+ m_thread_pool_observer = thread_pool_observer;
m_package = package;
}
yazpp_1::IPDU_Observer *P2_Server::sessionNotify(yazpp_1::IPDU_Observable
*the_PDU_Observable, int fd)
{
- P2_Session *my = new P2_Session(the_PDU_Observable, m_my_thread,
+ P2_Session *my = new P2_Session(the_PDU_Observable, m_thread_pool_observer,
m_package);
return my;
}
close(m_fd[1]);
}
-Package &FilterFrontendNet::process(Package &package) const {
+void FilterFrontendNet::process(Package &package) const {
yazpp_1::SocketManager mySocketManager;
My_Timer_Thread *tt = 0;
if (tt && tt->timeout())
break;
}
- return package;
+ delete tt;
}
std::string &FilterFrontendNet::listen_address()