X-Git-Url: http://sru.miketaylor.org.uk/?a=blobdiff_plain;ds=sidebyside;f=src%2Fyaz-pdu-assoc.cpp;h=b8e3416f4b283d091e2f0125185f620c9920f1a8;hb=75f7c460d6d10961f3d2ed841b757d6d6b7725d7;hp=0ed6f2955489aca54c62d3b49e126f419e683be9;hpb=966d1a0443071c2b75426d0214bfb9960c5c3fba;p=yazpp-moved-to-github.git diff --git a/src/yaz-pdu-assoc.cpp b/src/yaz-pdu-assoc.cpp index 0ed6f29..b8e3416 100644 --- a/src/yaz-pdu-assoc.cpp +++ b/src/yaz-pdu-assoc.cpp @@ -1,8 +1,8 @@ /* - * Copyright (c) 1998-2001, Index Data. + * Copyright (c) 1998-2004, Index Data. * See the file LICENSE for details. * - * $Id: yaz-pdu-assoc.cpp,v 1.25 2001-11-04 22:36:21 adam Exp $ + * $Id: yaz-pdu-assoc.cpp,v 1.42 2005-06-21 21:30:24 adam Exp $ */ #include @@ -10,16 +10,18 @@ #include #include -#include +#include +using namespace yazpp_1; -void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable) +void PDU_Assoc::init(ISocketObservable *socketObservable) { m_state = Closed; m_cs = 0; m_socketObservable = socketObservable; m_PDU_Observer = 0; m_queue_out = 0; + m_queue_in = 0; m_input_buf = 0; m_input_len = 0; m_children = 0; @@ -27,24 +29,24 @@ void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable) m_next = 0; m_destroyed = 0; m_idleTime = 0; - m_log = LOG_DEBUG; + m_log = YLOG_DEBUG; } -Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable) +PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable) { init (socketObservable); } -Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, +PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable, COMSTACK cs) { init(socketObservable); m_cs = cs; unsigned mask = 0; if (cs->io_pending & CS_WANT_WRITE) - mask |= YAZ_SOCKET_OBSERVE_WRITE; + mask |= SOCKET_OBSERVE_WRITE; if (cs->io_pending & CS_WANT_READ) - mask |= YAZ_SOCKET_OBSERVE_READ; + mask |= SOCKET_OBSERVE_READ; m_socketObservable->addObserver(cs_fileno(cs), this); if (!mask) { @@ -58,29 +60,30 @@ Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, // assume comstack is accepting... m_state = Accepting; m_socketObservable->addObserver(cs_fileno(cs), this); + yaz_log(m_log, "maskObserver 1"); m_socketObservable->maskObserver(this, - mask |YAZ_SOCKET_OBSERVE_EXCEPT); + mask |SOCKET_OBSERVE_EXCEPT); } } -IYaz_PDU_Observable *Yaz_PDU_Assoc::clone() +IPDU_Observable *PDU_Assoc::clone() { - Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable); + PDU_Assoc *copy = new PDU_Assoc(m_socketObservable); return copy; } -void Yaz_PDU_Assoc::socketNotify(int event) +void PDU_Assoc::socketNotify(int event) { - yaz_log (m_log, "Yaz_PDU_Assoc::socketNotify p=%p state=%d event = %d", + yaz_log (m_log, "PDU_Assoc::socketNotify p=%p state=%d event = %d", this, m_state, event); - if (event & YAZ_SOCKET_OBSERVE_EXCEPT) + if (event & SOCKET_OBSERVE_EXCEPT) { close(); m_PDU_Observer->failNotify(); return; } - else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT) + else if (event & SOCKET_OBSERVE_TIMEOUT) { m_PDU_Observer->timeoutNotify(); return; @@ -90,7 +93,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) case Accepting: if (!cs_accept (m_cs)) { - yaz_log (m_log, "Yaz_PDU_Assoc::cs_accept failed"); + yaz_log (m_log, "PDU_Assoc::cs_accept failed"); m_cs = 0; close(); m_PDU_Observer->failNotify(); @@ -99,9 +102,9 @@ void Yaz_PDU_Assoc::socketNotify(int event) { unsigned mask = 0; if (m_cs->io_pending & CS_WANT_WRITE) - mask |= YAZ_SOCKET_OBSERVE_WRITE; + mask |= SOCKET_OBSERVE_WRITE; if (m_cs->io_pending & CS_WANT_READ) - mask |= YAZ_SOCKET_OBSERVE_READ; + mask |= SOCKET_OBSERVE_READ; if (!mask) { // accept is complete. turn to ready state and write if needed m_state = Ready; @@ -109,14 +112,15 @@ void Yaz_PDU_Assoc::socketNotify(int event) } else { // accept still incomplete. + yaz_log(m_log, "maskObserver 2"); m_socketObservable->maskObserver(this, - mask|YAZ_SOCKET_OBSERVE_EXCEPT); + mask|SOCKET_OBSERVE_EXCEPT); } } break; case Connecting: - if (event & YAZ_SOCKET_OBSERVE_READ && - event & YAZ_SOCKET_OBSERVE_WRITE) + if (event & SOCKET_OBSERVE_READ && + event & SOCKET_OBSERVE_WRITE) { // For Unix: if both read and write is set, then connect failed. close(); @@ -124,15 +128,16 @@ void Yaz_PDU_Assoc::socketNotify(int event) } else { - yaz_log (m_log, "cs_connect again"); - int res = cs_connect (m_cs, 0); + yaz_log (m_log, "cs_rcvconnect"); + int res = cs_rcvconnect (m_cs); if (res == 1) { - unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT; + unsigned mask = SOCKET_OBSERVE_EXCEPT; if (m_cs->io_pending & CS_WANT_WRITE) - mask |= YAZ_SOCKET_OBSERVE_WRITE; + mask |= SOCKET_OBSERVE_WRITE; if (m_cs->io_pending & CS_WANT_READ) - mask |= YAZ_SOCKET_OBSERVE_READ; + mask |= SOCKET_OBSERVE_READ; + yaz_log(m_log, "maskObserver 3"); m_socketObservable->maskObserver(this, mask); } else @@ -145,7 +150,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) } break; case Listen: - if (event & YAZ_SOCKET_OBSERVE_READ) + if (event & SOCKET_OBSERVE_READ) { int res; COMSTACK new_line; @@ -154,7 +159,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) return; if (res < 0) { - yaz_log(LOG_FATAL|LOG_ERRNO, "cs_listen failed"); + yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed"); return; } if (!(new_line = cs_accept(m_cs))) @@ -171,28 +176,29 @@ void Yaz_PDU_Assoc::socketNotify(int event) } break; case Writing: - if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE)) + if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE)) flush_PDU(); break; case Ready: - if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE)) + if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE)) { do { int res = cs_get (m_cs, &m_input_buf, &m_input_len); if (res == 1) { - unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT; + unsigned mask = SOCKET_OBSERVE_EXCEPT; if (m_cs->io_pending & CS_WANT_WRITE) - mask |= YAZ_SOCKET_OBSERVE_WRITE; + mask |= SOCKET_OBSERVE_WRITE; if (m_cs->io_pending & CS_WANT_READ) - mask |= YAZ_SOCKET_OBSERVE_READ; + mask |= SOCKET_OBSERVE_READ; + yaz_log(m_log, "maskObserver 4"); m_socketObservable->maskObserver(this, mask); return; } else if (res <= 0) { - yaz_log (m_log, "Yaz_PDU_Assoc::Connection closed by peer"); + yaz_log (m_log, "PDU_Assoc::Connection closed by peer"); close(); if (m_PDU_Observer) m_PDU_Observer->failNotify(); // problem here.. @@ -204,16 +210,26 @@ void Yaz_PDU_Assoc::socketNotify(int event) if (!m_PDU_Observer) return; - +#if 0 + PDU_Queue **pq = &m_queue_in; + while (*pq) + pq = &(*pq)->m_next; + + *pq = new PDU_Queue(m_input_buf, res); +#else m_PDU_Observer->recv_PDU(m_input_buf, res); - m_destroyed = 0; +#endif if (destroyed) // it really was destroyed, return now. return; + m_destroyed = 0; } while (m_cs && cs_more (m_cs)); - if (m_cs) + if (m_cs && m_state == Ready) + { + yaz_log(m_log, "maskObserver 5"); m_socketObservable->maskObserver(this, - YAZ_SOCKET_OBSERVE_EXCEPT| - YAZ_SOCKET_OBSERVE_READ); + SOCKET_OBSERVE_EXCEPT| + SOCKET_OBSERVE_READ); + } } break; case Closed: @@ -228,9 +244,9 @@ void Yaz_PDU_Assoc::socketNotify(int event) } } -void Yaz_PDU_Assoc::close() +void PDU_Assoc::close() { - Yaz_PDU_Assoc *ch; + PDU_Assoc *ch; for (ch = m_children; ch; ch = ch->m_next) ch->close(); @@ -238,7 +254,7 @@ void Yaz_PDU_Assoc::close() m_state = Closed; if (m_cs) { - yaz_log (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); + yaz_log (m_log, "PDU_Assoc::close fd=%d", cs_fileno(m_cs)); cs_close (m_cs); } m_cs = 0; @@ -253,13 +269,13 @@ void Yaz_PDU_Assoc::close() m_input_len = 0; } -void Yaz_PDU_Assoc::destroy() +void PDU_Assoc::destroy() { close(); if (m_destroyed) *m_destroyed = 1; - Yaz_PDU_Assoc **c; + PDU_Assoc **c; // delete from parent's child list (if any) if (m_parent) @@ -276,28 +292,28 @@ void Yaz_PDU_Assoc::destroy() c = &m_children; while (*c) { - Yaz_PDU_Assoc *here = *c; + PDU_Assoc *here = *c; *c = (*c)->m_next; here->m_parent = 0; delete here; } - yaz_log (m_log, "Yaz_PDU_Assoc::destroy this=%p", this); + yaz_log (m_log, "PDU_Assoc::destroy this=%p", this); } -Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len) +PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len) { - m_buf = (char *) malloc (len); + m_buf = (char *) xmalloc (len); memcpy (m_buf, buf, len); m_len = len; m_next = 0; } -Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue() +PDU_Assoc::PDU_Queue::~PDU_Queue() { - free (m_buf); + xfree (m_buf); } -int Yaz_PDU_Assoc::flush_PDU() +int PDU_Assoc::flush_PDU() { int r; @@ -311,54 +327,60 @@ int Yaz_PDU_Assoc::flush_PDU() { m_state = Ready; yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty"); - m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_WRITE| - YAZ_SOCKET_OBSERVE_EXCEPT); + yaz_log(m_log, "maskObserver 6"); + m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ| + SOCKET_OBSERVE_WRITE| + SOCKET_OBSERVE_EXCEPT); return 0; } r = cs_put (m_cs, q->m_buf, q->m_len); if (r < 0) { - yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put failed"); + yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put failed"); close(); m_PDU_Observer->failNotify(); return r; } if (r == 1) { - unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT; + unsigned mask = SOCKET_OBSERVE_EXCEPT; m_state = Writing; if (m_cs->io_pending & CS_WANT_WRITE) - mask |= YAZ_SOCKET_OBSERVE_WRITE; + mask |= SOCKET_OBSERVE_WRITE; if (m_cs->io_pending & CS_WANT_READ) - mask |= YAZ_SOCKET_OBSERVE_READ; - + mask |= SOCKET_OBSERVE_READ; + + mask |= SOCKET_OBSERVE_WRITE; + yaz_log(m_log, "maskObserver 7"); m_socketObservable->maskObserver(this, mask); - yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes (incomp)", - q->m_len); + yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)", + q->m_len, cs_fileno(m_cs)); return r; } - m_state = Ready; - yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len); + yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len); // whole packet sent... delete this and proceed to next ... m_queue_out = q->m_next; delete q; // don't select on write if queue is empty ... if (!m_queue_out) - m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT); + { + m_state = Ready; + yaz_log(m_log, "maskObserver 8"); + m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ| + SOCKET_OBSERVE_EXCEPT); + } return r; } -int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) +int PDU_Assoc::send_PDU(const char *buf, int len) { - yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU"); + yaz_log (m_log, "PDU_Assoc::send_PDU"); PDU_Queue **pq = &m_queue_out; int is_idle = (*pq ? 0 : 1); if (!m_cs) { - yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0"); + yaz_log (m_log, "PDU_Assoc::send_PDU failed, m_cs == 0"); return -1; } while (*pq) @@ -367,84 +389,94 @@ int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) if (is_idle) return flush_PDU (); else - yaz_log (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d", + yaz_log (m_log, "PDU_Assoc::cannot send_PDU fd=%d", cs_fileno(m_cs)); return 0; } -COMSTACK Yaz_PDU_Assoc::comstack(const char *type_and_host, void **vp) +COMSTACK PDU_Assoc::comstack(const char *type_and_host, void **vp) { - return cs_create_host(type_and_host, 0, vp); + return cs_create_host(type_and_host, 2, vp); } -void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, - const char *addr) +int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr) { close(); - yaz_log (LOG_LOG, "Adding listener %s", addr); - m_PDU_Observer = observer; void *ap; m_cs = comstack(addr, &ap); if (!m_cs) - return; + return -1; if (cs_bind(m_cs, ap, CS_SERVER) < 0) - return; + return -2; m_socketObservable->addObserver(cs_fileno(m_cs), this); - m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT); - yaz_log (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs)); + yaz_log(m_log, "maskObserver 9"); + m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ| + SOCKET_OBSERVE_EXCEPT); + yaz_log (m_log, "PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs)); m_state = Listen; + return 0; } -void Yaz_PDU_Assoc::idleTime(int idleTime) +void PDU_Assoc::idleTime(int idleTime) { m_idleTime = idleTime; - yaz_log (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime); + yaz_log (m_log, "PDU_Assoc::idleTime(%d)", idleTime); m_socketObservable->timeoutObserver(this, m_idleTime); } -void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, - const char *addr) +int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr) { - yaz_log (m_log, "Yaz_PDU_Assoc::connect %s", addr); + yaz_log (m_log, "PDU_Assoc::connect %s", addr); close(); m_PDU_Observer = observer; void *ap; m_cs = comstack(addr, &ap); + if (!m_cs) + return -1; int res = cs_connect (m_cs, ap); - yaz_log (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs), + yaz_log (m_log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs), res); m_socketObservable->addObserver(cs_fileno(m_cs), this); - if (res >= 0) - { // Connect pending or complet + if (res == 0) + { // Connect complete + m_state = Connecting; + unsigned mask = SOCKET_OBSERVE_EXCEPT; + mask |= SOCKET_OBSERVE_WRITE; + mask |= SOCKET_OBSERVE_READ; + yaz_log(m_log, "maskObserver 11"); + m_socketObservable->maskObserver(this, mask); + } + else if (res > 0) + { // Connect pending m_state = Connecting; - unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT; + unsigned mask = SOCKET_OBSERVE_EXCEPT; if (m_cs->io_pending & CS_WANT_WRITE) - mask |= YAZ_SOCKET_OBSERVE_WRITE; + mask |= SOCKET_OBSERVE_WRITE; if (m_cs->io_pending & CS_WANT_READ) - mask |= YAZ_SOCKET_OBSERVE_READ; + mask |= SOCKET_OBSERVE_READ; + yaz_log(m_log, "maskObserver 11"); m_socketObservable->maskObserver(this, mask); } else { // Connect failed immediately // Since m_state is Closed we can distinguish this case from // normal connect in socketNotify handler - m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_WRITE| - YAZ_SOCKET_OBSERVE_EXCEPT); + yaz_log(m_log, "maskObserver 12"); + m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE| + SOCKET_OBSERVE_EXCEPT); } + return 0; } // Single-threaded... Only useful for non-blocking handlers -void Yaz_PDU_Assoc::childNotify(COMSTACK cs) +void PDU_Assoc::childNotify(COMSTACK cs) { - - - Yaz_PDU_Assoc *new_observable = - new Yaz_PDU_Assoc (m_socketObservable, cs); + PDU_Assoc *new_observable = + new PDU_Assoc (m_socketObservable, cs); new_observable->m_next = m_children; m_children = new_observable; @@ -454,3 +486,10 @@ void Yaz_PDU_Assoc::childNotify(COMSTACK cs) new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify (new_observable, cs_fileno(cs)); } + +const char*PDU_Assoc::getpeername() +{ + if (!m_cs) + return 0; + return cs_addrstr(m_cs); +}