X-Git-Url: http://sru.miketaylor.org.uk/?a=blobdiff_plain;f=src%2Ffilter_session_shared.cpp;h=b2e00d7002283d71ffe85dfa5fd514df8899387b;hb=73f37c91c144b070020df2f27472c09b62367acf;hp=ac26c47ede86f16316e4824be045957807f0c1b2;hpb=a83814901948c8dc8d6d3da35e413dad5bf03400;p=metaproxy-moved-to-github.git diff --git a/src/filter_session_shared.cpp b/src/filter_session_shared.cpp index ac26c47..b2e00d7 100644 --- a/src/filter_session_shared.cpp +++ b/src/filter_session_shared.cpp @@ -74,7 +74,7 @@ namespace metaproxy_1 { public: std::string m_result_set_id; Databases m_databases; - int m_result_set_size; + Odr_int m_result_set_size; yazpp_1::Yaz_Z_Query m_query; time_t m_time_last_use; void timestamp(); @@ -169,12 +169,12 @@ namespace metaproxy_1 { Rep *m_p; BackendClassPtr m_backend_class; FrontendSets m_frontend_sets; - }; + }; // representation class SessionShared::Rep { friend class SessionShared; friend struct Frontend; - + FrontendPtr get_frontend(Package &package); void release_frontend(Package &package); Rep(); @@ -195,6 +195,7 @@ namespace metaproxy_1 { int m_resultset_max; int m_session_ttl; bool m_optimize_search; + bool m_restart; int m_session_max; }; } @@ -207,7 +208,7 @@ yf::SessionShared::FrontendSet::FrontendSet( { } -const yf::SessionShared::Databases & +const yf::SessionShared::Databases & yf::SessionShared::FrontendSet::get_databases() { return m_databases; @@ -221,7 +222,7 @@ const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query() yf::SessionShared::InitKey::InitKey(const InitKey &k) { m_odr = odr_createmem(ODR_ENCODE); - + m_idAuthentication_size = k.m_idAuthentication_size; m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size); memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf, @@ -253,7 +254,7 @@ yf::SessionShared::InitKey::~InitKey() } bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k) - const + const { int c; c = mp::util::memcmp2( @@ -283,7 +284,7 @@ void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b) void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b) { BackendInstanceList::iterator it = m_backend_list.begin(); - + while (it != m_backend_list.end()) { if (*it == b) @@ -293,7 +294,7 @@ void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b) 0, Z_Close_lackOfActivity, 0); (*it)->m_close_package->session().close(); (*it)->m_close_package->move(); - + it = m_backend_list.erase(it); } else @@ -303,22 +304,22 @@ void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b) -yf::SessionShared::BackendInstancePtr +yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::get_backend( const mp::Package &frontend_package) { { boost::mutex::scoped_lock lock(m_mutex_backend_class); - + BackendInstanceList::const_iterator it = m_backend_list.begin(); - + BackendInstancePtr backend1; // null - + for (; it != m_backend_list.end(); it++) { if (!(*it)->m_in_use) { - if (!backend1 + if (!backend1 || (*it)->m_sequence_this < backend1->m_sequence_this) backend1 = *it; } @@ -394,7 +395,7 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba /* already closed. We don't know why */ return null; } - else if (gdu && gdu->which == Z_GDU_Z3950 + else if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == Z_APDU_initResponse && *gdu->u.z3950->u.initResponse->result) { @@ -459,37 +460,46 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, } else { - frontend->m_backend_class = it->second; + frontend->m_backend_class = it->second; } } BackendClassPtr bc = frontend->m_backend_class; - BackendInstancePtr backend = bc->get_backend(package); - + BackendInstancePtr backend; mp::odr odr; - if (!backend) + + // we only need to get init response from "first" target in + // backend class - the assumption being that init response is + // same for all + if (bc->m_init_response.get() == 0) { - Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0); - *apdu->u.initResponse->result = 0; - package.response() = apdu; - package.session().close(); + backend = bc->get_backend(package); } - else { boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); - yazpp_1::GDU init_response = bc->m_init_response; - Z_GDU *response_gdu = init_response.get(); - mp::util::transfer_referenceId(odr, gdu->u.z3950, - response_gdu->u.z3950); - - Z_Options *server_options = - response_gdu->u.z3950->u.initResponse->options; - Z_Options *client_options = &frontend->m_init_options; - - int i; - for (i = 0; i<30; i++) - if (!ODR_MASK_GET(client_options, i)) - ODR_MASK_CLEAR(server_options, i); - package.response() = init_response; + if (bc->m_init_response.get() == 0) + { + Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0); + *apdu->u.initResponse->result = 0; + package.response() = apdu; + package.session().close(); + } + else + { + yazpp_1::GDU init_response = bc->m_init_response; + Z_GDU *response_gdu = init_response.get(); + mp::util::transfer_referenceId(odr, gdu->u.z3950, + response_gdu->u.z3950); + + Z_Options *server_options = + response_gdu->u.z3950->u.initResponse->options; + Z_Options *client_options = &frontend->m_init_options; + + int i; + for (i = 0; i < 30; i++) + if (!ODR_MASK_GET(client_options, i)) + ODR_MASK_CLEAR(server_options, i); + package.response() = init_response; + } } if (backend) bc->release_backend(backend); @@ -505,7 +515,7 @@ yf::SessionShared::BackendSet::BackendSet( const Databases &databases, const yazpp_1::Yaz_Z_Query &query) : m_result_set_id(result_set_id), - m_databases(databases), m_result_set_size(0), m_query(query) + m_databases(databases), m_result_set_size(0), m_query(query) { timestamp(); } @@ -530,7 +540,7 @@ bool yf::SessionShared::BackendSet::search( req->query = m_query.get_Z_Query(); req->num_databaseNames = m_databases.size(); - req->databaseNames = (char**) + req->databaseNames = (char**) odr_malloc(odr, req->num_databaseNames * sizeof(char *)); Databases::const_iterator it = m_databases.begin(); size_t i = 0; @@ -547,7 +557,7 @@ bool yf::SessionShared::BackendSet::search( Z_GDU *gdu = search_package.response().get(); if (!search_package.session().is_closed() - && gdu && gdu->which == Z_GDU_Z3950 + && gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == Z_APDU_searchResponse) { Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse; @@ -610,7 +620,7 @@ void yf::SessionShared::Frontend::override_set( if (bc->m_named_result_sets) { result_set_id = boost::io::str( - boost::format("%1%") % + boost::format("%1%") % found_backend->m_result_set_sequence); found_backend->m_result_set_sequence++; } @@ -639,7 +649,7 @@ restart: if ((int) bc->m_backend_list.size() >= m_p->m_session_max) out_of_sessions = true; - + if (m_p->m_optimize_search) { // look at each backend and see if we have a similar search @@ -657,7 +667,6 @@ restart: found_set = *set_it; found_backend = *it; bc->use_backend(found_backend); - found_set->timestamp(); // found matching set. No need to search again return; } @@ -680,7 +689,7 @@ restart: Z_APDU *f_apdu = 0; mp::odr odr; const char *addinfo = 0; - + if (out_of_sessions) addinfo = "session_shared: all sessions in use"; if (apdu_req->which == Z_APDU_searchRequest) @@ -724,7 +733,7 @@ restart: apdu_req, found_backend, &z_records)) { bc->remove_backend(found_backend); - return; // search error + return; // search error } if (z_records) @@ -738,19 +747,20 @@ restart: else if (z_records->which == Z_Records_multipleNSD) { if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1 - && - + && + z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which == Z_DiagRec_defaultFormat) { condition = get_diagnostic( z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat); - + } } - if (!session_restarted && + if (m_p->m_restart && !session_restarted && condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR) { + package.log("session_shared", YLOG_LOG, "restart"); bc->remove_backend(found_backend); session_restarted = true; found_backend.reset(); @@ -763,7 +773,7 @@ restart: mp::odr odr; if (apdu_req->which == Z_APDU_searchRequest) { - Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, + Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0); Z_SearchResponse *f_resp = f_apdu->u.searchResponse; *f_resp->searchStatus = Z_SearchResponse_none; @@ -772,18 +782,19 @@ restart: } if (apdu_req->which == Z_APDU_presentRequest) { - Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, + Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0); Z_PresentResponse *f_resp = f_apdu->u.presentResponse; f_resp->records = z_records; package.response() = f_apdu; } bc->release_backend(found_backend); - return; // search error + return; // search error } } - if (!session_restarted && new_set->m_result_set_size < 0) + if (m_p->m_restart && !session_restarted && new_set->m_result_set_size < 0) { + package.log("session_shared", YLOG_LOG, "restart"); bc->remove_backend(found_backend); session_restarted = true; found_backend.reset(); @@ -799,33 +810,33 @@ void yf::SessionShared::Frontend::search(mp::Package &package, Z_APDU *apdu_req) { Z_SearchRequest *req = apdu_req->u.searchRequest; - FrontendSets::iterator fset_it = + FrontendSets::iterator fset_it = m_frontend_sets.find(req->resultSetName); if (fset_it != m_frontend_sets.end()) { - // result set already exist + // result set already exist // if replace indicator is off: we return diagnostic if // result set already exist. if (*req->replaceIndicator == 0) { mp::odr odr; - Z_APDU *apdu = + Z_APDU *apdu = odr.create_searchResponse( apdu_req, YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF, 0); package.response() = apdu; - + return; } m_frontend_sets.erase(fset_it); } - + yazpp_1::Yaz_Z_Query query; query.set_Z_Query(req->query); Databases databases; int i; - for (i = 0; inum_databaseNames; i++) + for (i = 0; i < req->num_databaseNames; i++) databases.push_back(req->databaseNames[i]); BackendSetPtr found_set; // null @@ -853,12 +864,12 @@ void yf::SessionShared::Frontend::present(mp::Package &package, mp::odr odr; Z_PresentRequest *req = apdu_req->u.presentRequest; - FrontendSets::iterator fset_it = + FrontendSets::iterator fset_it = m_frontend_sets.find(req->resultSetId); if (fset_it == m_frontend_sets.end()) { - Z_APDU *apdu = + Z_APDU *apdu = odr.create_presentResponse( apdu_req, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST, @@ -880,7 +891,7 @@ void yf::SessionShared::Frontend::present(mp::Package &package, return; Z_NamePlusRecordList *npr_res = 0; - if (found_set->m_record_cache.lookup(odr, &npr_res, + if (found_set->m_record_cache.lookup(odr, &npr_res, *req->resultSetStartPoint, *req->numberOfRecordsRequested, req->preferredRecordSyntax, @@ -889,14 +900,14 @@ void yf::SessionShared::Frontend::present(mp::Package &package, Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0); Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse; - yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF + yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF " records in cache %p", - *req->resultSetStartPoint, + *req->resultSetStartPoint, *req->numberOfRecordsRequested, - &found_set->m_record_cache); + &found_set->m_record_cache); *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested; - *f_resp->nextResultSetPosition = + *f_resp->nextResultSetPosition = *req->resultSetStartPoint + *req->numberOfRecordsRequested; // f_resp->presentStatus assumed OK. f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records)); @@ -908,7 +919,7 @@ void yf::SessionShared::Frontend::present(mp::Package &package, } found_backend->timestamp(); - + Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest); Z_PresentRequest *p_req = p_apdu->u.presentRequest; p_req->preferredRecordSyntax = req->preferredRecordSyntax; @@ -927,7 +938,7 @@ void yf::SessionShared::Frontend::present(mp::Package &package, Z_GDU *gdu = present_package.response().get(); if (!present_package.session().is_closed() - && gdu && gdu->which == Z_GDU_Z3950 + && gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == Z_APDU_presentResponse) { Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse; @@ -945,13 +956,13 @@ void yf::SessionShared::Frontend::present(mp::Package &package, { yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF " records to cache %p", - *req->resultSetStartPoint, + *req->resultSetStartPoint, *f_resp->numberOfRecordsReturned, - &found_set->m_record_cache); + &found_set->m_record_cache); found_set->m_record_cache.add( odr, b_resp->records->u.databaseOrSurDiagnostics, - *req->resultSetStartPoint, + *req->resultSetStartPoint, *f_resp->numberOfRecordsReturned); } bc->release_backend(found_backend); @@ -959,7 +970,7 @@ void yf::SessionShared::Frontend::present(mp::Package &package, else { bc->remove_backend(found_backend); - Z_APDU *f_apdu_res = + Z_APDU *f_apdu_res = odr.create_presentResponse( apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); package.response() = f_apdu_res; @@ -1014,7 +1025,7 @@ void yf::SessionShared::BackendClass::expire_class() while (bit != m_backend_list.end()) { time_t last_use = (*bit)->m_time_last_use; - + if ((*bit)->m_in_use) { bit++; @@ -1044,7 +1055,7 @@ void yf::SessionShared::Rep::expire() boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += m_session_ttl / 3; boost::thread::sleep(xt); - + BackendClassMap::const_iterator b_it = m_backend_map.begin(); for (; b_it != m_backend_map.end(); b_it++) b_it->second->expire_class(); @@ -1057,6 +1068,7 @@ yf::SessionShared::Rep::Rep() m_resultset_max = 10; m_session_ttl = 90; m_optimize_search = true; + m_restart = false; m_session_max = 100; } @@ -1091,13 +1103,13 @@ yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package boost::mutex::scoped_lock lock(m_mutex); std::map::iterator it; - + while(true) { it = m_clients.find(package.session()); if (it == m_clients.end()) break; - + if (!it->second->m_in_use) { it->second->m_in_use = true; @@ -1115,7 +1127,7 @@ void yf::SessionShared::Rep::release_frontend(mp::Package &package) { boost::mutex::scoped_lock lock(m_mutex); std::map::iterator it; - + it = m_clients.find(package.session()); if (it != m_clients.end()) { @@ -1137,7 +1149,7 @@ void yf::SessionShared::process(mp::Package &package) const FrontendPtr f = m_p->get_frontend(package); Z_GDU *gdu = package.request().get(); - + if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == Z_APDU_initRequest && !f->m_is_virtual) { @@ -1151,18 +1163,18 @@ void yf::SessionShared::process(mp::Package &package) const if (apdu->which == Z_APDU_initRequest) { mp::odr odr; - + package.response() = odr.create_close( apdu, Z_Close_protocolError, "double init"); - + package.session().close(); } else if (apdu->which == Z_APDU_close) { mp::odr odr; - + package.response() = odr.create_close( apdu, Z_Close_peerAbort, "received close from client"); @@ -1183,11 +1195,11 @@ void yf::SessionShared::process(mp::Package &package) const else { mp::odr odr; - + package.response() = odr.create_close( apdu, Z_Close_protocolError, "unsupported APDU in filter_session_shared"); - + package.session().close(); } } @@ -1207,11 +1219,11 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only, for (attr = ptr->properties; attr; attr = attr->next) { if (!strcmp((const char *) attr->name, "ttl")) - m_p->m_resultset_ttl = + m_p->m_resultset_ttl = mp::xml::get_int(attr->children, 30); else if (!strcmp((const char *) attr->name, "max")) { - m_p->m_resultset_max = + m_p->m_resultset_max = mp::xml::get_int(attr->children, 10); } else if (!strcmp((const char *) attr->name, "optimizesearch")) @@ -1219,6 +1231,10 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only, m_p->m_optimize_search = mp::xml::get_bool(attr->children, true); } + else if (!strcmp((const char *) attr->name, "restart")) + { + m_p->m_restart = mp::xml::get_bool(attr->children, true); + } else throw mp::filter::FilterException( "Bad attribute " + std::string((const char *) @@ -1231,10 +1247,10 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only, for (attr = ptr->properties; attr; attr = attr->next) { if (!strcmp((const char *) attr->name, "ttl")) - m_p->m_session_ttl = + m_p->m_session_ttl = mp::xml::get_int(attr->children, 90); else if (!strcmp((const char *) attr->name, "max")) - m_p->m_session_max = + m_p->m_session_max = mp::xml::get_int(attr->children, 100); else throw mp::filter::FilterException( @@ -1244,7 +1260,7 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only, } else { - throw mp::filter::FilterException("Bad element " + throw mp::filter::FilterException("Bad element " + std::string((const char *) ptr->name)); }