From: Marc Cromme Date: Wed, 3 Jan 2007 15:03:55 +0000 (+0000) Subject: first working version of load balancing filter X-Git-Tag: METAPROXY.1.0.8~55 X-Git-Url: http://sru.miketaylor.org.uk/cgi-bin?a=commitdiff_plain;h=66d4ec677bbc06aa9be29ab2e5fa2bec7442a7fa;p=metaproxy-moved-to-github.git first working version of load balancing filter still needs testing for race conditions and wrong mutex locks there seems to be a slight error in the counting of dead connections, needs improvement also removing of debug output is needed --- diff --git a/src/filter_load_balance.cpp b/src/filter_load_balance.cpp index 57bd390..64536f2 100644 --- a/src/filter_load_balance.cpp +++ b/src/filter_load_balance.cpp @@ -1,13 +1,14 @@ -/* $Id: filter_load_balance.cpp,v 1.1 2007-01-02 15:35:36 marc Exp $ +/* $Id: filter_load_balance.cpp,v 1.2 2007-01-03 15:03:55 marc Exp $ Copyright (c) 2005-2006, Index Data. See the LICENSE file for details */ #include "config.hpp" +#include "session.hpp" +#include "package.hpp" #include "filter.hpp" #include "filter_load_balance.hpp" -#include "package.hpp" #include "util.hpp" #include @@ -16,6 +17,9 @@ #include #include +#include +#include +#include namespace mp = metaproxy_1; namespace yf = mp::filter; @@ -29,7 +33,40 @@ namespace metaproxy_1 { void process(metaproxy_1::Package & package); void configure(const xmlNode * ptr); private: + // statistic manipulating functions, + void add_dead(unsigned long session_id); + //void clear_dead(unsigned long session_id); + void add_package(unsigned long session_id); + void remove_package(unsigned long session_id); + void add_session(unsigned long session_id, std::string target); + void remove_session(unsigned long session_id); + std::string find_session_target(unsigned long session_id); + + // cost functions + unsigned int cost(std::string target); + unsigned int dead(std::string target); + + // local classes + class TargetStat { + public: + unsigned int sessions; + unsigned int packages; + unsigned int deads; + unsigned int cost() { + unsigned int c = sessions + packages + deads; + std::cout << "cost c:" << c + << " s:" << sessions + << " p:" << packages + << " d:" << deads + <<"\n"; + return c; + } + }; + + // local protected databases boost::mutex m_mutex; + std::map m_target_stat; + std::map m_session_target; }; } } @@ -73,81 +110,267 @@ void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode) void yf::LoadBalance::Impl::process(mp::Package &package) { - Z_GDU *gdu_req = package.request().get(); + bool is_closed_front = false; - // passing anything but z3950 packages - if (!gdu_req - || !(gdu_req->which == Z_GDU_Z3950)) - { - package.move(); - return; - } - + // checking for closed front end packages + if (package.session().is_closed()){ + is_closed_front = true; + } - // target selecting only on Z39.50 init request - if (gdu_req->u.z3950->which == Z_APDU_initRequest){ + Z_GDU *gdu_req = package.request().get(); - mp::odr odr_en(ODR_ENCODE); - Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest; + // passing anything but z3950 packages + if (gdu_req && gdu_req->which == Z_GDU_Z3950){ - // extracting virtual hosts - std::list vhosts; + // target selecting only on Z39.50 init request + if (gdu_req->u.z3950->which == Z_APDU_initRequest){ - mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts); + mp::odr odr_en(ODR_ENCODE); + Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest; - //std::cout << "LoadBalance::Impl::process() vhosts: " - // << vhosts.size() << "\n"; - //std::cout << "LoadBalance::Impl::process()" << *gdu_req << "\n"; - - // choosing one target according to load-balancing algorithm - - if (vhosts.size()){ - std::string target; + // extracting virtual hosts + std::list vhosts; - // getting timestamp for receiving of package - boost::posix_time::ptime receive_time - = boost::posix_time::microsec_clock::local_time(); + mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts); - // //<< receive_time << " " - // //<< to_iso_string(receive_time) << " " - //<< to_iso_extended_string(receive_time) << " " - // package.session().id(); - - { // scope for locking local target database - boost::mutex::scoped_lock scoped_lock(m_mutex); - target = *vhosts.begin(); - } + // choosing one target according to load-balancing algorithm + if (vhosts.size()){ + std::string target; + unsigned int cost = std::numeric_limits::max(); + + { //locking scope for local databases + boost::mutex::scoped_lock scoped_lock(m_mutex); + + // load-balancing algorithm goes here + //target = *vhosts.begin(); + for(std::list::const_iterator ivh + = vhosts.begin(); + ivh != vhosts.end(); + ivh++){ + if ((*ivh).size() != 0){ + unsigned int vhcost + = yf::LoadBalance::Impl::cost(*ivh); + if (cost > vhcost){ + cost = vhcost; + target = *ivh; + } + } + } + + // updating local database + add_session(package.session().id(), target); + yf::LoadBalance::Impl::cost(target); + add_package(package.session().id()); + } + + // copying new target into init package + mp::util::set_vhost_otherinfo(&(org_init->otherInfo), + odr_en, target); + package.request() = gdu_req; + } - // copying new target into init package - mp::util::set_vhost_otherinfo(&(org_init->otherInfo), odr_en, target); - package.request() = gdu_req; } - + // frontend Z39.50 close request is added to statistics and marked + else if (gdu_req->u.z3950->which == Z_APDU_close){ + is_closed_front = true; + boost::mutex::scoped_lock scoped_lock(m_mutex); + add_package(package.session().id()); + } + // any other Z39.50 package is added to statistics + else { + boost::mutex::scoped_lock scoped_lock(m_mutex); + add_package(package.session().id()); + } } - - // moving all Z39.50 package typess + // moving all package types package.move(); - - //boost::posix_time::ptime send_time - // = boost::posix_time::microsec_clock::local_time(); + // checking for closed back end packages + if (package.session().is_closed()) { + boost::mutex::scoped_lock scoped_lock(m_mutex); + + // marking backend dead if backend closed without fronted close + if (is_closed_front == false) + add_dead(package.session().id()); + + remove_session(package.session().id()); + } + + Z_GDU *gdu_res = package.response().get(); + + // passing anything but z3950 packages + if (gdu_res && gdu_res->which == Z_GDU_Z3950){ + + // session closing only on Z39.50 close response + if (gdu_res->u.z3950->which == Z_APDU_close){ + boost::mutex::scoped_lock scoped_lock(m_mutex); + remove_package(package.session().id()); + + // marking backend dead if backend closed without fronted close + if (is_closed_front == false) + add_dead(package.session().id()); + + //remove_session(package.session().id()); + } + // any other Z39.50 package is removed from statistics + else { + boost::mutex::scoped_lock scoped_lock(m_mutex); + remove_package(package.session().id()); + } + } +} + +// getting timestamp for receiving of package +//boost::posix_time::ptime receive_time +// = boost::posix_time::microsec_clock::local_time(); +// //<< receive_time << " " +// //<< to_iso_string(receive_time) << " " +//<< to_iso_extended_string(receive_time) << " " + + +// statistic manipulating functions, +void yf::LoadBalance::Impl::add_dead(unsigned long session_id){ + + std::string target = find_session_target(session_id); - //boost::posix_time::time_duration duration = send_time - receive_time; + std::cout << "add_dead " << session_id << "\n"; +}; +//void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){ +// std::cout << "clear_dead " << session_id << "\n"; +//}; - // { // scope for locking local target database - // boost::mutex::scoped_lock scoped_lock(m_mutex); - // target = *vhosts.begin(); - // } +void yf::LoadBalance::Impl::add_package(unsigned long session_id){ + std::string target = find_session_target(session_id); + if (target.size() != 0){ + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg != m_target_stat.end()){ + itarg->second.packages += 1; + std::cout << "add_package " << session_id << " " << target + << " p:" << itarg->second.packages << "\n"; + } + } +}; + +void yf::LoadBalance::Impl::remove_package(unsigned long session_id){ + std::string target = find_session_target(session_id); + + if (target.size() != 0){ + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg != m_target_stat.end()){ + itarg->second.packages -= 1; + std::cout << "remove_package " << session_id << " " << target + << " p:" << itarg->second.packages << "\n"; + } + } +}; + +void yf::LoadBalance::Impl::add_session(unsigned long session_id, + std::string target){ + + // finding and adding session + std::map::iterator isess; + isess = m_session_target.find(session_id); + if (isess == m_session_target.end()){ + m_session_target.insert(std::make_pair(session_id, target)); + } + + // finding and adding target statistics + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg == m_target_stat.end()){ + TargetStat stat; + stat.sessions = 1; + m_target_stat.insert(std::make_pair(target, stat)); + std::cout << "add_session " << session_id << " " << target + << " s:1\n"; + } else { + itarg->second.sessions += 1; + std::cout << "add_session " << session_id << " " << target + << " s:" << itarg->second.sessions << "\n"; + } + + +}; + +void yf::LoadBalance::Impl::remove_session(unsigned long session_id){ + + std::string target; + + // finding session + std::map::iterator isess; + isess = m_session_target.find(session_id); + if (isess == m_session_target.end()) + return; + else + target = isess->second; + + // finding target statistics + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg == m_target_stat.end()){ + m_session_target.erase(isess); + return; + } + + // counting session down + itarg->second.sessions -= 1; + + std::cout << "remove_session " << session_id << " " << target + << " s:" << itarg->second.sessions << "\n"; + + // clearing empty sessions and targets + if (itarg->second.sessions == 0){ + m_target_stat.erase(itarg); + m_session_target.erase(isess); + } +}; + +std::string +yf::LoadBalance::Impl::find_session_target(unsigned long session_id){ + + std::string target; + std::map::iterator isess; + isess = m_session_target.find(session_id); + if (isess != m_session_target.end()) + target = isess->second; + return target; } +// cost functions +unsigned int yf::LoadBalance::Impl::cost(std::string target){ + + unsigned int cost; + + if (target.size() != 0){ + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg != m_target_stat.end()){ + cost = itarg->second.cost(); + } + } + + std::cout << "cost " << target << " c:" << cost << "\n"; + return cost; +}; + +unsigned int yf::LoadBalance::Impl::dead(std::string target){ + std::cout << "dead " << target << "\n"; + return 0; +}; + + + + static mp::filter::Base* filter_creator() { return new mp::filter::LoadBalance;