1 /* This file is part of the yazpp toolkit.
2 * Copyright (C) 1998-2012 Index Data and Mike Taylor
3 * See the file LICENSE for details.
12 #include <yaz/tcpip.h>
14 #include <yazpp/pdu-assoc.h>
20 using namespace yazpp_1;
23 class PDU_Assoc_priv {
24 friend class PDU_Assoc;
36 PDU_Queue(const char *buf, int len);
42 PDU_Assoc *pdu_parent;
43 PDU_Assoc *pdu_children;
46 yazpp_1::ISocketObservable *m_socketObservable;
54 void init(yazpp_1::ISocketObservable *socketObservable);
55 COMSTACK comstack(const char *type_and_host, void **vp);
56 bool m_session_is_dead;
60 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
64 m_socketObservable = socketObservable;
75 m_session_is_dead = false;
78 PDU_Assoc::~PDU_Assoc()
83 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
86 m_p = new PDU_Assoc_priv;
87 m_p->init(socketObservable);
90 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
94 m_p = new PDU_Assoc_priv;
95 m_p->init(socketObservable);
98 if (cs->io_pending & CS_WANT_WRITE)
99 mask |= SOCKET_OBSERVE_WRITE;
100 if (cs->io_pending & CS_WANT_READ)
101 mask |= SOCKET_OBSERVE_READ;
102 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
105 yaz_log(m_p->log, "new PDU_Assoc. Ready");
106 m_p->state = PDU_Assoc_priv::Ready;
111 yaz_log(m_p->log, "new PDU_Assoc. Accepting");
112 // assume comstack is accepting...
113 m_p->state = PDU_Assoc_priv::Accepting;
114 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
115 yaz_log(m_p->log, "maskObserver 1");
116 m_p->m_socketObservable->maskObserver(this,
117 mask|SOCKET_OBSERVE_EXCEPT);
122 IPDU_Observable *PDU_Assoc::clone()
124 PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
128 void PDU_Assoc::socketNotify(int event)
130 yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
131 this, m_p->state, event);
132 if (event & SOCKET_OBSERVE_EXCEPT)
135 m_PDU_Observer->failNotify();
138 else if (event & SOCKET_OBSERVE_TIMEOUT)
140 m_PDU_Observer->timeoutNotify();
145 case PDU_Assoc_priv::Accepting:
146 if (!cs_accept(m_p->cs))
148 yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
151 m_PDU_Observer->failNotify();
156 if (m_p->cs->io_pending & CS_WANT_WRITE)
157 mask |= SOCKET_OBSERVE_WRITE;
158 if (m_p->cs->io_pending & CS_WANT_READ)
159 mask |= SOCKET_OBSERVE_READ;
161 { // accept is complete. turn to ready state and write if needed
162 m_p->state = PDU_Assoc_priv::Ready;
166 { // accept still incomplete.
167 yaz_log(m_p->log, "maskObserver 2");
168 m_p->m_socketObservable->maskObserver(this,
169 mask|SOCKET_OBSERVE_EXCEPT);
173 case PDU_Assoc_priv::Connecting:
174 if (event & SOCKET_OBSERVE_READ &&
175 event & SOCKET_OBSERVE_WRITE)
177 // For Unix: if both read and write is set, then connect failed.
179 m_PDU_Observer->failNotify();
183 yaz_log(m_p->log, "cs_rcvconnect");
184 int res = cs_rcvconnect(m_p->cs);
187 unsigned mask = SOCKET_OBSERVE_EXCEPT;
188 if (m_p->cs->io_pending & CS_WANT_WRITE)
189 mask |= SOCKET_OBSERVE_WRITE;
190 if (m_p->cs->io_pending & CS_WANT_READ)
191 mask |= SOCKET_OBSERVE_READ;
192 yaz_log(m_p->log, "maskObserver 3");
193 m_p->m_socketObservable->maskObserver(this, mask);
197 m_p->state = PDU_Assoc_priv::Ready;
199 m_PDU_Observer->connectNotify();
204 case PDU_Assoc_priv::Listen:
205 if (event & SOCKET_OBSERVE_READ)
210 if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
214 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
217 if (!(new_line = cs_accept(m_p->cs)))
219 /* 1. create socket-manager
221 3. create top-level object
222 setup observer for child fileid in pdu-assoc
225 yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
226 cs_fileno(m_p->cs), cs_fileno(new_line));
227 childNotify(new_line);
230 case PDU_Assoc_priv::Writing:
231 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
234 case PDU_Assoc_priv::Ready:
235 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
239 int res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
242 unsigned mask = SOCKET_OBSERVE_EXCEPT;
243 if (m_p->cs->io_pending & CS_WANT_WRITE)
244 mask |= SOCKET_OBSERVE_WRITE;
245 if (m_p->cs->io_pending & CS_WANT_READ)
246 mask |= SOCKET_OBSERVE_READ;
247 yaz_log(m_p->log, "maskObserver 4");
248 m_p->m_socketObservable->maskObserver(this, mask);
253 yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
256 m_PDU_Observer->failNotify(); // problem here..
259 // lock it, so we know if recv_PDU deletes it.
261 m_p->destroyed = &destroyed;
266 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
270 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
272 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
274 if (destroyed) // it really was destroyed, return now.
277 } while (m_p->cs && cs_more(m_p->cs));
278 if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
280 yaz_log(m_p->log, "maskObserver 5");
281 m_p->m_socketObservable->maskObserver(this,
282 SOCKET_OBSERVE_EXCEPT|
283 SOCKET_OBSERVE_READ);
287 case PDU_Assoc_priv::Closed:
288 yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
291 m_PDU_Observer->failNotify();
294 yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
296 m_PDU_Observer->failNotify();
300 void PDU_Assoc::close_session()
302 m_p->m_session_is_dead = true;
306 m_PDU_Observer->failNotify();
310 void PDU_Assoc::shutdown()
313 for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
316 m_p->m_socketObservable->deleteObserver(this);
317 m_p->state = PDU_Assoc_priv::Closed;
320 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
324 while (m_p->queue_out)
326 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
327 m_p->queue_out = m_p->queue_out->m_next;
330 xfree(m_p->input_buf);
335 void PDU_Assoc::destroy()
343 // delete from parent's child list (if any)
346 c = &m_p->pdu_parent->m_p->pdu_children;
350 c = &(*c)->m_p->pdu_next;
352 *c = (*c)->m_p->pdu_next;
354 // delete all children ...
355 c = &m_p->pdu_children;
358 PDU_Assoc *here = *c;
359 *c = (*c)->m_p->pdu_next;
360 here->m_p->pdu_parent = 0;
363 yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
366 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
368 m_buf = (char *) xmalloc(len);
369 memcpy(m_buf, buf, len);
374 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
379 int PDU_Assoc::flush_PDU()
383 if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
385 yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU, not ready");
388 PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
391 m_p->state = PDU_Assoc_priv::Ready;
392 yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU queue empty");
393 yaz_log(m_p->log, "maskObserver 6");
394 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
395 SOCKET_OBSERVE_WRITE|
396 SOCKET_OBSERVE_EXCEPT);
397 if (m_p->m_session_is_dead)
400 m_PDU_Observer->failNotify();
404 r = cs_put(m_p->cs, q->m_buf, q->m_len);
407 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
409 m_PDU_Observer->failNotify();
414 unsigned mask = SOCKET_OBSERVE_EXCEPT;
415 m_p->state = PDU_Assoc_priv::Writing;
416 if (m_p->cs->io_pending & CS_WANT_WRITE)
417 mask |= SOCKET_OBSERVE_WRITE;
418 if (m_p->cs->io_pending & CS_WANT_READ)
419 mask |= SOCKET_OBSERVE_READ;
421 mask |= SOCKET_OBSERVE_WRITE;
422 yaz_log(m_p->log, "maskObserver 7");
423 m_p->m_socketObservable->maskObserver(this, mask);
424 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
425 q->m_len, cs_fileno(m_p->cs));
428 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
429 // whole packet sent... delete this and proceed to next ...
430 m_p->queue_out = q->m_next;
432 // don't select on write if queue is empty ...
435 m_p->state = PDU_Assoc_priv::Ready;
436 yaz_log(m_p->log, "maskObserver 8");
437 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
438 SOCKET_OBSERVE_EXCEPT);
439 if (m_p->m_session_is_dead)
445 int PDU_Assoc::send_PDU(const char *buf, int len)
447 yaz_log(m_p->log, "PDU_Assoc::send_PDU");
448 PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
449 int is_idle = (*pq ? 0 : 1);
453 yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
458 *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
462 yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
467 COMSTACK PDU_Assoc_priv::comstack(const char *type_and_host, void **vp)
469 return cs_create_host(type_and_host, 2, vp);
472 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
476 m_p->m_socketObservable->deleteObserver(this);
477 m_p->state = PDU_Assoc_priv::Closed;
480 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
484 while (m_p->queue_out)
486 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
487 m_p->queue_out = m_p->queue_out->m_next;
490 xfree(m_p->input_buf);
499 m_PDU_Observer = observer;
501 m_p->cs = m_p->comstack(addr, &ap);
505 if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
508 int fd = cs_fileno(m_p->cs);
510 int oldflags = fcntl(fd, F_GETFD, 0);
513 oldflags |= FD_CLOEXEC;
514 fcntl(fd, F_SETFD, oldflags);
517 m_p->m_socketObservable->addObserver(fd, this);
518 yaz_log(m_p->log, "maskObserver 9");
519 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
520 SOCKET_OBSERVE_EXCEPT);
521 yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
522 m_p->state = PDU_Assoc_priv::Listen;
526 COMSTACK PDU_Assoc::get_comstack()
531 void PDU_Assoc::idleTime(int idleTime)
533 m_p->idleTime = idleTime;
534 yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
535 m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
538 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
540 yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
542 m_PDU_Observer = observer;
544 m_p->cs = m_p->comstack(addr, &ap);
547 int res = cs_connect(m_p->cs, ap);
548 yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
550 m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
553 { // Connect complete
554 m_p->state = PDU_Assoc_priv::Connecting;
555 unsigned mask = SOCKET_OBSERVE_EXCEPT;
556 mask |= SOCKET_OBSERVE_WRITE;
557 mask |= SOCKET_OBSERVE_READ;
558 yaz_log(m_p->log, "maskObserver 11");
559 m_p->m_socketObservable->maskObserver(this, mask);
563 m_p->state = PDU_Assoc_priv::Connecting;
564 unsigned mask = SOCKET_OBSERVE_EXCEPT;
565 if (m_p->cs->io_pending & CS_WANT_WRITE)
566 mask |= SOCKET_OBSERVE_WRITE;
567 if (m_p->cs->io_pending & CS_WANT_READ)
568 mask |= SOCKET_OBSERVE_READ;
569 yaz_log(m_p->log, "maskObserver 11");
570 m_p->m_socketObservable->maskObserver(this, mask);
573 { // Connect failed immediately
574 // Since m_state is Closed we can distinguish this case from
575 // normal connect in socketNotify handler
576 yaz_log(m_p->log, "maskObserver 12");
577 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
578 SOCKET_OBSERVE_EXCEPT);
583 // Single-threaded... Only useful for non-blocking handlers
584 void PDU_Assoc::childNotify(COMSTACK cs)
586 PDU_Assoc *new_observable =
587 new PDU_Assoc(m_p->m_socketObservable, cs);
589 // Clone PDU Observer
590 new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
591 (new_observable, cs_fileno(cs));
593 if (!new_observable->m_PDU_Observer)
595 new_observable->shutdown();
596 delete new_observable;
599 new_observable->m_p->pdu_next = m_p->pdu_children;
600 m_p->pdu_children = new_observable;
601 new_observable->m_p->pdu_parent = this;
604 const char*PDU_Assoc::getpeername()
608 return cs_addrstr(m_p->cs);
613 * c-file-style: "Stroustrup"
614 * indent-tabs-mode: nil
616 * vim: shiftwidth=4 tabstop=8 expandtab