From: Adam Dickmeiss Date: Sun, 15 Jan 2006 20:03:14 +0000 (+0000) Subject: Added filter multi. init+search operational X-Git-Tag: YP2.0.0.2~89 X-Git-Url: http://sru.miketaylor.org.uk/cgi-bin?a=commitdiff_plain;h=b0e3c437bffcf99c6f5a3ceb77d661cc366be05f;p=metaproxy-moved-to-github.git Added filter multi. init+search operational --- diff --git a/src/.cvsignore b/src/.cvsignore index d27fd48..dea9d51 100644 --- a/src/.cvsignore +++ b/src/.cvsignore @@ -18,6 +18,7 @@ test_filter1 test_filter2 test_filter_frontend_net test_filter_log +test_filter_multi test_package1 test_pipe test_thread_pool_observer diff --git a/src/Makefile.am b/src/Makefile.am index 4d114b8..275c69d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,4 +1,4 @@ -## $Id: Makefile.am,v 1.42 2006-01-12 10:04:34 adam Exp $ +## $Id: Makefile.am,v 1.43 2006-01-15 20:03:14 adam Exp $ MAINTAINERCLEANFILES = Makefile.in config.in config.hpp @@ -21,6 +21,7 @@ libyp2_la_SOURCES = \ filter_auth_simple.cpp filter_auth_simple.hpp \ filter_frontend_net.cpp filter_frontend_net.hpp \ filter_log.cpp filter_log.hpp \ + filter_multi.cpp filter_multi.hpp \ filter_virt_db.cpp filter_virt_db.hpp \ filter_z3950_client.cpp filter_z3950_client.hpp \ filter_backend_test.cpp filter_backend_test.hpp \ @@ -62,6 +63,7 @@ check_PROGRAMS = \ test_filter_factory \ test_filter_frontend_net \ test_filter_log \ + test_filter_multi \ test_filter_z3950_client \ test_filter_backend_test \ test_filter_virt_db \ @@ -83,6 +85,7 @@ test_filter_auth_simple_SOURCES = test_filter_auth_simple.cpp test_filter_factory_SOURCES = test_filter_factory.cpp test_filter_frontend_net_SOURCES = test_filter_frontend_net.cpp test_filter_log_SOURCES = test_filter_log.cpp +test_filter_multi_SOURCES = test_filter_multi.cpp test_filter_z3950_client_SOURCES = test_filter_z3950_client.cpp test_filter_backend_test_SOURCES = test_filter_backend_test.cpp test_filter_virt_db_SOURCES = test_filter_virt_db.cpp @@ -104,6 +107,7 @@ test_filter_auth_simple_LDADD = $(TESTLDADD) test_filter_factory_LDADD = $(TESTLDADD) test_filter_frontend_net_LDADD = $(TESTLDADD) test_filter_log_LDADD = $(TESTLDADD) +test_filter_multi_LDADD = $(TESTLDADD) test_filter_z3950_client_LDADD = $(TESTLDADD) test_filter_backend_test_LDADD = $(TESTLDADD) test_filter_virt_db_LDADD = $(TESTLDADD) diff --git a/src/factory_static.cpp b/src/factory_static.cpp index 2b95aa9..8e7f283 100644 --- a/src/factory_static.cpp +++ b/src/factory_static.cpp @@ -1,4 +1,4 @@ -/* $Id: factory_static.cpp,v 1.5 2006-01-12 10:04:34 adam Exp $ +/* $Id: factory_static.cpp,v 1.6 2006-01-15 20:03:14 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% @@ -20,6 +20,7 @@ #include "filter_backend_test.hpp" #include "filter_frontend_net.hpp" #include "filter_log.hpp" +#include "filter_multi.hpp" #include "filter_session_shared.hpp" #include "filter_template.hpp" #include "filter_virt_db.hpp" @@ -32,6 +33,7 @@ yp2::FactoryStatic::FactoryStatic() &yp2_filter_backend_test, &yp2_filter_frontend_net, &yp2_filter_log, + &yp2_filter_multi, &yp2_filter_session_shared, &yp2_filter_template, &yp2_filter_virt_db, diff --git a/src/filter_multi.cpp b/src/filter_multi.cpp new file mode 100644 index 0000000..09f5f2f --- /dev/null +++ b/src/filter_multi.cpp @@ -0,0 +1,502 @@ +/* $Id: filter_multi.cpp,v 1.1 2006-01-15 20:03:14 adam Exp $ + Copyright (c) 2005, Index Data. + +%LICENSE% + */ + +#include "config.hpp" + +#include "filter.hpp" +#include "package.hpp" + +#include +#include +#include +#include + +#include "util.hpp" +#include "filter_multi.hpp" + +#include +#include +#include + +#include +#include + +namespace yf = yp2::filter; + +namespace yp2 { + namespace filter { + + struct Multi::BackendSet { + BackendPtr m_backend; + long size; + }; + struct Multi::Set { + Set(std::string setname); + Set(); + ~Set(); + + std::list m_backend_sets; + std::string m_setname; + }; + struct Multi::Backend { + PackagePtr m_package; + std::string m_backend_database; + std::string m_vhost; + std::string m_route; + void operator() (void); // thread operation + }; + struct Multi::Frontend { + Frontend(Rep *rep); + ~Frontend(); + yp2::Session m_session; + bool m_is_multi; + bool m_in_use; + std::list m_backend_list; + std::map m_sets; + void multi_move(); + void init(Package &package, Z_GDU *gdu); + void close(Package &package); + void search(Package &package, Z_APDU *apdu); +#if 0 + void present(Package &package, Z_APDU *apdu); + void scan(Package &package, Z_APDU *apdu); +#endif + Rep *m_p; + }; + struct Multi::Map { + Map(std::list hosts, std::string route); + Map(); + std::list m_hosts; + std::string m_route; + }; + class Multi::Rep { + friend class Multi; + friend class Frontend; + + FrontendPtr get_frontend(Package &package); + void release_frontend(Package &package); + private: + boost::mutex m_sessions_mutex; + std::mapm_maps; + + boost::mutex m_mutex; + boost::condition m_cond_session_ready; + std::map m_clients; + }; + } +} + +using namespace yp2; + +yf::Multi::Frontend::Frontend(Rep *rep) +{ + m_p = rep; + m_is_multi = false; +} + +yf::Multi::Frontend::~Frontend() +{ +} + +yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package) +{ + boost::mutex::scoped_lock lock(m_mutex); + + std::map::iterator it; + + while(true) + { + it = m_clients.find(package.session()); + if (it == m_clients.end()) + break; + + if (!it->second->m_in_use) + { + it->second->m_in_use = true; + return it->second; + } + m_cond_session_ready.wait(lock); + } + FrontendPtr f(new Frontend(this)); + m_clients[package.session()] = f; + f->m_in_use = true; + return f; +} + +void yf::Multi::Rep::release_frontend(Package &package) +{ + boost::mutex::scoped_lock lock(m_mutex); + std::map::iterator it; + + it = m_clients.find(package.session()); + if (it != m_clients.end()) + { + if (package.session().is_closed()) + { + it->second->close(package); + m_clients.erase(it); + } + else + { + it->second->m_in_use = false; + } + m_cond_session_ready.notify_all(); + } +} + +yf::Multi::Set::Set(std::string setname) + : m_setname(setname) +{ +} + + +yf::Multi::Set::Set() +{ +} + + +yf::Multi::Set::~Set() +{ +} + +yf::Multi::Map::Map(std::list hosts, std::string route) + : m_hosts(hosts), m_route(route) +{ +} + +yf::Multi::Map::Map() +{ +} + +yf::Multi::Multi() : m_p(new Multi::Rep) +{ +} + +yf::Multi::~Multi() { +} + + +void yf::Multi::add_map_host2hosts(std::string host, + std::list hosts, + std::string route) +{ + m_p->m_maps[host] = Multi::Map(hosts, route); +} + +void yf::Multi::Backend::operator() (void) +{ + m_package->move(m_route); +} + +void yf::Multi::Frontend::close(Package &package) +{ + std::list::const_iterator bit; + for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++) + { + BackendPtr b = *bit; + + b->m_package->copy_filter(package); + b->m_package->request() = (Z_GDU *) 0; + b->m_package->session().close(); + b->m_package->move(b->m_route); + } +} + +void yf::Multi::Frontend::multi_move() +{ + std::list::const_iterator bit; + boost::thread_group g; + for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++) + { + g.add_thread(new boost::thread(**bit)); + } + g.join_all(); +} + +void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu) +{ + Z_InitRequest *req = gdu->u.z3950->u.initRequest; + + // empty or non-existang vhost is the same.. + const char *vhost_cstr = + yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0); + std::string vhost; + if (vhost_cstr) + vhost = std::string(vhost_cstr); + + std::map::const_iterator it; + it = m_p->m_maps.find(std::string(vhost)); + if (it == m_p->m_maps.end()) + { + // might return diagnostics if no match + package.move(); + return; + } + std::list::const_iterator hit = it->second.m_hosts.begin(); + for (; hit != it->second.m_hosts.end(); hit++) + { + Session s; + Backend *b = new Backend; + b->m_vhost = *hit; + b->m_route = it->second.m_route; + b->m_package = PackagePtr(new Package(s, package.origin())); + + m_backend_list.push_back(BackendPtr(b)); + } + // we're going to deal with this for sure.. + + m_is_multi = true; + + // create init request + std::list::const_iterator bit; + for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++) + { + yp2::odr odr; + BackendPtr b = *bit; + Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest); + + yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr, + VAL_PROXY, 1, b->m_vhost.c_str()); + + Z_InitRequest *req = init_apdu->u.initRequest; + + ODR_MASK_SET(req->options, Z_Options_search); + ODR_MASK_SET(req->options, Z_Options_present); + ODR_MASK_SET(req->options, Z_Options_namedResultSets); + + ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1); + ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2); + ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3); + + b->m_package->request() = init_apdu; + + b->m_package->copy_filter(package); + } + multi_move(); + + // create the frontend init response based on each backend init response + yp2::odr odr; + + int i; + + Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0); + Z_InitResponse *f_resp = f_apdu->u.initResponse; + + ODR_MASK_SET(f_resp->options, Z_Options_search); + ODR_MASK_SET(f_resp->options, Z_Options_present); + ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets); + + ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1); + ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2); + ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3); + + for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++) + { + PackagePtr p = (*bit)->m_package; + + if (p->session().is_closed()) // if any backend closes, close frontend + package.session().close(); + Z_GDU *gdu = p->response().get(); + if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == + Z_APDU_initResponse) + { + Z_APDU *b_apdu = gdu->u.z3950; + Z_InitResponse *b_resp = b_apdu->u.initResponse; + + // common options for all backends + for (i = 0; i <= Z_Options_stringSchema; i++) + { + if (!ODR_MASK_GET(b_resp->options, i)) + ODR_MASK_CLEAR(f_resp->options, i); + } + // common protocol version + for (i = 0; i <= Z_ProtocolVersion_3; i++) + if (!ODR_MASK_GET(b_resp->protocolVersion, i)) + ODR_MASK_CLEAR(f_resp->protocolVersion, i); + // reject if any of the backends reject + if (!*b_resp->result) + *f_resp->result = 0; + } + else + { + // if any target does not return init return that (close or + // similar ) + package.response() = p->response(); + return; + } + } + package.response() = f_apdu; +} + +void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req) +{ + // create search request + Z_SearchRequest *req = apdu_req->u.searchRequest; + + // deal with piggy back (for now disable) + *req->smallSetUpperBound = 0; + *req->largeSetLowerBound = 1; + *req->mediumSetPresentNumber = 1; + + std::list::const_iterator bit; + for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++) + { + PackagePtr p = (*bit)->m_package; + // we don't modify database name yet! + + p->request() = apdu_req; + p->copy_filter(package); + } + multi_move(); + + // look at each response + int total_hits = 0; + for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++) + { + PackagePtr p = (*bit)->m_package; + + if (p->session().is_closed()) // if any backend closes, close frontend + package.session().close(); + + Z_GDU *gdu = p->response().get(); + if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == + Z_APDU_searchResponse) + { + Z_APDU *b_apdu = gdu->u.z3950; + Z_SearchResponse *b_resp = b_apdu->u.searchResponse; + + total_hits += *b_resp->resultCount; + } + else + { + // if any target does not return search response - return that + package.response() = p->response(); + return; + } + } + + yp2::odr odr; + Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0); + Z_SearchResponse *f_resp = f_apdu->u.searchResponse; + + *f_resp->resultCount = total_hits; + + package.response() = f_apdu; +} + +void yf::Multi::process(Package &package) const +{ + FrontendPtr f = m_p->get_frontend(package); + + Z_GDU *gdu = package.request().get(); + + if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == + Z_APDU_initRequest && !f->m_is_multi) + { + f->init(package, gdu); + } + else if (!f->m_is_multi) + package.move(); + else if (gdu && gdu->which == Z_GDU_Z3950) + { + Z_APDU *apdu = gdu->u.z3950; + if (apdu->which == Z_APDU_initRequest) + { + yp2::odr odr; + + package.response() = odr.create_close( + apdu, + Z_Close_protocolError, + "double init"); + + package.session().close(); + } + else if (apdu->which == Z_APDU_searchRequest) + { + f->search(package, apdu); + } + else + { + yp2::odr odr; + + package.response() = odr.create_close( + apdu, Z_Close_protocolError, + "unsupported APDU in filter multi"); + + package.session().close(); + } + } + m_p->release_frontend(package); +} + +void yp2::filter::Multi::configure(const xmlNode * ptr) +{ + for (ptr = ptr->children; ptr; ptr = ptr->next) + { + if (ptr->type != XML_ELEMENT_NODE) + continue; + if (!strcmp((const char *) ptr->name, "virtual")) + { + std::list targets; + std::string vhost; + xmlNode *v_node = ptr->children; + for (; v_node; v_node = v_node->next) + { + if (v_node->type != XML_ELEMENT_NODE) + continue; + + if (yp2::xml::is_element_yp2(v_node, "vhost")) + vhost = yp2::xml::get_text(v_node); + else if (yp2::xml::is_element_yp2(v_node, "target")) + targets.push_back(yp2::xml::get_text(v_node)); + else + throw yp2::filter::FilterException + ("Bad element " + + std::string((const char *) v_node->name) + + " in virtual section" + ); + } + std::string route = yp2::xml::get_route(ptr); + add_map_host2hosts(vhost, targets, route); + std::list::const_iterator it; + for (it = targets.begin(); it != targets.end(); it++) + { + std::cout << "Add " << vhost << "->" << *it + << "," << route << "\n"; + } + } + else + { + throw yp2::filter::FilterException + ("Bad element " + + std::string((const char *) ptr->name) + + " in virt_db filter"); + } + } +} + +static yp2::filter::Base* filter_creator() +{ + return new yp2::filter::Multi; +} + +extern "C" { + struct yp2_filter_struct yp2_filter_multi = { + 0, + "multi", + filter_creator + }; +} + + +/* + * Local variables: + * c-basic-offset: 4 + * indent-tabs-mode: nil + * c-file-style: "stroustrup" + * End: + * vim: shiftwidth=4 tabstop=8 expandtab + */ diff --git a/src/filter_multi.hpp b/src/filter_multi.hpp new file mode 100644 index 0000000..98d7210 --- /dev/null +++ b/src/filter_multi.hpp @@ -0,0 +1,57 @@ +/* $Id: filter_multi.hpp,v 1.1 2006-01-15 20:03:14 adam Exp $ + Copyright (c) 2005, Index Data. + +%LICENSE% + */ + +#ifndef FILTER_MULTI_HPP +#define FILTER_MULTI_HPP + +#include +#include +#include +#include +#include + +#include "filter.hpp" + +namespace yp2 { + namespace filter { + class Multi : public Base { + class Rep; + class Frontend; + class Map; + class Set; + class Backend; + class BackendSet; + typedef boost::shared_ptr BackendPtr; + typedef boost::shared_ptr FrontendPtr; + typedef boost::shared_ptr PackagePtr; + typedef std::map::iterator Sets_it; + public: + ~Multi(); + Multi(); + void process(yp2::Package & package) const; + void configure(const xmlNode * ptr); + void add_map_host2hosts(std::string host, + std::list hosts, + std::string route); + private: + boost::scoped_ptr m_p; + }; + } +} + +extern "C" { + extern struct yp2_filter_struct yp2_filter_multi; +} + +#endif +/* + * Local variables: + * c-basic-offset: 4 + * indent-tabs-mode: nil + * c-file-style: "stroustrup" + * End: + * vim: shiftwidth=4 tabstop=8 expandtab + */ diff --git a/src/filter_virt_db.hpp b/src/filter_virt_db.hpp index bbbfd71..6bd733c 100644 --- a/src/filter_virt_db.hpp +++ b/src/filter_virt_db.hpp @@ -1,4 +1,4 @@ -/* $Id: filter_virt_db.hpp,v 1.11 2006-01-14 08:38:57 adam Exp $ +/* $Id: filter_virt_db.hpp,v 1.12 2006-01-15 20:03:14 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% @@ -22,8 +22,8 @@ namespace yp2 { class Map; class Set; class Backend; - typedef boost::shared_ptr BackendPtr; - typedef boost::shared_ptr FrontendPtr; + typedef boost::shared_ptr BackendPtr; + typedef boost::shared_ptr FrontendPtr; public: ~Virt_db(); Virt_db(); diff --git a/src/test_filter_multi.cpp b/src/test_filter_multi.cpp new file mode 100644 index 0000000..c9d2af2 --- /dev/null +++ b/src/test_filter_multi.cpp @@ -0,0 +1,104 @@ +/* $Id: test_filter_multi.cpp,v 1.1 2006-01-15 20:03:14 adam Exp $ + Copyright (c) 2005, Index Data. + +%LICENSE% + */ + +#include "config.hpp" +#include +#include + +#include "filter_multi.hpp" +#include "util.hpp" +#include "router_chain.hpp" +#include "session.hpp" +#include "package.hpp" + +#define BOOST_AUTO_TEST_MAIN +#include + +using namespace boost::unit_test; + +class FilterBounceInit: public yp2::filter::Base { +public: + void process(yp2::Package & package) const { + + if (package.session().is_closed()) + { + // std::cout << "Got Close.\n"; + } + + Z_GDU *gdu = package.request().get(); + if (gdu) + { + // std::cout << "Got PDU. Sending init response\n"; + yp2::odr odr; + Z_APDU *apdu = zget_APDU(odr, Z_APDU_initResponse); + + apdu->u.initResponse->implementationName = "YP2/YAZ"; + + package.response() = apdu; + } + package.move(); + }; +}; + + +BOOST_AUTO_UNIT_TEST( test_filter_multi_1 ) +{ + try + { + yp2::filter::Multi lf; + } + catch ( ... ) { + BOOST_CHECK (false); + } +} + +BOOST_AUTO_UNIT_TEST( test_filter_multi_2 ) +{ + try + { + yp2::RouterChain router; + + yp2::filter::Multi multi; + FilterBounceInit bounce; + + router.append(multi); + router.append(bounce); + + // Create package with Z39.50 init request in it + yp2::Package pack; + + yp2::odr odr; + Z_APDU *apdu = zget_APDU(odr, Z_APDU_initRequest); + + pack.request() = apdu; + // Done creating query. + + // Put it in router + pack.router(router).move(); + + // Inspect that we got Z39.50 init response + yazpp_1::GDU *gdu = &pack.response(); + + Z_GDU *z_gdu = gdu->get(); + BOOST_CHECK(z_gdu); + if (z_gdu) { + BOOST_CHECK_EQUAL(z_gdu->which, Z_GDU_Z3950); + BOOST_CHECK_EQUAL(z_gdu->u.z3950->which, Z_APDU_initResponse); + } + } + catch ( ... ) { + BOOST_CHECK (false); + } +} + +/* + * Local variables: + * c-basic-offset: 4 + * indent-tabs-mode: nil + * c-file-style: "stroustrup" + * End: + * vim: shiftwidth=4 tabstop=8 expandtab + */