2 * Copyright (c) 1998-2003, Index Data.
3 * See the file LICENSE for details.
5 * $Id: yaz-proxy.cpp,v 1.56 2003-10-10 17:58:29 adam Exp $
11 #include <yaz/marcdisp.h>
12 #include <yaz/yaz-iconv.h>
14 #include <yaz/diagbib1.h>
15 #include <yaz++/proxy.h>
// Returns a printable name for an APDU. The case labels are the Z_APDU_*
// tags and each visible return is the matching name; the result is passed to
// the yaz_log() calls in this file.
// NOTE(review): this listing omits lines — the switch header and the returns
// for initRequest, scanRequest and sortRequest are not visible. Confirm
// against the full source before reading anything into the fall-through.
17 static const char *apdu_name(Z_APDU *apdu)
21 case Z_APDU_initRequest:
23 case Z_APDU_initResponse:
24 return "initResponse";
25 case Z_APDU_searchRequest:
26 return "searchRequest";
27 case Z_APDU_searchResponse:
28 return "searchResponse";
29 case Z_APDU_presentRequest:
30 return "presentRequest";
31 case Z_APDU_presentResponse:
32 return "presentResponse";
33 case Z_APDU_deleteResultSetRequest:
34 return "deleteResultSetRequest";
35 case Z_APDU_deleteResultSetResponse:
36 return "deleteResultSetResponse";
37 case Z_APDU_scanRequest:
39 case Z_APDU_scanResponse:
40 return "scanResponse";
41 case Z_APDU_sortRequest:
43 case Z_APDU_sortResponse:
44 return "sortResponse";
45 case Z_APDU_extendedServicesRequest:
46 return "extendedServicesRequest";
47 case Z_APDU_extendedServicesResponse:
48 return "extendedServicesResponse";
// Constructs a proxy association on top of the_PDU_Observable and installs
// default settings. Both statistics members (m_bw_stat, m_pdu_stat) are
// constructed with the value 60.
// NOTE(review): several member initialisations are missing from this listing.
55 Yaz_Proxy::Yaz_Proxy(IYaz_PDU_Observable *the_PDU_Observable) :
56 Yaz_Z_Assoc(the_PDU_Observable), m_bw_stat(60), m_pdu_stat(60)
58 m_PDU_Observable = the_PDU_Observable;
// Keep-alive limits; shutdown() compares them with the target connection's
// byte and PDU counters.
63 m_keepalive_limit_bw = 500000;
64 m_keepalive_limit_pdu = 1000;
67 m_proxy_authentication = 0;
// Idle times later handed to timeout() for the client and target sides.
70 m_client_idletime = 600;
71 m_target_idletime = 600;
// Heap copy made with xstrdup(); option("optimize", ...) stores a new copy.
72 m_optimize = xstrdup ("1");
// Placeholder log prefix; sessionNotify() formats the real one for sessions.
73 strcpy(m_session_str, "x");
75 m_bytes_sent = m_bytes_recv = 0;
// 0 disables the clamp in handle_max_record_retrieve().
79 m_max_record_retrieve = 0;
83 m_invalid_session = 0;
// Logs the session's total sent/received byte counts and releases the
// heap-allocated target and authentication strings.
// NOTE(review): no release of m_optimize (xstrdup'ed in the constructor) is
// visible here, but lines are missing from this listing — confirm against the
// full source before treating it as a leak.
88 Yaz_Proxy::~Yaz_Proxy()
90 yaz_log(LOG_LOG, "%sClosed %d/%d sent/recv bytes total", m_session_str,
91 m_bytes_sent, m_bytes_recv);
92 xfree (m_proxyTarget);
93 xfree (m_default_target);
94 xfree (m_proxy_authentication);
// Loads the proxy configuration from the XML file named by `config`.
// A fresh Yaz_ProxyConfig is created, the file name is remembered in
// m_config_fname (used again by check_reconfigure()), and read_xml() is run.
// NOTE(review): the handling of a previous m_config and the return statement
// are not visible in this listing; presumably `r` is returned — confirm.
99 int Yaz_Proxy::set_config(const char *config)
102 m_config = new Yaz_ProxyConfig();
// Replace any previously stored file name with a private copy.
103 xfree(m_config_fname);
104 m_config_fname = xstrdup(config);
105 int r = m_config->read_xml(config);
// Replaces the default target with a private copy of `target`.
// The old value is freed and the member reset to null before the new copy is
// stored.
// NOTE(review): source line 113 is missing from this listing; presumably it
// guards the xstrdup() with a null check on `target` — confirm.
109 void Yaz_Proxy::set_default_target(const char *target)
111 xfree (m_default_target);
112 m_default_target = 0;
114 m_default_target = (char *) xstrdup (target);
// Replaces the stored proxy authentication string with a private copy of
// `auth`. get_client() uses this string as the "open" idAuthentication when an
// initRequest carries none.
// NOTE(review): source line 121 is missing from this listing; presumably it
// guards the xstrdup() with a null check on `auth` — confirm.
117 void Yaz_Proxy::set_proxy_authentication (const char *auth)
119 xfree (m_proxy_authentication);
120 m_proxy_authentication = 0;
122 m_proxy_authentication = (char *) xstrdup (auth);
// Returns the configuration object to use, re-reading the XML file when a
// reconfigure is in progress.
// The call is forwarded to m_parent in one branch; otherwise the proxy's own
// m_config is used and, when both a file name and a config exist, read_xml()
// is run again on m_config_fname.
// NOTE(review): the conditions selecting each branch (including what triggers
// the reload) and the final return are among the lines missing from this
// listing — confirm against the full source.
125 Yaz_ProxyConfig *Yaz_Proxy::check_reconfigure()
128 return m_parent->check_reconfigure();
130 Yaz_ProxyConfig *cfg = m_config;
133 yaz_log(LOG_LOG, "reconfigure");
135 if (m_config_fname && cfg)
137 yaz_log(LOG_LOG, "reconfigure config %s", m_config_fname);
138 int r = cfg->read_xml(m_config_fname);
// Logged at warning level; the test on `r` is not visible here.
140 yaz_log(LOG_WARN, "reconfigure failed");
143 yaz_log(LOG_LOG, "reconfigure");
// Creates the Yaz_Proxy object for a newly accepted session.
// The child points back to this proxy through m_parent, owns no configuration
// of its own (m_config and m_config_fname are null, so check_reconfigure()
// is served by the parent), and inherits idle times, default target, APDU log
// and proxy authentication from this object.
// NOTE(review): the increment of m_session_no and the return statement are
// not visible in this listing.
149 IYaz_PDU_Observer *Yaz_Proxy::sessionNotify(IYaz_PDU_Observable
150 *the_PDU_Observable, int fd)
153 Yaz_Proxy *new_proxy = new Yaz_Proxy(the_PDU_Observable);
154 new_proxy->m_parent = this;
155 new_proxy->m_config = 0;
156 new_proxy->m_config_fname = 0;
157 new_proxy->timeout(m_client_idletime);
158 new_proxy->m_target_idletime = m_target_idletime;
159 new_proxy->set_default_target(m_default_target);
160 new_proxy->set_APDU_log(get_APDU_log());
161 new_proxy->set_proxy_authentication(m_proxy_authentication);
// Log prefix "<unix time>:<session number> "; recv_Z_PDU() later rewrites the
// text after the first space with the request number.
162 sprintf(new_proxy->m_session_str, "%ld:%d ", (long) time(0), m_session_no);
164 yaz_log (LOG_LOG, "%sNew session %s", new_proxy->m_session_str,
165 the_PDU_Observable->getpeername());
// Looks up the session cookie carried in the APDU's otherInformation.
// An OID entry for VAL_COOKIE in CLASS_USERINFO is converted to an OID and
// searched for with update_otherInformation(); when the unit found holds
// characterInfo, that string is returned.
// NOTE(review): the declarations of `ent`/`oid` and the fallback return are
// not visible in this listing (presumably it returns 0 when nothing matches).
169 char *Yaz_Proxy::get_cookie(Z_OtherInformation **otherInfo)
172 Z_OtherInformationUnit *oi;
174 ent.proto = PROTO_Z3950;
175 ent.oclass = CLASS_USERINFO;
176 ent.value = (oid_value) VAL_COOKIE;
// The conversion is repeated in the condition below, so the lookup does not
// depend on this assert being compiled in.
177 assert (oid_ent_to_oid (&ent, oid));
179 if (oid_ent_to_oid (&ent, oid) &&
180 (oi = update_otherInformation(otherInfo, 0, oid, 1, 1)) &&
181 oi->which == Z_OtherInfo_characterInfo)
182 return oi->information.characterInfo;
// Looks up the proxy target carried in the APDU's otherInformation.
// Same scheme as get_cookie(), but with the VAL_PROXY OID: the unit is
// searched for with update_otherInformation() and its characterInfo string is
// returned when present.
// NOTE(review): the declarations of `ent`/`oid` and the fallback return are
// not visible in this listing (presumably it returns 0 when nothing matches).
186 char *Yaz_Proxy::get_proxy(Z_OtherInformation **otherInfo)
189 Z_OtherInformationUnit *oi;
191 ent.proto = PROTO_Z3950;
192 ent.oclass = CLASS_USERINFO;
193 ent.value = (oid_value) VAL_PROXY;
194 if (oid_ent_to_oid (&ent, oid) &&
195 (oi = update_otherInformation(otherInfo, 0, oid, 1, 1)) &&
196 oi->which == Z_OtherInfo_characterInfo)
197 return oi->information.characterInfo;
// Picks one URL from the null-terminated `url` array based on how many
// pooled target connections already use each of them.
// Visible steps: a per-URL counter array of MAX_ZURL_PLEX entries, a walk over
// the parent's client pool matching each client's hostname against the URLs,
// and a final pass that logs each count and tracks the smallest one.
// NOTE(review): the loop bodies that reset/increment the counters, the
// bookkeeping of the winning URL and the return statement are missing from
// this listing — presumably the least-used URL is returned; confirm.
201 const char *Yaz_Proxy::load_balance(const char **url)
203 int zurl_in_use[MAX_ZURL_PLEX];
207 for (i = 0; i<MAX_ZURL_PLEX; i++)
209 for (c = m_parent->m_clientPool; c; c = c->m_next)
211 for (i = 0; url[i]; i++)
212 if (!strcmp(url[i], c->get_hostname()))
217 for (i = 0; url[i]; i++)
219 yaz_log(LOG_DEBUG, "%szurl=%s use=%d",
220 m_session_str, url[i], zurl_in_use[i]);
221 if (min > zurl_in_use[i])
224 min = zurl_in_use[i];
// Finds or creates the Yaz_ProxyClient (connection to the backend target)
// that should handle `apdu` for this session.
// Visible stages, in order:
//   1. read cookie and proxy host from the APDU's otherInformation and fetch
//      per-target settings from the configuration;
//   2. for a cookie session, search the parent's pool for a client with the
//      same cookie and target;
//   3. otherwise search the pool for a client with no server, no cookie and
//      the same target;
//   4. failing that, require an initRequest, apply the max-clients policy and
//      either recycle a pooled client or create a new one.
// NOTE(review): many lines (branch conditions, returns, some assignments) are
// missing from this listing; the comments below describe only what is
// visible and should be confirmed against the full source.
230 Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu)
233 Yaz_Proxy *parent = m_parent;
234 Z_OtherInformation **oi;
235 Yaz_ProxyClient *c = m_client;
// Pull the session cookie (if any) out of the APDU's otherInformation.
237 get_otherInfoAPDU(apdu, &oi);
238 char *cookie = get_cookie(oi);
242 const char *url[MAX_ZURL_PLEX];
243 const char *proxy_host = get_proxy(oi);
244 Yaz_ProxyConfig *cfg = check_reconfigure();
// The proxy host from the APDU becomes the default target; proxy_host is
// re-pointed at the private copy. (The guarding condition is not visible.)
247 xfree(m_default_target);
248 m_default_target = xstrdup(proxy_host);
249 proxy_host = m_default_target;
251 int client_idletime = -1;
// Fetch URL list and limits for this target from the configuration. Note that
// m_max_clients is written on the parent, not on this session.
253 cfg->get_target_info(proxy_host, url, &m_bw_max,
254 &m_pdu_max, &m_max_record_retrieve,
255 &m_target_idletime, &client_idletime,
256 &parent->m_max_clients,
257 &m_keepalive_limit_bw,
258 &m_keepalive_limit_pdu);
// Still -1 means get_target_info() did not set it; keep the current timeout.
259 if (client_idletime != -1)
261 m_client_idletime = client_idletime;
262 timeout(m_client_idletime);
266 yaz_log(LOG_LOG, "%sNo default target", m_session_str);
269 // we don't handle multiplexing for cookie session, so we just
270 // pick the first one in this case (anonymous users will be able
271 // to use any backend)
272 if (cookie && *cookie)
273 m_proxyTarget = (char*) xstrdup(url[0]);
275 m_proxyTarget = (char*) xstrdup(load_balance(url));
// Cookie session: look for the pooled client with this cookie and target.
277 if (cookie && *cookie)
279 Yaz_ProxyClient *cc = 0;
281 for (c = parent->m_clientPool; c; c = c->m_next)
// Pool invariant: m_prev points at the slot that points to this client.
284 assert (*c->m_prev == c);
285 if (c->m_cookie && !strcmp(cookie,c->m_cookie) &&
286 !strcmp(m_proxyTarget, c->get_hostname()))
295 // The following handles "cancel"
296 // If connection is busy (waiting for PDU) and
297 // we have an initRequest we can safely do re-open
298 if (c->m_waiting && apdu->which == Z_APDU_initRequest)
300 yaz_log (LOG_LOG, "%s REOPEN target=%s", m_session_str,
// Reset the per-connection search state before reconnecting.
307 c->m_last_resultCount = 0;
308 c->m_sr_transform = 0;
310 c->m_resultSetStartPoint = 0;
311 if (c->client(m_proxyTarget))
316 c->timeout(m_target_idletime);
318 c->m_seqno = parent->m_seqno;
// Detach the client from whichever other session was using it.
319 if (c->m_server && c->m_server != this)
320 c->m_server->m_client = 0;
323 yaz_log (LOG_DEBUG, "get_client 1 %p %p", this, c);
// Search the pool for a client with no server attached, no cookie and the
// same target.
329 Yaz_ProxyClient *cc = 0;
331 for (c = parent->m_clientPool; c; c = c->m_next)
334 assert (*c->m_prev == c);
335 if (c->m_server == 0 && c->m_cookie == 0 &&
336 !strcmp(m_proxyTarget, c->get_hostname()))
346 yaz_log (LOG_LOG, "%sREUSE %d %d %s",
348 c->m_seqno, parent->m_seqno, c->get_hostname());
350 c->m_seqno = parent->m_seqno;
351 assert(c->m_server == 0);
// Any APDU other than an initRequest is logged here; the follow-up action is
// not visible in this listing.
360 if (apdu->which != Z_APDU_initRequest)
362 yaz_log (LOG_LOG, "no first INIT!");
365 Z_InitRequest *initRequest = apdu->u.initRequest;
// Client sent no credentials: substitute the proxy's own authentication
// string (allocated on the encode ODR) as an "open" idAuthentication.
367 if (!initRequest->idAuthentication)
369 if (m_proxy_authentication)
371 initRequest->idAuthentication =
372 (Z_IdAuthentication *)
373 odr_malloc (odr_encode(),
374 sizeof(*initRequest->idAuthentication));
375 initRequest->idAuthentication->which =
376 Z_IdAuthentication_open;
377 initRequest->idAuthentication->u.open =
378 odr_strdup (odr_encode(), m_proxy_authentication);
382 // go through list of clients - and find the lowest/oldest one.
383 Yaz_ProxyClient *c_min = 0;
385 int no_of_clients = 0;
386 if (parent->m_clientPool)
387 yaz_log (LOG_DEBUG, "Existing sessions");
388 for (c = parent->m_clientPool; c; c = c->m_next)
390 yaz_log (LOG_DEBUG, " Session %-3d wait=%d %s cookie=%s", c->m_seqno,
391 c->m_waiting, c->get_hostname(),
392 c->m_cookie ? c->m_cookie : "");
394 if (min_seq < 0 || c->m_seqno < min_seq)
396 min_seq = c->m_seqno;
// Pool is at its limit: the chosen client is destroyed when it is busy or
// bound to another target, otherwise it is recycled for this session.
// (The assignment selecting `c` is not visible; presumably c = c_min.)
400 if (no_of_clients >= parent->m_max_clients)
403 if (c->m_waiting || strcmp(m_proxyTarget, c->get_hostname()))
405 yaz_log (LOG_LOG, "%sMAXCLIENTS Destroy %d",
406 m_session_str, c->m_seqno);
407 if (c->m_server && c->m_server != this)
413 yaz_log (LOG_LOG, "%sMAXCLIENTS Reuse %d %d %s",
415 c->m_seqno, parent->m_seqno, c->get_hostname());
419 c->m_cookie = xstrdup(cookie);
420 c->m_seqno = parent->m_seqno;
421 if (c->m_server && c->m_server != this)
423 c->m_server->m_client = 0;
// Create a brand-new client and push it onto the front of the pool. m_prev
// holds the address of the pointer that refers to the node.
432 yaz_log (LOG_LOG, "%sNEW %d %s",
433 m_session_str, parent->m_seqno, m_proxyTarget);
434 c = new Yaz_ProxyClient(m_PDU_Observable->clone());
435 c->m_next = parent->m_clientPool;
437 c->m_next->m_prev = &c->m_next;
438 parent->m_clientPool = c;
439 c->m_prev = &parent->m_clientPool;
445 c->m_cookie = xstrdup(cookie);
447 c->m_seqno = parent->m_seqno;
449 c->m_last_resultCount = 0;
452 c->m_sr_transform = 0;
454 c->m_resultSetStartPoint = 0;
456 if (c->client(m_proxyTarget))
464 yaz_log (LOG_DEBUG, "get_client 3 %p %p", this, c);
// Writes the `num` diagnostic records in `pp` to the log.
// Records that are not in the default format are reported as "no
// diagnostics"; records whose diagnostic set is not BIB-1 are reported as an
// unknown set. For v2/v3 addinfo records the condition code and its
// diagbib1_str() text are logged.
// NOTE(review): the switch header, the addinfo arguments and the control flow
// after each log call are missing from this listing.
468 void Yaz_Proxy::display_diagrecs(Z_DiagRec **pp, int num)
471 for (i = 0; i<num; i++)
474 Z_DefaultDiagFormat *r;
475 Z_DiagRec *p = pp[i];
476 if (p->which != Z_DiagRec_defaultFormat)
478 yaz_log(LOG_LOG, "%sError no diagnostics", m_session_str);
482 r = p->u.defaultFormat;
483 if (!(ent = oid_getentbyoid(r->diagnosticSetId)) ||
484 ent->oclass != CLASS_DIAGSET || ent->value != VAL_BIB1)
485 yaz_log(LOG_LOG, "%sError unknown diagnostic set", m_session_str);
488 case Z_DefaultDiagFormat_v2Addinfo:
489 yaz_log(LOG_LOG, "%sError %d %s:%s",
491 *r->condition, diagbib1_str(*r->condition),
494 case Z_DefaultDiagFormat_v3Addinfo:
495 yaz_log(LOG_LOG, "%sError %d %s:%s",
497 *r->condition, diagbib1_str(*r->condition),
// Rewrites the database records in `p` as MARCXML.
// For every record that is a database record with octet-aligned content, the
// buffer is decoded with a MARC handle configured for YAZ_MARC_MARCXML, the
// result is run through a MARC-8 to UTF-8 iconv conversion in chunks that are
// collected in a WRBUF, and the record is replaced with a new external built
// by z_ext_record() on the encode ODR.
// NOTE(review): declarations (outbuf, result, rlen), the remaining arguments
// of several calls and the iconv close are missing from this listing.
504 void Yaz_Proxy::convert_to_marcxml(Z_NamePlusRecordList *p)
508 yaz_marc_t mt = yaz_marc_create();
509 yaz_marc_xml(mt, YAZ_MARC_MARCXML);
510 for (i = 0; i < p->num_records; i++)
512 Z_NamePlusRecord *npr = p->records[i];
513 if (npr->which == Z_NamePlusRecord_databaseRecord)
515 Z_External *r = npr->u.databaseRecord;
516 if (r->which == Z_External_octet)
520 if (yaz_marc_decode_buf(mt, (char*) r->u.octet_aligned->buf,
521 r->u.octet_aligned->len,
524 yaz_iconv_t cd = yaz_iconv_open("UTF-8", "MARC-8");
525 WRBUF wrbuf = wrbuf_alloc();
528 size_t inbytesleft = rlen;
529 const char *inp = result;
// Convert one output buffer's worth at a time; the loop is skipped entirely
// when the converter could not be opened (cd is null).
530 while (cd && inbytesleft)
532 size_t outbytesleft = sizeof(outbuf);
536 r = yaz_iconv (cd, (char**) &inp,
538 &outp, &outbytesleft);
539 if (r == (size_t) (-1))
541 int e = yaz_iconv_error(cd);
// Only errors other than E2BIG are reported as a conversion failure.
542 if (e != YAZ_ICONV_E2BIG)
544 yaz_log(LOG_WARN, "conversion failure");
548 wrbuf_write(wrbuf, outbuf, outp - outbuf);
553 npr->u.databaseRecord = z_ext_record(odr_encode(),
557 wrbuf_free(wrbuf, 1);
562 yaz_marc_destroy(mt);
// Sends `apdu` to the client after logging and optional record conversion.
// For search and present responses, a non-surrogate diagnostic is wrapped in a
// temporary Z_DiagRec and logged through display_diagrecs(), and database
// records are converted to MARCXML when m_marcxml_flag is set. The encoded
// length reported by send_Z_PDU() is logged and added to the bandwidth
// statistics.
// NOTE(review): the declaration of `len`, some log arguments and the return
// statement are missing from this listing (presumably `r` is returned).
565 int Yaz_Proxy::send_to_client(Z_APDU *apdu)
568 if (apdu->which == Z_APDU_searchResponse)
570 Z_SearchResponse *sr = apdu->u.searchResponse;
571 Z_Records *p = sr->records;
572 if (p && p->which == Z_Records_NSD)
// display_diagrecs() takes an array of Z_DiagRec pointers, so wrap the single
// default-format diagnostic in a stack-allocated record.
574 Z_DiagRec dr, *dr_p = &dr;
575 dr.which = Z_DiagRec_defaultFormat;
576 dr.u.defaultFormat = p->u.nonSurrogateDiagnostic;
578 display_diagrecs(&dr_p, 1);
582 if (m_marcxml_flag && p && p->which == Z_Records_DBOSD)
583 convert_to_marcxml(p->u.databaseOrSurDiagnostics);
586 yaz_log(LOG_LOG, "%s%d hits", m_session_str,
// A negative hit count flags the session as invalid; shutdown() checks this
// flag before keeping the target connection alive.
588 if (*sr->resultCount < 0)
589 m_invalid_session = 1;
593 else if (apdu->which == Z_APDU_presentResponse)
595 Z_PresentResponse *sr = apdu->u.presentResponse;
596 Z_Records *p = sr->records;
597 if (p && p->which == Z_Records_NSD)
599 Z_DiagRec dr, *dr_p = &dr;
600 dr.which = Z_DiagRec_defaultFormat;
601 dr.u.defaultFormat = p->u.nonSurrogateDiagnostic;
603 display_diagrecs(&dr_p, 1);
605 if (m_marcxml_flag && p && p->which == Z_Records_DBOSD)
606 convert_to_marcxml(p->u.databaseOrSurDiagnostics);
608 int r = send_Z_PDU(apdu, &len);
609 yaz_log (LOG_DEBUG, "%sSending %s to client %d bytes", m_session_str,
610 apdu_name(apdu), len);
612 m_bw_stat.add_bytes(len);
// Sends `apdu` to the backend target and logs its name, the target host name
// and the encoded length.
// NOTE(review): the declaration of `len`, any counter updates and the return
// statement are missing from this listing (presumably `r` is returned).
616 int Yaz_ProxyClient::send_to_target(Z_APDU *apdu)
619 int r = send_Z_PDU(apdu, &len);
620 yaz_log (LOG_DEBUG, "%sSending %s to %s %d bytes",
622 apdu_name(apdu), get_hostname(), len);
// Tries to satisfy a present or search request without a full round trip to
// the target.
//   - Present request on the last result set: answered from the client's
//     record cache when the lookup succeeds.
//   - Search request equal to the last successful one (same query, result set
//     name and databases): answered from the cache, answered with a bare
//     search response carrying the remembered hit count, or rewritten into a
//     present request, depending on the set-size bounds in the request.
//   - Any other search: the query, result set name and databases are stored
//     on the client for future comparisons and the cache is cleared.
// When optimisation is disabled (m_parent->m_optimize starts with '0') the
// APDU is returned unchanged.
// NOTE(review): the other return statements, several log arguments and some
// assignments are missing from this listing — confirm what is returned in
// each branch against the full source.
627 Z_APDU *Yaz_Proxy::result_set_optimize(Z_APDU *apdu)
629 if (*m_parent->m_optimize == '0')
630 return apdu; // don't optimize result sets..
631 if (apdu->which == Z_APDU_presentRequest)
633 Z_PresentRequest *pr = apdu->u.presentRequest;
634 Z_NamePlusRecordList *npr;
635 int toget = *pr->numberOfRecordsRequested;
636 int start = *pr->resultSetStartPoint;
// Only the most recent result set is cached.
638 if (m_client->m_last_resultSetId &&
639 !strcmp(m_client->m_last_resultSetId, pr->resultSetId))
641 if (m_client->m_cache.lookup (odr_encode(), &npr, start, toget,
642 pr->preferredRecordSyntax,
643 pr->recordComposition))
645 yaz_log (LOG_LOG, "%sReturned cached records for present request",
// Build the present response locally from the cached records.
647 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentResponse);
648 new_apdu->u.presentResponse->referenceId = pr->referenceId;
650 new_apdu->u.presentResponse->numberOfRecordsReturned
651 = odr_intdup(odr_encode(), toget);
653 new_apdu->u.presentResponse->records = (Z_Records*)
654 odr_malloc(odr_encode(), sizeof(Z_Records));
655 new_apdu->u.presentResponse->records->which = Z_Records_DBOSD;
656 new_apdu->u.presentResponse->records->u.databaseOrSurDiagnostics = npr;
657 new_apdu->u.presentResponse->nextResultSetPosition =
658 odr_intdup(odr_encode(), start+toget);
660 send_to_client(new_apdu);
666 if (apdu->which != Z_APDU_searchRequest)
668 Z_SearchRequest *sr = apdu->u.searchRequest;
// Heap-allocated: in the non-matching branch below ownership moves to
// m_client->m_last_query.
669 Yaz_Z_Query *this_query = new Yaz_Z_Query;
670 Yaz_Z_Databases this_databases;
672 this_databases.set(sr->num_databaseNames, (const char **)
675 this_query->set_Z_Query(sr->query);
678 this_query->print(query_str, sizeof(query_str)-1);
679 yaz_log(LOG_LOG, "%sQuery %s", m_session_str, query_str);
// Repeat of the last successful search on the same result set and databases?
681 if (m_client->m_last_ok && m_client->m_last_query &&
682 m_client->m_last_query->match(this_query) &&
683 !strcmp(m_client->m_last_resultSetId, sr->resultSetName) &&
684 m_client->m_last_databases.match(this_databases))
// Medium set: hit count strictly between the small and large bounds.
687 if (m_client->m_last_resultCount > *sr->smallSetUpperBound &&
688 m_client->m_last_resultCount < *sr->largeSetLowerBound)
690 Z_NamePlusRecordList *npr;
691 int toget = *sr->mediumSetPresentNumber;
692 Z_RecordComposition *comp = 0;
// Never ask for more records than the result set holds.
694 if (toget > m_client->m_last_resultCount)
695 toget = m_client->m_last_resultCount;
697 if (sr->mediumSetElementSetNames)
699 comp = (Z_RecordComposition *)
700 odr_malloc(odr_encode(), sizeof(Z_RecordComposition));
701 comp->which = Z_RecordComp_simple;
702 comp->u.simple = sr->mediumSetElementSetNames;
705 if (m_client->m_cache.lookup (odr_encode(), &npr, 1, toget,
706 sr->preferredRecordSyntax, comp))
708 yaz_log (LOG_LOG, "%sReturned cached records for medium set",
710 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
711 new_apdu->u.searchResponse->referenceId = sr->referenceId;
// Points at the client's member rather than a copy on the ODR.
712 new_apdu->u.searchResponse->resultCount =
713 &m_client->m_last_resultCount;
715 new_apdu->u.searchResponse->numberOfRecordsReturned
716 = odr_intdup(odr_encode(), toget);
718 new_apdu->u.searchResponse->presentStatus =
719 odr_intdup(odr_encode(), Z_PresentStatus_success);
720 new_apdu->u.searchResponse->records = (Z_Records*)
721 odr_malloc(odr_encode(), sizeof(Z_Records));
722 new_apdu->u.searchResponse->records->which = Z_Records_DBOSD;
723 new_apdu->u.searchResponse->records->u.databaseOrSurDiagnostics = npr;
724 new_apdu->u.searchResponse->nextResultSetPosition =
725 odr_intdup(odr_encode(), toget+1);
726 send_to_client(new_apdu);
732 // send present request (medium size)
733 yaz_log (LOG_LOG, "%sOptimizing search for medium set",
736 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentRequest);
737 Z_PresentRequest *pr = new_apdu->u.presentRequest;
738 pr->referenceId = sr->referenceId;
739 pr->resultSetId = sr->resultSetName;
740 pr->preferredRecordSyntax = sr->preferredRecordSyntax;
741 *pr->numberOfRecordsRequested = toget;
742 pr->recordComposition = comp;
// Presumably tells Yaz_ProxyClient::recv_Z_PDU() to turn the present response
// back into a search response — confirm against the full source.
743 m_client->m_sr_transform = 1;
747 else if (m_client->m_last_resultCount >= *sr->largeSetLowerBound ||
748 m_client->m_last_resultCount <= 0)
750 // large set. Return pseudo-search response immediately
751 yaz_log (LOG_LOG, "%sOptimizing search for large set",
753 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
754 new_apdu->u.searchResponse->referenceId = sr->referenceId;
755 new_apdu->u.searchResponse->resultCount =
756 &m_client->m_last_resultCount;
757 send_to_client(new_apdu);
// Remaining case (small set): all hits are requested.
762 Z_NamePlusRecordList *npr;
763 int toget = m_client->m_last_resultCount;
764 Z_RecordComposition *comp = 0;
766 // send a present request (small set)
768 if (sr->smallSetElementSetNames)
770 comp = (Z_RecordComposition *)
771 odr_malloc(odr_encode(), sizeof(Z_RecordComposition));
772 comp->which = Z_RecordComp_simple;
773 comp->u.simple = sr->smallSetElementSetNames;
776 if (m_client->m_cache.lookup (odr_encode(), &npr, 1, toget,
777 sr->preferredRecordSyntax, comp))
779 yaz_log (LOG_LOG, "%sReturned cached records for small set",
781 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
782 new_apdu->u.searchResponse->referenceId = sr->referenceId;
783 new_apdu->u.searchResponse->resultCount =
784 &m_client->m_last_resultCount;
786 new_apdu->u.searchResponse->numberOfRecordsReturned
787 = odr_intdup(odr_encode(), toget);
789 new_apdu->u.searchResponse->presentStatus =
790 odr_intdup(odr_encode(), Z_PresentStatus_success);
791 new_apdu->u.searchResponse->records = (Z_Records*)
792 odr_malloc(odr_encode(), sizeof(Z_Records));
793 new_apdu->u.searchResponse->records->which = Z_Records_DBOSD;
794 new_apdu->u.searchResponse->records->u.databaseOrSurDiagnostics = npr;
795 new_apdu->u.searchResponse->nextResultSetPosition =
796 odr_intdup(odr_encode(), toget+1);
797 send_to_client(new_apdu);
802 yaz_log (LOG_LOG, "%sOptimizing search for small set",
804 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentRequest);
805 Z_PresentRequest *pr = new_apdu->u.presentRequest;
806 pr->referenceId = sr->referenceId;
807 pr->resultSetId = sr->resultSetName;
808 pr->preferredRecordSyntax = sr->preferredRecordSyntax;
809 *pr->numberOfRecordsRequested = toget;
810 pr->recordComposition = comp;
811 m_client->m_sr_transform = 1;
816 else // query doesn't match
// Remember this search as the new reference; m_last_ok stays 0 here.
818 delete m_client->m_last_query;
819 m_client->m_last_query = this_query;
820 m_client->m_last_ok = 0;
821 m_client->m_cache.clear();
822 m_client->m_resultSetStartPoint = 0;
824 xfree (m_client->m_last_resultSetId);
825 m_client->m_last_resultSetId = xstrdup (sr->resultSetName);
827 m_client->m_last_databases.set(sr->num_databaseNames,
828 (const char **) sr->databaseNames);
// Entry point for a PDU arriving from the client; applies rate limiting
// before handing the PDU to recv_Z_PDU_0().
// Bandwidth and PDU counters are updated, their totals compared with m_bw_max
// and m_pdu_max, and a delay `reduce` is derived. With a delay, the PDU is
// parked in m_bw_hold_PDU and a timeout is armed; timeoutNotify() reads the
// parked PDU back.
// NOTE(review): the guards around the limit checks (presumably tests that the
// limits are non-zero, which would also protect the divisions), the handling
// of a second PDU while one is on hold and the test on `reduce` are missing
// from this listing — confirm against the full source.
834 void Yaz_Proxy::recv_Z_PDU(Z_APDU *apdu, int len)
// Rewrite the text after the first space of the session prefix with the
// current request number.
836 char *cp = strchr(m_session_str, ' ');
839 sprintf(cp+1, "%d ", m_request_no);
844 yaz_log (LOG_DEBUG, "%sReceiving %s from client %d bytes", m_session_str,
845 apdu_name(apdu), len);
847 if (m_bw_hold_PDU) // double incoming PDU. shutdown now.
850 m_bw_stat.add_bytes(len);
851 m_pdu_stat.add_bytes(1);
853 int bw_total = m_bw_stat.get_total();
854 int pdu_total = m_pdu_stat.get_total();
856 yaz_log(LOG_LOG, "%sstat bw=%d pdu=%d limit-bw=%d limit-pdu=%d",
857 m_session_str, bw_total, pdu_total, m_bw_max, m_pdu_max);
// Delay grows with how far the byte total exceeds the limit.
860 if (bw_total > m_bw_max)
862 reduce = (bw_total/m_bw_max);
// Over the PDU limit: use the larger of the two computed delays.
867 if (pdu_total > m_pdu_max)
869 int nreduce = (60/m_pdu_max);
870 reduce = (reduce > nreduce) ? reduce : nreduce;
875 yaz_log(LOG_LOG, "%sLimit delay=%d", m_session_str, reduce);
876 m_bw_hold_PDU = apdu; // save PDU and signal "on hold"
877 timeout(reduce); // call us reduce seconds later
880 recv_Z_PDU_0(apdu); // all fine. Proceed receive PDU as usual
// Caps the number of records a present request may ask for.
// When m_max_record_retrieve is non-zero and the request asks for more, the
// request is modified in place to ask for exactly m_max_record_retrieve.
// Other APDU kinds are left untouched by the visible code.
883 void Yaz_Proxy::handle_max_record_retrieve(Z_APDU *apdu)
885 if (m_max_record_retrieve)
887 if (apdu->which == Z_APDU_presentRequest)
889 Z_PresentRequest *pr = apdu->u.presentRequest;
890 if (pr->numberOfRecordsRequested &&
891 *pr->numberOfRecordsRequested > m_max_record_retrieve)
892 *pr->numberOfRecordsRequested = m_max_record_retrieve;
// Builds a Z_Records structure of kind non-surrogate diagnostic on `odr`.
// The diagnostic uses the BIB-1 diagnostic set and the v2Addinfo form; a null
// `addinfo` is stored as an empty string. All memory comes from odr_malloc()/
// odr_strdup() on `odr`.
// NOTE(review): the rest of the parameter list, the declaration of `err`, the
// assignment of the condition code, the use of `drec` and the return
// statement are missing from this listing (presumably `rec` is returned).
897 Z_Records *Yaz_Proxy::create_nonSurrogateDiagnostics(ODR odr,
901 Z_Records *rec = (Z_Records *)
902 odr_malloc (odr, sizeof(*rec));
904 odr_malloc (odr, sizeof(*err));
905 Z_DiagRec *drec = (Z_DiagRec *)
906 odr_malloc (odr, sizeof(*drec));
907 Z_DefaultDiagFormat *dr = (Z_DefaultDiagFormat *)
908 odr_malloc (odr, sizeof(*dr));
910 rec->which = Z_Records_NSD;
911 rec->u.nonSurrogateDiagnostic = dr;
912 dr->diagnosticSetId =
913 yaz_oidval_to_z3950oid (odr, CLASS_DIAGSET, VAL_BIB1);
915 dr->which = Z_DefaultDiagFormat_v2Addinfo;
916 dr->u.v2Addinfo = odr_strdup (odr, addinfo ? addinfo : "");
// Validates the query of a search request against the configuration.
// The configuration's check_query() is called for the default target; the
// visible failure path answers the client directly with a search response
// that carries a non-surrogate diagnostic and searchStatus 0.
// NOTE(review): the declarations of `err`/`addinfo`, the tests on `cfg` and
// `err`, and the return statements are missing from this listing — presumably
// the original APDU is returned when validation passes; confirm.
920 Z_APDU *Yaz_Proxy::handle_query_validation(Z_APDU *apdu)
922 if (apdu->which == Z_APDU_searchRequest)
924 Z_SearchRequest *sr = apdu->u.searchRequest;
928 Yaz_ProxyConfig *cfg = check_reconfigure();
930 err = cfg->check_query(odr_encode(), m_default_target,
931 sr->query, &addinfo);
// Reject locally: diagnostic response, search marked as failed.
934 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
936 new_apdu->u.searchResponse->referenceId = sr->referenceId;
937 new_apdu->u.searchResponse->records =
938 create_nonSurrogateDiagnostics(odr_encode(), err, addinfo);
939 *new_apdu->u.searchResponse->searchStatus = 0;
941 send_to_client(new_apdu);
// Validates the preferred record syntax of search and present requests
// against the configuration.
// For both request kinds check_syntax() is called; the visible code can
// replace the request's preferredRecordSyntax with an OID built on the decode
// ODR, and on the failure path it answers the client directly with a
// diagnostic response (searchStatus 0, or presentStatus failure).
// NOTE(review): the declarations of `err`/`addinfo`, the remaining arguments
// of check_syntax() and yaz_oidval_to_z3950oid(), the conditions selecting
// each path and the return statements are missing from this listing.
949 Z_APDU *Yaz_Proxy::handle_syntax_validation(Z_APDU *apdu)
952 if (apdu->which == Z_APDU_searchRequest)
954 Z_SearchRequest *sr = apdu->u.searchRequest;
// Presumably: only searches that may return records with the response need a
// syntax check — TODO confirm.
955 if (*sr->smallSetUpperBound > 0 || *sr->largeSetLowerBound > 1)
959 Yaz_ProxyConfig *cfg = check_reconfigure();
962 err = cfg->check_syntax(odr_encode(),
964 sr->preferredRecordSyntax,
968 sr->preferredRecordSyntax =
969 yaz_oidval_to_z3950oid(odr_decode(), CLASS_RECSYN,
975 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
977 new_apdu->u.searchResponse->referenceId = sr->referenceId;
978 new_apdu->u.searchResponse->records =
979 create_nonSurrogateDiagnostics(odr_encode(), err, addinfo);
980 *new_apdu->u.searchResponse->searchStatus = 0;
982 send_to_client(new_apdu);
988 else if (apdu->which == Z_APDU_presentRequest)
990 Z_PresentRequest *pr = apdu->u.presentRequest;
993 Yaz_ProxyConfig *cfg = check_reconfigure();
996 err = cfg->check_syntax(odr_encode(), m_default_target,
997 pr->preferredRecordSyntax,
1001 pr->preferredRecordSyntax =
1002 yaz_oidval_to_z3950oid(odr_decode(), CLASS_RECSYN,
1008 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentResponse);
1010 new_apdu->u.presentResponse->referenceId = pr->referenceId;
1011 new_apdu->u.presentResponse->records =
1012 create_nonSurrogateDiagnostics(odr_encode(), err, addinfo);
1013 *new_apdu->u.presentResponse->presentStatus =
1014 Z_PresentStatus_failure;
1016 send_to_client(new_apdu);
// Processes a client PDU once rate limiting has let it through.
// Visible pipeline: pick the target client, log the implementation fields of
// an initRequest, answer a repeated init from the stored init response, then
// run the record-limit, syntax-validation, query-validation and result-set
// optimisation steps before forwarding the APDU to the target and marking the
// client as waiting.
// NOTE(review): the null checks after get_client() and after each handle_*
// step, plus the error handling around send_to_target(), are missing from
// this listing — confirm against the full source.
1024 void Yaz_Proxy::recv_Z_PDU_0(Z_APDU *apdu)
1026 // Determine our client.
1027 m_client = get_client(apdu);
1033 m_client->m_server = this;
1035 if (apdu->which == Z_APDU_initRequest)
1037 if (apdu->u.initRequest->implementationId)
1038 yaz_log(LOG_LOG, "%simplementationId: %s",
1039 m_session_str, apdu->u.initRequest->implementationId);
1040 if (apdu->u.initRequest->implementationName)
1041 yaz_log(LOG_LOG, "%simplementationName: %s",
1042 m_session_str, apdu->u.initRequest->implementationName);
1043 if (apdu->u.initRequest->implementationVersion)
1044 yaz_log(LOG_LOG, "%simplementationVersion: %s",
1045 m_session_str, apdu->u.initRequest->implementationVersion);
// Target connection already initialised: reply with the init response stored
// on the client instead of contacting the target again.
1046 if (m_client->m_init_flag)
// Note: this local `apdu` shadows the function parameter.
1048 Z_APDU *apdu = m_client->m_initResponse;
1049 apdu->u.initResponse->otherInfo = 0;
1050 if (m_client->m_cookie && *m_client->m_cookie)
1051 set_otherInformationString(apdu, VAL_COOKIE, 1,
1052 m_client->m_cookie);
1053 send_to_client(apdu);
1056 m_client->m_init_flag = 1;
1058 handle_max_record_retrieve(apdu);
1061 apdu = handle_syntax_validation(apdu);
1064 apdu = handle_query_validation(apdu);
1067 apdu = result_set_optimize(apdu);
1070 m_client->timeout(m_target_idletime); // mark it active even
1071 // though we didn't use it
1075 // delete other info part from PDU before sending to target
1076 Z_OtherInformation **oi;
1077 get_otherInfoAPDU(apdu, &oi);
// Remember where this present request starts so the response can be added to
// the cache at the right position (see m_cache.add in
// Yaz_ProxyClient::recv_Z_PDU).
1081 if (apdu->which == Z_APDU_presentRequest &&
1082 m_client->m_resultSetStartPoint == 0)
1084 Z_PresentRequest *pr = apdu->u.presentRequest;
1085 m_client->m_resultSetStartPoint = *pr->resultSetStartPoint;
1086 m_client->m_cache.copy_presentRequest(apdu->u.presentRequest);
1088 m_client->m_resultSetStartPoint = 0;
1090 if (m_client->send_to_target(apdu) < 0)
1097 m_client->m_waiting = 1;
// Connect notification for the client-facing side of the proxy.
// NOTE(review): the body is not visible in this listing.
1100 void Yaz_Proxy::connectNotify()
// Tears down the client-to-proxy session and decides what happens to the
// target connection.
// The target connection is kept ("keepalive") only when the session is not
// flagged invalid, the connection's PDU and byte counters are below the
// keep-alive limits and it is not waiting for a response; in that case the
// client is merely detached (m_server = 0). The other visible branches log a
// close, a "bad state" or a plain shutdown.
// NOTE(review): the first part of the keep-alive condition, the deletes and
// the branch structure are missing from this listing.
1104 void Yaz_Proxy::shutdown()
1106 // only keep if keep_alive flag is set...
1108 !m_invalid_session &&
1109 m_client->m_pdu_recv < m_keepalive_limit_pdu &&
1110 m_client->m_bytes_recv+m_client->m_bytes_sent < m_keepalive_limit_bw &&
1111 m_client->m_waiting == 0)
1113 yaz_log(LOG_LOG, "%sShutdown (client to proxy) keepalive %s",
1115 m_client->get_hostname());
// NOTE(review): the format string labels the first value "bw" and the second
// "pdu", but the arguments are passed as PDU count first, byte total second.
1116 yaz_log(LOG_LOG, "%sbw=%d pdu=%d limit-bw=%d limit-pdu=%d",
1117 m_session_str, m_client->m_pdu_recv,
1118 m_client->m_bytes_sent + m_client->m_bytes_recv,
1119 m_keepalive_limit_bw, m_keepalive_limit_pdu);
// m_waiting == 2 is set only in ~Yaz_ProxyClient, so this catches use of a
// destroyed client.
1120 assert (m_client->m_waiting != 2);
1121 // Tell client (if any) that no server connection is there..
1122 m_client->m_server = 0;
1126 yaz_log (LOG_LOG, "%sShutdown (client to proxy) close %s",
1128 m_client->get_hostname());
1129 assert (m_client->m_waiting != 2);
1134 yaz_log (LOG_LOG, "%sshutdown (client to proxy) bad state",
1140 yaz_log (LOG_LOG, "%sShutdown (client to proxy)",
// Returns the log prefix of the session currently attached to this target
// connection by delegating to m_server.
// NOTE(review): the handling of a null m_server is not visible in this
// listing; m_server is set to 0 elsewhere in the file, so confirm the guard.
1146 const char *Yaz_ProxyClient::get_session_str()
1150 return m_server->get_session_str();
// Shuts down the proxy-to-target connection; the visible code logs the event.
// NOTE(review): the remaining log arguments and the actual teardown are
// missing from this listing.
1153 void Yaz_ProxyClient::shutdown()
1155 yaz_log (LOG_LOG, "%sShutdown (proxy to target) %s", get_session_str(),
// Failure notification for the client side: logs that the client closed the
// connection.
// NOTE(review): the log argument and any follow-up action are missing from
// this listing.
1161 void Yaz_Proxy::failNotify()
1163 yaz_log (LOG_LOG, "%sConnection closed by client",
// Failure notification for the target side: logs that the target closed the
// connection, with the session prefix and the target host name.
// NOTE(review): any follow-up action is missing from this listing.
1168 void Yaz_ProxyClient::failNotify()
1170 yaz_log (LOG_LOG, "%sConnection closed by target %s",
1171 get_session_str(), get_hostname());
// Called when the target accepts the connection: logs the event and reads the
// target idle time from the attached session.
// NOTE(review): the declaration of `to`, the guard on m_server and the call
// that consumes `to` are missing from this listing (presumably a timeout()
// call — confirm).
1175 void Yaz_ProxyClient::connectNotify()
1177 yaz_log (LOG_LOG, "%sConnection accepted by %s", get_session_str(),
1181 to = m_server->get_target_idletime();
// Session factory for the target side: returns a new Yaz_ProxyClient bound to
// the given PDU observable. `fd` is not used by the visible code.
1187 IYaz_PDU_Observer *Yaz_ProxyClient::sessionNotify(IYaz_PDU_Observable
1188 *the_PDU_Observable, int fd)
1190 return new Yaz_ProxyClient(the_PDU_Observable);
// Unlinks the client from the pool list and releases its resources: the ODR
// holding the init response, the last query object and the last result set
// id string.
// NOTE(review): the first half of the unlink (updating *m_prev), the guard on
// m_next and the release of m_cookie are not visible in this listing.
1193 Yaz_ProxyClient::~Yaz_ProxyClient()
// The successor now hangs off the slot that used to point at this node.
1198 m_next->m_prev = m_prev;
1199 m_waiting = 2; // for debugging purposes only.
1200 odr_destroy(m_init_odr);
1201 delete m_last_query;
1202 xfree (m_last_resultSetId);
// Timer callback for the client side of the proxy.
// Visible behaviour: the idle timeout is re-armed and the PDU parked by
// recv_Z_PDU() in m_bw_hold_PDU is picked up; a separate path logs a
// client-to-proxy timeout.
// NOTE(review): the conditions separating the two paths, the clearing of
// m_bw_hold_PDU and the calls that follow are missing from this listing.
1206 void Yaz_Proxy::timeoutNotify()
1210 timeout(m_client_idletime);
1211 Z_APDU *apdu = m_bw_hold_PDU;
1217 yaz_log (LOG_LOG, "%sTimeout (client to proxy)", m_session_str);
// Timer callback for the target side: logs a proxy-to-target timeout.
// NOTE(review): the remaining log arguments and the follow-up action are
// missing from this listing.
1222 void Yaz_ProxyClient::timeoutNotify()
1224 yaz_log (LOG_LOG, "%sTimeout (proxy to target) %s", get_session_str(),
// Constructs a target-side connection object on top of the_PDU_Observable
// with empty search state and zeroed byte counters.
// NOTE(review): several member initialisations are missing from this listing.
1229 Yaz_ProxyClient::Yaz_ProxyClient(IYaz_PDU_Observable *the_PDU_Observable) :
1230 Yaz_Z_Assoc (the_PDU_Observable)
1237 m_last_resultSetId = 0;
1238 m_last_resultCount = 0;
// Separate decode ODR; recv_Z_PDU() transfers the init response's memory into
// it, and the destructor destroys it.
1242 m_init_odr = odr_createmem (ODR_DECODE);
1244 m_resultSetStartPoint = 0;
1245 m_bytes_sent = m_bytes_recv = 0;
// Gets or sets a named proxy option. Only the "optimize" option is visible:
// it stores a private copy of `value` in m_optimize, whose first character
// result_set_optimize() compares with '0'.
// NOTE(review): the guard on `value`, the release of the previous m_optimize
// and the return statements are missing from this listing.
1249 const char *Yaz_Proxy::option(const char *name, const char *value)
1251 if (!strcmp (name, "optimize")) {
1254 m_optimize = xstrdup (value);
1261 void Yaz_ProxyClient::recv_Z_PDU(Z_APDU *apdu, int len)
1263 m_bytes_recv += len;
1266 yaz_log (LOG_DEBUG, "%sReceiving %s from %s %d bytes", get_session_str(),
1267 apdu_name(apdu), get_hostname(), len);
1268 if (apdu->which == Z_APDU_initResponse)
1270 NMEM nmem = odr_extract_mem (odr_decode());
1271 odr_reset (m_init_odr);
1272 nmem_transfer (m_init_odr->mem, nmem);
1273 m_initResponse = apdu;
1275 Z_InitResponse *ir = apdu->u.initResponse;
1276 char *im0 = ir->implementationName;
1279 odr_malloc(m_init_odr, 20 + (im0 ? strlen(im0) : 0));
1286 strcat(im1, "(YAZ Proxy)");
1287 ir->implementationName = im1;
1289 nmem_destroy (nmem);
1291 if (apdu->which == Z_APDU_searchResponse)
1293 Z_SearchResponse *sr = apdu->u.searchResponse;
1294 m_last_resultCount = *sr->resultCount;
1295 int status = *sr->searchStatus;
1296 if (status && (!sr->records || sr->records->which == Z_Records_DBOSD))
1300 if (sr->records && sr->records->which == Z_Records_DBOSD)
1302 m_cache.add(odr_decode(),
1303 sr->records->u.databaseOrSurDiagnostics, 1,
1308 if (apdu->which == Z_APDU_presentResponse)
1310 Z_PresentResponse *pr = apdu->u.presentResponse;
1314 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
1315 Z_SearchResponse *sr = new_apdu->u.searchResponse;
1316 sr->referenceId = pr->referenceId;
1317 *sr->resultCount = m_last_resultCount;
1318 sr->records = pr->records;
1319 sr->nextResultSetPosition = pr->nextResultSetPosition;
1320 sr->numberOfRecordsReturned = pr->numberOfRecordsReturned;
1324 pr->records->which == Z_Records_DBOSD && m_resultSetStartPoint)
1326 m_cache.add(odr_decode(),
1327 pr->records->u.databaseOrSurDiagnostics,
1328 m_resultSetStartPoint, -1);
1329 m_resultSetStartPoint = 0;
1333 set_otherInformationString (apdu, VAL_COOKIE, 1, m_cookie);
1336 m_server->send_to_client(apdu);
1338 if (apdu->which == Z_APDU_close)