- yaz_log (m_log, "new PDU_Assoc. Accepting");
- // assume comstack is accepting...
- m_state = Accepting;
- m_socketObservable->addObserver(cs_fileno(cs), this);
- yaz_log(m_log, "maskObserver 1");
- m_socketObservable->maskObserver(this,
- mask |SOCKET_OBSERVE_EXCEPT);
+ yaz_log (m_log, "new PDU_Assoc. Accepting");
+ // assume comstack is accepting...
+ m_state = Accepting;
+ m_socketObservable->addObserver(cs_fileno(cs), this);
+ yaz_log(m_log, "maskObserver 1");
+ m_socketObservable->maskObserver(this,
+ mask |SOCKET_OBSERVE_EXCEPT);
void PDU_Assoc::socketNotify(int event)
{
yaz_log (m_log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
void PDU_Assoc::socketNotify(int event)
{
yaz_log (m_log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
- if (!cs_accept (m_cs))
- {
- yaz_log (m_log, "PDU_Assoc::cs_accept failed");
- m_cs = 0;
- close();
- m_PDU_Observer->failNotify();
- }
- else
- {
- unsigned mask = 0;
- if (m_cs->io_pending & CS_WANT_WRITE)
- mask |= SOCKET_OBSERVE_WRITE;
- if (m_cs->io_pending & CS_WANT_READ)
- mask |= SOCKET_OBSERVE_READ;
- if (!mask)
- { // accept is complete. turn to ready state and write if needed
- m_state = Ready;
- flush_PDU();
- }
- else
- { // accept still incomplete.
- yaz_log(m_log, "maskObserver 2");
- m_socketObservable->maskObserver(this,
- mask|SOCKET_OBSERVE_EXCEPT);
- }
- }
- break;
+ if (!cs_accept (m_cs))
+ {
+ yaz_log (m_log, "PDU_Assoc::cs_accept failed");
+ m_cs = 0;
+ shutdown();
+ m_PDU_Observer->failNotify();
+ }
+ else
+ {
+ unsigned mask = 0;
+ if (m_cs->io_pending & CS_WANT_WRITE)
+ mask |= SOCKET_OBSERVE_WRITE;
+ if (m_cs->io_pending & CS_WANT_READ)
+ mask |= SOCKET_OBSERVE_READ;
+ if (!mask)
+ { // accept is complete. turn to ready state and write if needed
+ m_state = Ready;
+ flush_PDU();
+ }
+ else
+ { // accept still incomplete.
+ yaz_log(m_log, "maskObserver 2");
+ m_socketObservable->maskObserver(this,
+ mask|SOCKET_OBSERVE_EXCEPT);
+ }
+ }
+ break;
- if (event & SOCKET_OBSERVE_READ &&
- event & SOCKET_OBSERVE_WRITE)
- {
- // For Unix: if both read and write is set, then connect failed.
- close();
- m_PDU_Observer->failNotify();
- }
- else
- {
- yaz_log (m_log, "cs_rcvconnect");
- int res = cs_rcvconnect (m_cs);
- if (res == 1)
- {
- unsigned mask = SOCKET_OBSERVE_EXCEPT;
- if (m_cs->io_pending & CS_WANT_WRITE)
- mask |= SOCKET_OBSERVE_WRITE;
- if (m_cs->io_pending & CS_WANT_READ)
- mask |= SOCKET_OBSERVE_READ;
- yaz_log(m_log, "maskObserver 3");
- m_socketObservable->maskObserver(this, mask);
- }
- else
- {
- m_state = Ready;
- if (m_PDU_Observer)
- m_PDU_Observer->connectNotify();
- flush_PDU();
- }
- }
- break;
+ if (event & SOCKET_OBSERVE_READ &&
+ event & SOCKET_OBSERVE_WRITE)
+ {
+ // For Unix: if both read and write is set, then connect failed.
+ shutdown();
+ m_PDU_Observer->failNotify();
+ }
+ else
+ {
+ yaz_log (m_log, "cs_rcvconnect");
+ int res = cs_rcvconnect (m_cs);
+ if (res == 1)
+ {
+ unsigned mask = SOCKET_OBSERVE_EXCEPT;
+ if (m_cs->io_pending & CS_WANT_WRITE)
+ mask |= SOCKET_OBSERVE_WRITE;
+ if (m_cs->io_pending & CS_WANT_READ)
+ mask |= SOCKET_OBSERVE_READ;
+ yaz_log(m_log, "maskObserver 3");
+ m_socketObservable->maskObserver(this, mask);
+ }
+ else
+ {
+ m_state = Ready;
+ if (m_PDU_Observer)
+ m_PDU_Observer->connectNotify();
+ flush_PDU();
+ }
+ }
+ break;
- if (event & SOCKET_OBSERVE_READ)
- {
- int res;
- COMSTACK new_line;
-
- if ((res = cs_listen(m_cs, 0, 0)) == 1)
- return;
- if (res < 0)
- {
- yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
- return;
- }
- if (!(new_line = cs_accept(m_cs)))
- return;
- /* 1. create socket-manager
+ if (event & SOCKET_OBSERVE_READ)
+ {
+ int res;
+ COMSTACK new_line;
+
+ if ((res = cs_listen(m_cs, 0, 0)) == 1)
+ return;
+ if (res < 0)
+ {
+ yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
+ return;
+ }
+ if (!(new_line = cs_accept(m_cs)))
+ return;
+ /* 1. create socket-manager
- */
- yaz_log (m_log, "new session: parent fd=%d child fd=%d",
- cs_fileno(m_cs), cs_fileno(new_line));
- childNotify (new_line);
- }
- break;
+ */
+ yaz_log (m_log, "new session: parent fd=%d child fd=%d",
+ cs_fileno(m_cs), cs_fileno(new_line));
+ childNotify (new_line);
+ }
+ break;
- if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
- {
- do
- {
- int res = cs_get (m_cs, &m_input_buf, &m_input_len);
- if (res == 1)
+ if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
+ {
+ do
+ {
+ int res = cs_get (m_cs, &m_input_buf, &m_input_len);
+ if (res == 1)
{
unsigned mask = SOCKET_OBSERVE_EXCEPT;
if (m_cs->io_pending & CS_WANT_WRITE)
mask |= SOCKET_OBSERVE_WRITE;
if (m_cs->io_pending & CS_WANT_READ)
mask |= SOCKET_OBSERVE_READ;
{
unsigned mask = SOCKET_OBSERVE_EXCEPT;
if (m_cs->io_pending & CS_WANT_WRITE)
mask |= SOCKET_OBSERVE_WRITE;
if (m_cs->io_pending & CS_WANT_READ)
mask |= SOCKET_OBSERVE_READ;
- yaz_log(m_log, "maskObserver 4");
- m_socketObservable->maskObserver(this, mask);
- return;
+ yaz_log(m_log, "maskObserver 4");
+ m_socketObservable->maskObserver(this, mask);
+ return;
+ }
+ else if (res <= 0)
+ {
+ yaz_log (m_log, "PDU_Assoc::Connection closed by peer");
+ shutdown();
+ if (m_PDU_Observer)
+ m_PDU_Observer->failNotify(); // problem here..
+ return;
- else if (res <= 0)
- {
- yaz_log (m_log, "PDU_Assoc::Connection closed by peer");
- close();
- if (m_PDU_Observer)
- m_PDU_Observer->failNotify(); // problem here..
- return;
- }
- // lock it, so we know if recv_PDU deletes it.
- int destroyed = 0;
- m_destroyed = &destroyed;
-
- if (!m_PDU_Observer)
- return;
+ // lock it, so we know if recv_PDU deletes it.
+ int destroyed = 0;
+ m_destroyed = &destroyed;
+
+ if (!m_PDU_Observer)
+ return;
- PDU_Queue **pq = &m_queue_in;
- while (*pq)
- pq = &(*pq)->m_next;
-
- *pq = new PDU_Queue(m_input_buf, res);
+ PDU_Queue **pq = &m_queue_in;
+ while (*pq)
+ pq = &(*pq)->m_next;
+
+ *pq = new PDU_Queue(m_input_buf, res);
- yaz_log(m_log, "maskObserver 5");
- m_socketObservable->maskObserver(this,
- SOCKET_OBSERVE_EXCEPT|
- SOCKET_OBSERVE_READ);
- }
- }
- break;
+ yaz_log(m_log, "maskObserver 5");
+ m_socketObservable->maskObserver(this,
+ SOCKET_OBSERVE_EXCEPT|
+ SOCKET_OBSERVE_READ);
+ }
+ }
+ break;
- yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event);
- close();
- m_PDU_Observer->failNotify();
- break;
+ yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event);
+ shutdown();
+ m_PDU_Observer->failNotify();
+ break;
- yaz_log (m_log, "Unknown state=%d event was %d", m_state, event);
- close();
- m_PDU_Observer->failNotify();
+ yaz_log (m_log, "Unknown state=%d event was %d", m_state, event);
+ shutdown();
+ m_PDU_Observer->failNotify();
+ }
+}
+
+void PDU_Assoc::close_session()
+{
+ m_session_is_dead = true;
+ if (!m_queue_out)
+ {
+ shutdown();
+ m_PDU_Observer->failNotify();
if (m_state != Ready && m_state != Writing)
{
yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
if (m_state != Ready && m_state != Writing)
{
yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
- m_state = Ready;
- yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
- yaz_log(m_log, "maskObserver 6");
- m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
- SOCKET_OBSERVE_WRITE|
- SOCKET_OBSERVE_EXCEPT);
+ m_state = Ready;
+ yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
+ yaz_log(m_log, "maskObserver 6");
+ m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
+ SOCKET_OBSERVE_WRITE|
+ SOCKET_OBSERVE_EXCEPT);
+ if (m_session_is_dead)
+ {
+ shutdown();
+ m_PDU_Observer->failNotify();
+ }
return 0;
}
r = cs_put (m_cs, q->m_buf, q->m_len);
if (r < 0)
{
yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put failed");
return 0;
}
r = cs_put (m_cs, q->m_buf, q->m_len);
if (r < 0)
{
yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put failed");
- mask |= SOCKET_OBSERVE_WRITE;
- yaz_log(m_log, "maskObserver 7");
- m_socketObservable->maskObserver(this, mask);
+ mask |= SOCKET_OBSERVE_WRITE;
+ yaz_log(m_log, "maskObserver 7");
+ m_socketObservable->maskObserver(this, mask);
- yaz_log(m_log, "maskObserver 8");
- m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
- SOCKET_OBSERVE_EXCEPT);
+ yaz_log(m_log, "maskObserver 8");
+ m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
+ SOCKET_OBSERVE_EXCEPT);
m_socketObservable->addObserver(cs_fileno(m_cs), this);
yaz_log(m_log, "maskObserver 9");
m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
m_socketObservable->addObserver(cs_fileno(m_cs), this);
yaz_log(m_log, "maskObserver 9");
m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
{
yaz_log (m_log, "PDU_Assoc::connect %s", addr);
int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
{
yaz_log (m_log, "PDU_Assoc::connect %s", addr);
m_PDU_Observer = observer;
void *ap;
m_cs = comstack(addr, &ap);
if (!m_cs)
m_PDU_Observer = observer;
void *ap;
m_cs = comstack(addr, &ap);
if (!m_cs)
int res = cs_connect (m_cs, ap);
yaz_log (m_log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs),
int res = cs_connect (m_cs, ap);
yaz_log (m_log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs),
- m_state = Connecting;
- unsigned mask = SOCKET_OBSERVE_EXCEPT;
- mask |= SOCKET_OBSERVE_WRITE;
- mask |= SOCKET_OBSERVE_READ;
- yaz_log(m_log, "maskObserver 11");
- m_socketObservable->maskObserver(this, mask);
+ m_state = Connecting;
+ unsigned mask = SOCKET_OBSERVE_EXCEPT;
+ mask |= SOCKET_OBSERVE_WRITE;
+ mask |= SOCKET_OBSERVE_READ;
+ yaz_log(m_log, "maskObserver 11");
+ m_socketObservable->maskObserver(this, mask);
- m_state = Connecting;
- unsigned mask = SOCKET_OBSERVE_EXCEPT;
- if (m_cs->io_pending & CS_WANT_WRITE)
- mask |= SOCKET_OBSERVE_WRITE;
- if (m_cs->io_pending & CS_WANT_READ)
- mask |= SOCKET_OBSERVE_READ;
- yaz_log(m_log, "maskObserver 11");
- m_socketObservable->maskObserver(this, mask);
+ m_state = Connecting;
+ unsigned mask = SOCKET_OBSERVE_EXCEPT;
+ if (m_cs->io_pending & CS_WANT_WRITE)
+ mask |= SOCKET_OBSERVE_WRITE;
+ if (m_cs->io_pending & CS_WANT_READ)
+ mask |= SOCKET_OBSERVE_READ;
+ yaz_log(m_log, "maskObserver 11");
+ m_socketObservable->maskObserver(this, mask);
- yaz_log(m_log, "maskObserver 12");
- m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
- SOCKET_OBSERVE_EXCEPT);
+ yaz_log(m_log, "maskObserver 12");
+ m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
+ SOCKET_OBSERVE_EXCEPT);
+ // Clone PDU Observer
+ new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
+ (new_observable, cs_fileno(cs));
+
+ if (!new_observable->m_PDU_Observer)
+ {
+ new_observable->shutdown();
+ delete new_observable;
+ return;
+ }
new_observable->m_next = m_children;
m_children = new_observable;
new_observable->m_parent = this;
new_observable->m_next = m_children;
m_children = new_observable;
new_observable->m_parent = this;