1 /* $Id: filter_load_balance.cpp,v 1.7 2007-01-25 14:05:54 adam Exp $
2 Copyright (c) 2005-2007, Index Data.
4 See the LICENSE file for details
11 #include "filter_load_balance.hpp"
15 #include <boost/thread/mutex.hpp>
16 #include <boost/date_time/posix_time/posix_time.hpp>
20 // remove max macro if already defined (defined later in <limits>)
30 namespace mp = metaproxy_1;
31 namespace yf = mp::filter;
33 namespace metaproxy_1 {
35 class LoadBalance::Impl {
39 void process(metaproxy_1::Package & package);
40 void configure(const xmlNode * ptr);
42 // statistic manipulating functions,
43 void add_dead(unsigned long session_id);
44 //void clear_dead(unsigned long session_id);
45 void add_package(unsigned long session_id);
46 void remove_package(unsigned long session_id);
47 void add_session(unsigned long session_id, std::string target);
48 void remove_session(unsigned long session_id);
49 std::string find_session_target(unsigned long session_id);
52 unsigned int cost(std::string target);
53 unsigned int dead(std::string target);
58 unsigned int sessions;
59 unsigned int packages;
62 unsigned int c = sessions + packages + deads;
63 //std::cout << "stats c:" << c
64 // << " s:" << sessions
65 // << " p:" << packages
72 // local protected databases
74 std::map<std::string, TargetStat> m_target_stat;
75 std::map<unsigned long, std::string> m_session_target;
80 // define Pimpl wrapper forwarding to Impl
82 yf::LoadBalance::LoadBalance() : m_p(new Impl)
86 yf::LoadBalance::~LoadBalance()
87 { // must have a destructor because of boost::scoped_ptr
90 void yf::LoadBalance::configure(const xmlNode *xmlnode)
92 m_p->configure(xmlnode);
95 void yf::LoadBalance::process(mp::Package &package) const
97 m_p->process(package);
101 // define Implementation stuff
105 yf::LoadBalance::Impl::Impl()
109 yf::LoadBalance::Impl::~Impl()
113 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
117 void yf::LoadBalance::Impl::process(mp::Package &package)
120 bool is_closed_front = false;
121 bool is_closed_back = false;
123 // checking for closed front end packages
124 if (package.session().is_closed()){
125 is_closed_front = true;
128 Z_GDU *gdu_req = package.request().get();
130 // passing anything but z3950 packages
131 if (gdu_req && gdu_req->which == Z_GDU_Z3950){
133 // target selecting only on Z39.50 init request
134 if (gdu_req->u.z3950->which == Z_APDU_initRequest){
136 mp::odr odr_en(ODR_ENCODE);
137 Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
139 // extracting virtual hosts
140 std::list<std::string> vhosts;
142 mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
144 // choosing one target according to load-balancing algorithm
148 unsigned int cost = std::numeric_limits<unsigned int>::max();
150 { //locking scope for local databases
151 boost::mutex::scoped_lock scoped_lock(m_mutex);
153 // load-balancing algorithm goes here
154 //target = *vhosts.begin();
155 for(std::list<std::string>::const_iterator ivh
159 if ((*ivh).size() != 0){
161 = yf::LoadBalance::Impl::cost(*ivh);
169 // updating local database
170 add_session(package.session().id(), target);
171 yf::LoadBalance::Impl::cost(target);
172 add_package(package.session().id());
175 // copying new target into init package
176 mp::util::set_vhost_otherinfo(&(org_init->otherInfo),
178 package.request() = gdu_req;
182 // frontend Z39.50 close request is added to statistics and marked
183 else if (gdu_req->u.z3950->which == Z_APDU_close){
184 is_closed_front = true;
185 boost::mutex::scoped_lock scoped_lock(m_mutex);
186 add_package(package.session().id());
188 // any other Z39.50 package is added to statistics
190 boost::mutex::scoped_lock scoped_lock(m_mutex);
191 add_package(package.session().id());
195 // moving all package types
199 // checking for closed back end packages
200 if (package.session().is_closed())
201 is_closed_back = true;
203 Z_GDU *gdu_res = package.response().get();
205 // passing anything but z3950 packages
206 if (gdu_res && gdu_res->which == Z_GDU_Z3950){
208 // session closing only on Z39.50 close response
209 if (gdu_res->u.z3950->which == Z_APDU_close){
210 is_closed_back = true;
211 boost::mutex::scoped_lock scoped_lock(m_mutex);
212 remove_package(package.session().id());
214 // any other Z39.50 package is removed from statistics
216 boost::mutex::scoped_lock scoped_lock(m_mutex);
217 remove_package(package.session().id());
221 // finally removing sessions and marking deads
222 if (is_closed_back || is_closed_front){
223 boost::mutex::scoped_lock scoped_lock(m_mutex);
225 // marking backend dead if backend closed without fronted close
226 if (is_closed_front == false)
227 add_dead(package.session().id());
229 remove_session(package.session().id());
231 // making sure that package is closed
232 package.session().close();
236 // getting timestamp for receiving of package
237 //boost::posix_time::ptime receive_time
238 // = boost::posix_time::microsec_clock::local_time();
239 // //<< receive_time << " "
240 // //<< to_iso_string(receive_time) << " "
241 //<< to_iso_extended_string(receive_time) << " "
244 // statistic manipulating functions,
245 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
248 std::string target = find_session_target(session_id);
250 if (target.size() != 0){
251 std::map<std::string, TargetStat>::iterator itarg;
252 itarg = m_target_stat.find(target);
253 if (itarg != m_target_stat.end()
254 && itarg->second.deads < std::numeric_limits<unsigned int>::max()){
255 itarg->second.deads += 1;
256 // std:.cout << "add_dead " << session_id << " " << target
257 // << " d:" << itarg->second.deads << "\n";
262 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
263 // std::cout << "clear_dead " << session_id << "\n";
266 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
268 std::string target = find_session_target(session_id);
270 if (target.size() != 0){
271 std::map<std::string, TargetStat>::iterator itarg;
272 itarg = m_target_stat.find(target);
273 if (itarg != m_target_stat.end()
274 && itarg->second.packages
275 < std::numeric_limits<unsigned int>::max()){
276 itarg->second.packages += 1;
277 // std:.cout << "add_package " << session_id << " " << target
278 // << " p:" << itarg->second.packages << "\n";
283 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
284 std::string target = find_session_target(session_id);
286 if (target.size() != 0){
287 std::map<std::string, TargetStat>::iterator itarg;
288 itarg = m_target_stat.find(target);
289 if (itarg != m_target_stat.end()
290 && itarg->second.packages > 0){
291 itarg->second.packages -= 1;
292 // std:.cout << "remove_package " << session_id << " " << target
293 // << " p:" << itarg->second.packages << "\n";
298 void yf::LoadBalance::Impl::add_session(unsigned long session_id,
301 // finding and adding session
302 std::map<unsigned long, std::string>::iterator isess;
303 isess = m_session_target.find(session_id);
304 if (isess == m_session_target.end()){
305 m_session_target.insert(std::make_pair(session_id, target));
308 // finding and adding target statistics
309 std::map<std::string, TargetStat>::iterator itarg;
310 itarg = m_target_stat.find(target);
311 if (itarg == m_target_stat.end()){
314 stat.packages = 0; // no idea why the defaut constructor TargetStat()
315 stat.deads = 0; // is not initializig this correctly to zero ??
316 m_target_stat.insert(std::make_pair(target, stat));
317 // std:.cout << "add_session " << session_id << " " << target
320 else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
322 itarg->second.sessions += 1;
323 // std:.cout << "add_session " << session_id << " " << target
324 // << " s:" << itarg->second.sessions << "\n";
328 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
333 std::map<unsigned long, std::string>::iterator isess;
334 isess = m_session_target.find(session_id);
335 if (isess == m_session_target.end())
338 target = isess->second;
340 // finding target statistics
341 std::map<std::string, TargetStat>::iterator itarg;
342 itarg = m_target_stat.find(target);
343 if (itarg == m_target_stat.end()){
344 m_session_target.erase(isess);
348 // counting session down
349 if (itarg->second.sessions > 0)
350 itarg->second.sessions -= 1;
352 // std:.cout << "remove_session " << session_id << " " << target
353 // << " s:" << itarg->second.sessions << "\n";
355 // clearing empty sessions and targets
356 if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
357 m_target_stat.erase(itarg);
358 m_session_target.erase(isess);
363 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
366 std::map<unsigned long, std::string>::iterator isess;
367 isess = m_session_target.find(session_id);
368 if (isess != m_session_target.end())
369 target = isess->second;
375 unsigned int yf::LoadBalance::Impl::cost(std::string target){
379 if (target.size() != 0){
380 std::map<std::string, TargetStat>::iterator itarg;
381 itarg = m_target_stat.find(target);
382 if (itarg != m_target_stat.end()){
383 cost = itarg->second.cost();
387 //std::cout << "cost " << target << " c:" << cost << "\n";
391 unsigned int yf::LoadBalance::Impl::dead(std::string target){
395 if (target.size() != 0){
396 std::map<std::string, TargetStat>::iterator itarg;
397 itarg = m_target_stat.find(target);
398 if (itarg != m_target_stat.end()){
399 dead = itarg->second.deads;
403 //std::cout << "dead " << target << " d:" << dead << "\n";
410 static mp::filter::Base* filter_creator()
412 return new mp::filter::LoadBalance;
416 struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
427 * indent-tabs-mode: nil
428 * c-file-style: "stroustrup"
430 * vim: shiftwidth=4 tabstop=8 expandtab