X-Git-Url: http://sru.miketaylor.org.uk/?a=blobdiff_plain;ds=sidebyside;f=src%2Ffilter_session_shared.cpp;h=720a71d961a254dd0d5290b432c689349c00cf8f;hb=d0123337d70b0fb97b578cc57467bb94980f1014;hp=e0f835b9dd42a76a24b5e6087210f641483a10c0;hpb=16921033f2609ed948cf6985b4fbce3d927a20c1;p=metaproxy-moved-to-github.git diff --git a/src/filter_session_shared.cpp b/src/filter_session_shared.cpp index e0f835b..720a71d 100644 --- a/src/filter_session_shared.cpp +++ b/src/filter_session_shared.cpp @@ -1,8 +1,20 @@ -/* $Id: filter_session_shared.cpp,v 1.12 2006-06-19 23:54:02 adam Exp $ - Copyright (c) 2005-2006, Index Data. +/* This file is part of Metaproxy. + Copyright (C) 2005-2009 Index Data - See the LICENSE file for details - */ +Metaproxy is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free +Software Foundation; either version 2, or (at your option) any later +version. + +Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ #include "config.hpp" @@ -24,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -34,6 +47,7 @@ namespace yf = metaproxy_1::filter; namespace metaproxy_1 { namespace filter { + // key for session.. We'll only share sessions with same InitKey class SessionShared::InitKey { public: bool operator < (const SessionShared::InitKey &k) const; @@ -41,13 +55,13 @@ namespace metaproxy_1 { InitKey(const InitKey &); ~InitKey(); private: - InitKey &operator = (const InitKey &k); char *m_idAuthentication_buf; int m_idAuthentication_size; char *m_otherInfo_buf; int m_otherInfo_size; ODR m_odr; }; + // worker thread .. for expiry of sessions class SessionShared::Worker { public: Worker(SessionShared::Rep *rep); @@ -55,6 +69,7 @@ namespace metaproxy_1 { private: SessionShared::Rep *m_p; }; + // backend result set class SessionShared::BackendSet { public: std::string m_result_set_id; @@ -63,6 +78,7 @@ namespace metaproxy_1 { yazpp_1::Yaz_Z_Query m_query; time_t m_time_last_use; void timestamp(); + yazpp_1::RecordCache m_record_cache; BackendSet( const std::string &result_set_id, const Databases &databases, @@ -70,8 +86,10 @@ namespace metaproxy_1 { bool search( Package &frontend_package, const Z_APDU *apdu_req, - const BackendInstancePtr bp); + const BackendInstancePtr bp, + bool &fatal_error); }; + // backend connection instance class SessionShared::BackendInstance { friend class Rep; friend class BackendClass; @@ -86,6 +104,7 @@ namespace metaproxy_1 { mp::Package * m_close_package; ~BackendInstance(); }; + // backends of some class (all with same InitKey) class SessionShared::BackendClass : boost::noncopyable { friend class Rep; friend struct Frontend; @@ -96,7 +115,7 @@ namespace metaproxy_1 { BackendInstancePtr get_backend(const Package &package); void use_backend(BackendInstancePtr b); void release_backend(BackendInstancePtr b); - void expire(); + void expire_class(); yazpp_1::GDU m_init_request; yazpp_1::GDU m_init_response; boost::mutex m_mutex_backend_class; @@ -105,9 +124,13 @@ namespace metaproxy_1 { time_t m_backend_expiry_ttl; size_t m_backend_set_max; public: - BackendClass(const yazpp_1::GDU &init_request); + BackendClass(const yazpp_1::GDU &init_request, + int resultset_ttl, + int resultset_max, + int session_ttl); ~BackendClass(); }; + // frontend result set class SessionShared::FrontendSet { Databases m_databases; yazpp_1::Yaz_Z_Query m_query; @@ -119,14 +142,16 @@ namespace metaproxy_1 { const yazpp_1::Yaz_Z_Query &query); FrontendSet(); }; + // frontend session struct SessionShared::Frontend { Frontend(Rep *rep); ~Frontend(); bool m_is_virtual; bool m_in_use; - + Z_Options m_init_options; void search(Package &package, Z_APDU *apdu); void present(Package &package, Z_APDU *apdu); + void scan(Package &package, Z_APDU *apdu); void get_set(mp::Package &package, const Z_APDU *apdu_req, @@ -141,6 +166,7 @@ namespace metaproxy_1 { BackendClassPtr m_backend_class; FrontendSets m_frontend_sets; }; + // representation class SessionShared::Rep { friend class SessionShared; friend struct Frontend; @@ -160,6 +186,9 @@ namespace metaproxy_1 { BackendClassMap m_backend_map; boost::mutex m_mutex_backend_map; boost::thread_group m_thrds; + int m_resultset_ttl; + int m_resultset_max; + int m_session_ttl; }; } } @@ -251,7 +280,15 @@ void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b) while (it != m_backend_list.end()) { if (*it == b) + { + mp::odr odr; + (*it)->m_close_package->response() = odr.create_close( + 0, Z_Close_lackOfActivity, 0); + (*it)->m_close_package->session().close(); + (*it)->m_close_package->move(); + it = m_backend_list.erase(it); + } else it++; } @@ -314,7 +351,25 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba init_package.copy_filter(frontend_package); - init_package.request() = m_init_request; + yazpp_1::GDU actual_init_request = m_init_request; + Z_GDU *init_pdu = actual_init_request.get(); + + assert(init_pdu->which == Z_GDU_Z3950); + assert(init_pdu->u.z3950->which == Z_APDU_initRequest); + + Z_InitRequest *req = init_pdu->u.z3950->u.initRequest; + ODR_MASK_ZERO(req->options); + + ODR_MASK_SET(req->options, Z_Options_search); + ODR_MASK_SET(req->options, Z_Options_present); + ODR_MASK_SET(req->options, Z_Options_namedResultSets); + ODR_MASK_SET(req->options, Z_Options_scan); + + ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1); + ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2); + ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3); + + init_package.request() = init_pdu; init_package.move(); @@ -350,10 +405,13 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba } -yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request) +yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request, + int resultset_ttl, + int resultset_max, + int session_ttl) : m_named_result_sets(false), m_init_request(init_request), - m_sequence_top(0), m_backend_set_ttl(30), - m_backend_expiry_ttl(90), m_backend_set_max(10) + m_sequence_top(0), m_backend_set_ttl(resultset_ttl), + m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max) {} yf::SessionShared::BackendClass::~BackendClass() @@ -365,6 +423,7 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, Z_InitRequest *req = gdu->u.z3950->u.initRequest; frontend->m_is_virtual = true; + frontend->m_init_options = *req->options; InitKey k(req); { boost::mutex::scoped_lock lock(m_mutex_backend_map); @@ -372,17 +431,16 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, it = m_backend_map.find(k); if (it == m_backend_map.end()) { - BackendClassPtr b(new BackendClass(gdu->u.z3950)); + BackendClassPtr b(new BackendClass(gdu->u.z3950, + m_resultset_ttl, + m_resultset_max, + m_session_ttl)); m_backend_map[k] = b; frontend->m_backend_class = b; - std::cout << "SessionShared::Rep::init new session " - << frontend->m_backend_class << "\n"; } else { frontend->m_backend_class = it->second; - std::cout << "SessionShared::Rep::init existing session " - << frontend->m_backend_class << "\n"; } } BackendClassPtr bc = frontend->m_backend_class; @@ -399,10 +457,20 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, else { boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); - Z_GDU *response_gdu = bc->m_init_response.get(); + yazpp_1::GDU init_response = bc->m_init_response; + Z_GDU *response_gdu = init_response.get(); mp::util::transfer_referenceId(odr, gdu->u.z3950, response_gdu->u.z3950); - package.response() = response_gdu; + + Z_Options *server_options = + response_gdu->u.z3950->u.initResponse->options; + Z_Options *client_options = &frontend->m_init_options; + + int i; + for (i = 0; i<30; i++) + if (!ODR_MASK_GET(client_options, i)) + ODR_MASK_CLEAR(server_options, i); + package.response() = init_response; } if (backend) bc->release_backend(backend); @@ -424,9 +492,10 @@ yf::SessionShared::BackendSet::BackendSet( } bool yf::SessionShared::BackendSet::search( - Package &frontend_package, + mp::Package &frontend_package, const Z_APDU *frontend_apdu, - const BackendInstancePtr bp) + const BackendInstancePtr bp, + bool & fatal_error) { Package search_package(bp->m_session, frontend_package.origin()); @@ -450,7 +519,8 @@ bool yf::SessionShared::BackendSet::search( search_package.request() = apdu_req; search_package.move(); - + fatal_error = false; // assume backend session is good + Z_Records *z_records_diag = 0; Z_GDU *gdu = search_package.response().get(); if (!search_package.session().is_closed() @@ -466,11 +536,15 @@ bool yf::SessionShared::BackendSet::search( } if (z_records_diag) { + // there could be diagnostics that are so bad.. that + // we simply mark the error as fatal.. For now we assume + // we can resume if (frontend_apdu->which == Z_APDU_searchRequest) { Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 0, 0); Z_SearchResponse *f_resp = f_apdu->u.searchResponse; + *f_resp->searchStatus = *b_resp->searchStatus; f_resp->records = z_records_diag; frontend_package.response() = f_apdu; return false; @@ -488,18 +562,18 @@ bool yf::SessionShared::BackendSet::search( m_result_set_size = *b_resp->resultCount; return true; } + Z_APDU *f_apdu = 0; if (frontend_apdu->which == Z_APDU_searchRequest) - { - Z_APDU *f_apdu = - odr.create_searchResponse(frontend_apdu, 1, "Search closed"); - frontend_package.response() = f_apdu; - } - if (frontend_apdu->which == Z_APDU_presentRequest) - { - Z_APDU *f_apdu = - odr.create_presentResponse(frontend_apdu, 1, "Search closed"); - frontend_package.response() = f_apdu; - } + f_apdu = odr.create_searchResponse( + frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + else if (frontend_apdu->which == Z_APDU_presentRequest) + f_apdu = odr.create_presentResponse( + frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + else + f_apdu = odr.create_close( + frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + frontend_package.response() = f_apdu; + fatal_error = true; // weired response.. bad backend return false; } @@ -525,7 +599,6 @@ void yf::SessionShared::Frontend::override_set( found_backend = *it; result_set_id = (*set_it)->m_result_set_id; found_backend->m_sets.erase(set_it); - std::cout << "REUSE TTL SET: " << result_set_id << "\n"; return; } } @@ -546,11 +619,9 @@ void yf::SessionShared::Frontend::override_set( } else result_set_id = "default"; - std::cout << "AVAILABLE SET: " << result_set_id << "\n"; return; } } - } void yf::SessionShared::Frontend::get_set(mp::Package &package, @@ -560,6 +631,9 @@ void yf::SessionShared::Frontend::get_set(mp::Package &package, BackendInstancePtr &found_backend, BackendSetPtr &found_set) { + bool session_restarted = false; + +restart: std::string result_set_id; BackendClassPtr bc = m_backend_class; { @@ -582,8 +656,6 @@ void yf::SessionShared::Frontend::get_set(mp::Package &package, found_backend = *it; bc->use_backend(found_backend); found_set->timestamp(); - std::cout << "MATCH SET: " << - found_set->m_result_set_id << "\n"; // found matching set. No need to search again return; } @@ -598,9 +670,29 @@ void yf::SessionShared::Frontend::get_set(mp::Package &package, { // create a new backend set (and new set) found_backend = bc->create_backend(package); - assert(found_backend); - std::cout << "NEW " << found_backend << "\n"; - + + if (!found_backend) + { + Z_APDU *f_apdu = 0; + mp::odr odr; + if (apdu_req->which == Z_APDU_searchRequest) + { + f_apdu = odr.create_searchResponse( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + } + else if (apdu_req->which == Z_APDU_presentRequest) + { + f_apdu = odr.create_presentResponse( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + } + else + { + f_apdu = odr.create_close( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + } + package.response() = f_apdu; + return; + } if (bc->m_named_result_sets) { result_set_id = boost::io::str( @@ -609,17 +701,27 @@ void yf::SessionShared::Frontend::get_set(mp::Package &package, } else result_set_id = "default"; - std::cout << "NEW SET: " << result_set_id << "\n"; } // we must search ... BackendSetPtr new_set(new BackendSet(result_set_id, databases, query)); - if (!new_set->search(package, apdu_req, found_backend)) + bool fatal_error = false; + if (!new_set->search(package, apdu_req, found_backend, fatal_error)) { - std::cout << "search error\n"; - bc->release_backend(found_backend); + if (fatal_error) + bc->remove_backend(found_backend); + else + bc->release_backend(found_backend); return; // search error } + if (!session_restarted && new_set->m_result_set_size < 0) + { + bc->remove_backend(found_backend); + session_restarted = true; + found_backend.reset(); + goto restart; + } + found_set = new_set; found_set->timestamp(); found_backend->m_sets.push_back(found_set); @@ -662,9 +764,9 @@ void yf::SessionShared::Frontend::search(mp::Package &package, BackendInstancePtr found_backend; // null get_set(package, apdu_req, databases, query, found_backend, found_set); - if (!found_set) return; + mp::odr odr; Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0); Z_SearchResponse *f_resp = f_apdu->u.searchResponse; @@ -709,6 +811,33 @@ void yf::SessionShared::Frontend::present(mp::Package &package, if (!found_set) return; + Z_NamePlusRecordList *npr_res = 0; + if (found_set->m_record_cache.lookup(odr, &npr_res, + *req->resultSetStartPoint, + *req->numberOfRecordsRequested, + req->preferredRecordSyntax, + req->recordComposition)) + { + Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0); + Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse; + + yaz_log(YLOG_LOG, "Found %d+%d records in cache %p", + *req->resultSetStartPoint, + *req->numberOfRecordsRequested, + &found_set->m_record_cache); + + *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested; + *f_resp->nextResultSetPosition = + *req->resultSetStartPoint + *req->numberOfRecordsRequested; + // f_resp->presentStatus assumed OK. + f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records)); + f_resp->records->which = Z_Records_DBOSD; + f_resp->records->u.databaseOrSurDiagnostics = npr_res; + package.response() = f_apdu_res; + bc->release_backend(found_backend); + return; + } + Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest); Z_PresentRequest *p_req = p_apdu->u.presentRequest; p_req->preferredRecordSyntax = req->preferredRecordSyntax; @@ -740,17 +869,60 @@ void yf::SessionShared::Frontend::present(mp::Package &package, f_resp->records = b_resp->records; f_resp->otherInfo = b_resp->otherInfo; package.response() = f_apdu_res; + + if (b_resp->records && b_resp->records->which == Z_Records_DBOSD) + { + yaz_log(YLOG_LOG, "Adding %d+%d records to cache %p", + *req->resultSetStartPoint, + *f_resp->numberOfRecordsReturned, + &found_set->m_record_cache); + found_set->m_record_cache.add( + odr, + b_resp->records->u.databaseOrSurDiagnostics, + *req->resultSetStartPoint, + *f_resp->numberOfRecordsReturned); + } bc->release_backend(found_backend); } else { bc->remove_backend(found_backend); Z_APDU *f_apdu_res = - odr.create_presentResponse(apdu_req, 1, "present error"); + odr.create_presentResponse( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); package.response() = f_apdu_res; } } +void yf::SessionShared::Frontend::scan(mp::Package &frontend_package, + Z_APDU *apdu_req) +{ + BackendClassPtr bc = m_backend_class; + BackendInstancePtr backend = bc->get_backend(frontend_package); + if (!backend) + { + mp::odr odr; + Z_APDU *apdu = odr.create_scanResponse( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + frontend_package.response() = apdu; + } + else + { + Package scan_package(backend->m_session, frontend_package.origin()); + scan_package.copy_filter(frontend_package); + scan_package.request() = apdu_req; + scan_package.move(); + frontend_package.response() = scan_package.response(); + if (scan_package.session().is_closed()) + { + frontend_package.session().close(); + bc->remove_backend(backend); + } + else + bc->release_backend(backend); + } +} + yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep) { } @@ -760,7 +932,7 @@ void yf::SessionShared::Worker::operator() (void) m_p->expire(); } -void yf::SessionShared::BackendClass::expire() +void yf::SessionShared::BackendClass::expire_class() { time_t now; time(&now); @@ -768,26 +940,27 @@ void yf::SessionShared::BackendClass::expire() BackendInstanceList::iterator bit = m_backend_list.begin(); while (bit != m_backend_list.end()) { - std::cout << "expiry "; time_t last_use = (*bit)->m_time_last_use; - if ((now >= last_use && now - last_use > m_backend_expiry_ttl) + + if ((*bit)->m_in_use) + { + bit++; + } + else if ((now >= last_use && now - last_use > m_backend_expiry_ttl) || (now < last_use)) { mp::odr odr; (*bit)->m_close_package->response() = odr.create_close( - 0, Z_Close_lackOfActivity, "session expired"); + 0, Z_Close_lackOfActivity, 0); (*bit)->m_close_package->session().close(); (*bit)->m_close_package->move(); bit = m_backend_list.erase(bit); - std::cout << "erase"; } else { - std::cout << "keep"; bit++; } - std::cout << std::endl; } } @@ -799,16 +972,18 @@ void yf::SessionShared::Rep::expire() boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += 30; boost::thread::sleep(xt); - std::cout << "." << std::endl; BackendClassMap::const_iterator b_it = m_backend_map.begin(); for (; b_it != m_backend_map.end(); b_it++) - b_it->second->expire(); + b_it->second->expire_class(); } } yf::SessionShared::Rep::Rep() { + m_resultset_ttl = 30; + m_resultset_max = 10; + m_session_ttl = 90; yf::SessionShared::Worker w(this); m_thrds.add_thread(new boost::thread(w)); } @@ -919,6 +1094,10 @@ void yf::SessionShared::process(mp::Package &package) const { f->present(package, apdu); } + else if (apdu->which == Z_APDU_scanRequest) + { + f->scan(package, apdu); + } else { mp::odr odr; @@ -933,6 +1112,54 @@ void yf::SessionShared::process(mp::Package &package) const m_p->release_frontend(package); } +void yf::SessionShared::configure(const xmlNode *ptr, bool test_only) +{ + for (ptr = ptr->children; ptr; ptr = ptr->next) + { + if (ptr->type != XML_ELEMENT_NODE) + continue; + if (!strcmp((const char *) ptr->name, "resultset")) + { + const struct _xmlAttr *attr; + for (attr = ptr->properties; attr; attr = attr->next) + { + if (!strcmp((const char *) attr->name, "ttl")) + m_p->m_resultset_ttl = + mp::xml::get_int(attr->children, 30); + else if (!strcmp((const char *) attr->name, "max")) + { + m_p->m_resultset_max = + mp::xml::get_int(attr->children, 10); + } + else + throw mp::filter::FilterException( + "Bad attribute " + std::string((const char *) + attr->name)); + } + } + else if (!strcmp((const char *) ptr->name, "session")) + { + const struct _xmlAttr *attr; + for (attr = ptr->properties; attr; attr = attr->next) + { + if (!strcmp((const char *) attr->name, "ttl")) + m_p->m_session_ttl = + mp::xml::get_int(attr->children, 120); + else + throw mp::filter::FilterException( + "Bad attribute " + std::string((const char *) + attr->name)); + } + } + else + { + throw mp::filter::FilterException("Bad element " + + std::string((const char *) + ptr->name)); + } + } +} + static mp::filter::Base* filter_creator() { return new mp::filter::SessionShared; @@ -949,8 +1176,9 @@ extern "C" { /* * Local variables: * c-basic-offset: 4 + * c-file-style: "Stroustrup" * indent-tabs-mode: nil - * c-file-style: "stroustrup" * End: * vim: shiftwidth=4 tabstop=8 expandtab */ +