X-Git-Url: http://sru.miketaylor.org.uk/?a=blobdiff_plain;f=src%2Ffilter_session_shared.cpp;h=29c4164c0688ce401c3f589c61861d42adbe49ea;hb=b16a88ecd56e0bf89926dfd1eef008bf155221d3;hp=4110d8ee4f370cb0fc5a9aba02f9a97db0154f9a;hpb=98baf24c524d68c2bc44e31863a89b0f66e2dbda;p=metaproxy-moved-to-github.git diff --git a/src/filter_session_shared.cpp b/src/filter_session_shared.cpp index 4110d8e..29c4164 100644 --- a/src/filter_session_shared.cpp +++ b/src/filter_session_shared.cpp @@ -1,5 +1,5 @@ /* This file is part of Metaproxy. - Copyright (C) 2005-2012 Index Data + Copyright (C) 2005-2013 Index Data Metaproxy is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -31,6 +31,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include #include "filter_session_shared.hpp" +#include #include #include #include @@ -79,16 +80,20 @@ namespace metaproxy_1 { time_t m_time_last_use; void timestamp(); yazpp_1::RecordCache m_record_cache; + Z_OtherInformation *additionalSearchInfoResponse; + NMEM mem_additionalSearchInfoResponse; BackendSet( const std::string &result_set_id, const Databases &databases, const yazpp_1::Yaz_Z_Query &query); + ~BackendSet(); bool search( Package &frontend_package, Package &search_package, const Z_APDU *apdu_req, const BackendInstancePtr bp, - Z_Records **z_records); + Z_Records **z_records, + Z_OtherInformation *additionalSearchInfo); }; // backend connection instance class SessionShared::BackendInstance { @@ -117,7 +122,7 @@ namespace metaproxy_1 { BackendInstancePtr get_backend(const Package &package); void use_backend(BackendInstancePtr b); void release_backend(BackendInstancePtr b); - void expire_class(); + bool expire_instances(); yazpp_1::GDU m_init_request; yazpp_1::GDU m_init_response; boost::mutex m_mutex_backend_class; @@ -125,11 +130,15 @@ namespace metaproxy_1 { time_t m_backend_set_ttl; time_t m_backend_expiry_ttl; size_t m_backend_set_max; + Odr_int m_preferredMessageSize; + Odr_int m_maximumRecordSize; public: BackendClass(const yazpp_1::GDU &init_request, int resultset_ttl, int resultset_max, - int session_ttl); + int session_ttl, + Odr_int preferredRecordSize, + Odr_int maximumRecordSize); ~BackendClass(); }; // frontend result set @@ -155,6 +164,9 @@ namespace metaproxy_1 { void present(Package &package, Z_APDU *apdu); void scan(Package &package, Z_APDU *apdu); + int result_set_ref(ODR o, + const Databases &databases, + Z_RPNStructure *s, std::string &rset); void get_set(mp::Package &package, const Z_APDU *apdu_req, const Databases &databases, @@ -181,6 +193,8 @@ namespace metaproxy_1 { public: void expire(); private: + void expire_classes(); + void stat(); void init(Package &package, const Z_GDU *gdu, FrontendPtr frontend); void start(); @@ -197,6 +211,8 @@ namespace metaproxy_1 { bool m_optimize_search; bool m_restart; int m_session_max; + Odr_int m_preferredMessageSize; + Odr_int m_maximumRecordSize; }; } } @@ -382,6 +398,11 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2); ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3); + if (m_preferredMessageSize) + *req->preferredMessageSize = m_preferredMessageSize; + if (m_maximumRecordSize) + *req->maximumRecordSize = m_maximumRecordSize; + init_package.request() = init_pdu; init_package.move(); @@ -430,15 +451,39 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request, int resultset_ttl, int resultset_max, - int session_ttl) + int session_ttl, + Odr_int preferredMessageSize, + Odr_int maximumRecordSize) : m_named_result_sets(false), m_init_request(init_request), m_sequence_top(0), m_backend_set_ttl(resultset_ttl), - m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max) + m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max), + m_preferredMessageSize(preferredMessageSize), + m_maximumRecordSize(maximumRecordSize) {} yf::SessionShared::BackendClass::~BackendClass() {} +void yf::SessionShared::Rep::stat() +{ + int no_classes = 0; + int no_instances = 0; + BackendClassMap::const_iterator it; + { + boost::mutex::scoped_lock lock(m_mutex_backend_map); + for (it = m_backend_map.begin(); it != m_backend_map.end(); it++) + { + BackendClassPtr bc = it->second; + no_classes++; + BackendInstanceList::iterator bit = bc->m_backend_list.begin(); + for (; bit != bc->m_backend_list.end(); bit++) + no_instances++; + } + } + yaz_log(YLOG_LOG, "backend classes=%d instances=%d", no_classes, + no_instances); +} + void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, FrontendPtr frontend) { @@ -456,7 +501,9 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, BackendClassPtr b(new BackendClass(gdu->u.z3950, m_resultset_ttl, m_resultset_max, - m_session_ttl)); + m_session_ttl, + m_preferredMessageSize, + m_maximumRecordSize)); m_backend_map[k] = b; frontend->m_backend_class = b; } @@ -474,7 +521,6 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, if (bc->m_backend_list.size() == 0) { BackendInstancePtr backend = bc->create_backend(package); - if (backend) bc->release_backend(backend); } @@ -491,13 +537,22 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, Z_GDU *response_gdu = init_response.get(); mp::util::transfer_referenceId(odr, gdu->u.z3950, response_gdu->u.z3950); - Z_Options *server_options = - response_gdu->u.z3950->u.initResponse->options; + Z_InitResponse *init_res = response_gdu->u.z3950->u.initResponse; + Z_Options *server_options = init_res->options; Z_Options *client_options = &frontend->m_init_options; int i; for (i = 0; i < 30; i++) if (!ODR_MASK_GET(client_options, i)) ODR_MASK_CLEAR(server_options, i); + + if (!m_preferredMessageSize || + *init_res->preferredMessageSize > *req->preferredMessageSize) + *init_res->preferredMessageSize = *req->preferredMessageSize; + + if (!m_maximumRecordSize || + *init_res->maximumRecordSize > *req->maximumRecordSize) + *init_res->maximumRecordSize = *req->maximumRecordSize; + package.response() = init_response; if (!*response_gdu->u.z3950->u.initResponse->result) package.session().close(); @@ -527,6 +582,13 @@ yf::SessionShared::BackendSet::BackendSet( m_databases(databases), m_result_set_size(0), m_query(query) { timestamp(); + mem_additionalSearchInfoResponse = nmem_create(); + additionalSearchInfoResponse = 0; +} + +yf::SessionShared::BackendSet::~BackendSet() +{ + nmem_destroy(mem_additionalSearchInfoResponse); } static int get_diagnostic(Z_DefaultDiagFormat *r) @@ -539,12 +601,14 @@ bool yf::SessionShared::BackendSet::search( mp::Package &search_package, const Z_APDU *frontend_apdu, const BackendInstancePtr bp, - Z_Records **z_records) + Z_Records **z_records, + Z_OtherInformation *additionalSearchInfo) { mp::odr odr; Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest); Z_SearchRequest *req = apdu_req->u.searchRequest; + req->additionalSearchInfo = additionalSearchInfo; req->resultSetName = odr_strdup(odr, m_result_set_id.c_str()); req->query = m_query.get_Z_Query(); @@ -572,6 +636,10 @@ bool yf::SessionShared::BackendSet::search( Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse; *z_records = b_resp->records; m_result_set_size = *b_resp->resultCount; + + nmem_reset(mem_additionalSearchInfoResponse); + additionalSearchInfoResponse = yaz_clone_z_OtherInformation( + b_resp->additionalSearchInfo, mem_additionalSearchInfoResponse); return true; } Z_APDU *f_apdu = 0; @@ -650,6 +718,10 @@ void yf::SessionShared::Frontend::get_set(mp::Package &package, BackendSetPtr &found_set) { bool session_restarted = false; + Z_OtherInformation *additionalSearchInfo = 0; + + if (apdu_req->which == Z_APDU_searchRequest) + additionalSearchInfo = apdu_req->u.searchRequest->additionalSearchInfo; restart: std::string result_set_id; @@ -661,7 +733,7 @@ restart: if ((int) bc->m_backend_list.size() >= m_p->m_session_max) out_of_sessions = true; - if (m_p->m_optimize_search) + if (m_p->m_optimize_search && !additionalSearchInfo) { // look at each backend and see if we have a similar search BackendInstanceList::const_iterator it = bc->m_backend_list.begin(); @@ -743,7 +815,8 @@ restart: search_package.copy_filter(package); if (!new_set->search(package, search_package, - apdu_req, found_backend, &z_records)) + apdu_req, found_backend, &z_records, + additionalSearchInfo)) { bc->remove_backend(found_backend); return; // search error @@ -819,6 +892,54 @@ restart: found_backend->m_sets.push_back(found_set); } +int yf::SessionShared::Frontend::result_set_ref(ODR o, + const Databases &databases, + Z_RPNStructure *s, + std::string &rset) +{ + int ret = 0; + switch (s->which) + { + case Z_RPNStructure_simple: + if (s->u.simple->which == Z_Operand_resultSetId) + { + const char *id = s->u.simple->u.resultSetId; + rset = id; + + FrontendSets::iterator fset_it = m_frontend_sets.find(id); + if (fset_it == m_frontend_sets.end()) + { + ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST; + } + else if (fset_it->second->get_databases() != databases) + { + ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST; + } + else + { + yazpp_1::Yaz_Z_Query query = fset_it->second->get_query(); + Z_Query *q = yaz_copy_Z_Query(query.get_Z_Query(), o); + if (q->which == Z_Query_type_1 || q->which == Z_Query_type_101) + { + s->which = q->u.type_1->RPNStructure->which; + s->u.simple = q->u.type_1->RPNStructure->u.simple; + } + else + { + ret = YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST; + } + } + } + break; + case Z_RPNStructure_complex: + ret = result_set_ref(o, databases, s->u.complex->s1, rset); + if (!ret) + ret = result_set_ref(o, databases, s->u.complex->s2, rset); + break; + } + return ret; +} + void yf::SessionShared::Frontend::search(mp::Package &package, Z_APDU *apdu_req) { @@ -839,19 +960,40 @@ void yf::SessionShared::Frontend::search(mp::Package &package, YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF, 0); package.response() = apdu; - return; } m_frontend_sets.erase(fset_it); } - yazpp_1::Yaz_Z_Query query; - query.set_Z_Query(req->query); Databases databases; int i; for (i = 0; i < req->num_databaseNames; i++) databases.push_back(req->databaseNames[i]); + + yazpp_1::Yaz_Z_Query query; + query.set_Z_Query(req->query); + + Z_Query *q = query.get_Z_Query(); + if (q->which == Z_Query_type_1 || q->which == Z_Query_type_101) + { + mp::odr odr; + std::string rset; + int diag = result_set_ref(odr, databases, q->u.type_1->RPNStructure, + rset); + if (diag) + { + Z_APDU *apdu = + odr.create_searchResponse( + apdu_req, + diag, + rset.c_str()); + package.response() = apdu; + return; + } + query.set_Z_Query(q); + } + BackendSetPtr found_set; // null BackendInstancePtr found_backend; // null @@ -863,6 +1005,7 @@ void yf::SessionShared::Frontend::search(mp::Package &package, Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0); Z_SearchResponse *f_resp = f_apdu->u.searchResponse; *f_resp->resultCount = found_set->m_result_set_size; + f_resp->additionalSearchInfo = found_set->additionalSearchInfoResponse; package.response() = f_apdu; FrontendSetPtr fset(new FrontendSet(databases, query)); @@ -967,11 +1110,13 @@ void yf::SessionShared::Frontend::present(mp::Package &package, if (b_resp->records && b_resp->records->which == Z_Records_DBOSD) { +#if 0 yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF " records to cache %p", *req->resultSetStartPoint, *f_resp->numberOfRecordsReturned, &found_set->m_record_cache); +#endif found_set->m_record_cache.add( odr, b_resp->records->u.databaseOrSurDiagnostics, @@ -1000,7 +1145,8 @@ void yf::SessionShared::Frontend::scan(mp::Package &frontend_package, { mp::odr odr; Z_APDU *apdu = odr.create_scanResponse( - apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, + "session_shared: could not create backend"); frontend_package.response() = apdu; } else @@ -1030,7 +1176,7 @@ void yf::SessionShared::Worker::operator() (void) m_p->expire(); } -void yf::SessionShared::BackendClass::expire_class() +bool yf::SessionShared::BackendClass::expire_instances() { time_t now; time(&now); @@ -1059,6 +1205,25 @@ void yf::SessionShared::BackendClass::expire_class() bit++; } } + if (m_backend_list.empty()) + return true; + return false; +} + +void yf::SessionShared::Rep::expire_classes() +{ + boost::mutex::scoped_lock lock(m_mutex_backend_map); + BackendClassMap::iterator b_it = m_backend_map.begin(); + while (b_it != m_backend_map.end()) + { + if (b_it->second->expire_instances()) + { + m_backend_map.erase(b_it); + b_it = m_backend_map.begin(); + } + else + b_it++; + } } void yf::SessionShared::Rep::expire() @@ -1066,13 +1231,18 @@ void yf::SessionShared::Rep::expire() while (true) { boost::xtime xt; - boost::xtime_get(&xt, boost::TIME_UTC); + boost::xtime_get(&xt, +#if BOOST_VERSION >= 105000 + boost::TIME_UTC_ +#else + boost::TIME_UTC +#endif + ); xt.sec += m_session_ttl / 3; boost::thread::sleep(xt); - BackendClassMap::const_iterator b_it = m_backend_map.begin(); - for (; b_it != m_backend_map.end(); b_it++) - b_it->second->expire_class(); + stat(); + expire_classes(); } } @@ -1084,6 +1254,8 @@ yf::SessionShared::Rep::Rep() m_optimize_search = true; m_restart = false; m_session_max = 100; + m_preferredMessageSize = 0; + m_maximumRecordSize = 0; } void yf::SessionShared::Rep::start() @@ -1272,6 +1444,24 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only, attr->name)); } } + else if (!strcmp((const char *) ptr->name, "init")) + { + const struct _xmlAttr *attr; + for (attr = ptr->properties; attr; attr = attr->next) + { + if (!strcmp((const char *) attr->name, "maximum-record-size")) + m_p->m_maximumRecordSize = + mp::xml::get_int(attr->children, 0); + else if (!strcmp((const char *) attr->name, + "preferred-message-size")) + m_p->m_preferredMessageSize = + mp::xml::get_int(attr->children, 0); + else + throw mp::filter::FilterException( + "Bad attribute " + std::string((const char *) + attr->name)); + } + } else { throw mp::filter::FilterException("Bad element "