dbType = $db_type; $this->errorHandler = $error_handler; $this->DBClusterTimeout *= 1e6; // convert to milliseconds } /** * Setups load balancer according given configuration. * * @param Array $config * @return void * @access public */ public function setup($config) { $this->servers = Array (); $this->servers[0] = Array ( 'DBHost' => $config['Database']['DBHost'], 'DBUser' => $config['Database']['DBUser'], 'DBUserPassword' => $config['Database']['DBUserPassword'], 'DBName' => $config['Database']['DBName'], 'DBLoad' => 0, ); if ( isset($config['Databases']) ) { $this->servers = array_merge($this->servers, $config['Databases']); } foreach ($this->servers as $server_index => $server_setting) { $this->serverLoads[$server_index] = $server_setting['DBLoad']; } } /** * Returns connection index to master database * * @return int * @access protected */ protected function getMasterIndex() { return 0; } /** * Returns connection index to slave database. This takes into account load ratios and lag times. * Side effect: opens connections to databases * * @return int * @access protected */ protected function getSlaveIndex() { if ( count($this->servers) == 1 || $this->Application->isAdmin || defined('CRON') ) { // Skip the load balancing if there's only one server OR in admin console OR in CRON. return 0; } elseif ( $this->slaveIndex !== false ) { // shortcut if generic reader exists already return $this->slaveIndex; } $total_elapsed = 0; $non_error_loads = $this->serverLoads; $i = $found = $lagged_slave_mode = false; // first try quickly looking through the available servers for a server that meets our criteria do { $current_loads = $non_error_loads; $overloaded_servers = $total_threads_connected = 0; while ($current_loads) { if ( $lagged_slave_mode ) { // when all slave servers are too lagged, then ignore lag and pick random server $i = $this->pickRandom($current_loads); } else { $i = $this->getRandomNonLagged($current_loads); if ( $i === false && $current_loads ) { // all slaves lagged -> pick random lagged slave then $lagged_slave_mode = true; $i = $this->pickRandom( $current_loads ); } } if ( $i === false ) { // all slaves are down -> use master as a slave $this->slaveIndex = $this->getMasterIndex(); return $this->slaveIndex; } $conn =& $this->openConnection($i); if ( !$conn ) { unset($non_error_loads[$i], $current_loads[$i]); continue; } // Perform post-connection backoff $threshold = isset($this->servers[$i]['DBMaxThreads']) ? $this->servers[$i]['DBMaxThreads'] : false; $backoff = $this->postConnectionBackoff($conn, $threshold); if ( $backoff ) { // post-connection overload, don't use this server for now $total_threads_connected += $backoff; $overloaded_servers++; unset( $current_loads[$i] ); } else { // return this server break 2; } } // no server found yet $i = false; // if all servers were down, quit now if ( !$non_error_loads ) { break; } // back off for a while // scale the sleep time by the number of connected threads, to produce a roughly constant global poll rate $avg_threads = $total_threads_connected / $overloaded_servers; usleep($this->DBAvgStatusPoll * $avg_threads); $total_elapsed += $this->DBAvgStatusPoll * $avg_threads; } while ( $total_elapsed < $this->DBClusterTimeout ); if ( $i !== false ) { // slave connection successful if ( $this->slaveIndex <= 0 && $this->serverLoads[$i] > 0 && $i !== false ) { $this->slaveIndex = $i; } } return $i; } /** * Returns random non-lagged server * * @param Array $loads * @return int * @access protected */ protected function getRandomNonLagged($loads) { // unset excessively lagged servers $lags = $this->getLagTimes(); foreach ($lags as $i => $lag) { if ( $i != 0 && isset($this->servers[$i]['DBMaxLag']) ) { if ( $lag === false ) { unset( $loads[$i] ); // server is not replicating } elseif ( $lag > $this->servers[$i]['DBMaxLag'] ) { unset( $loads[$i] ); // server is excessively lagged } } } // find out if all the slaves with non-zero load are lagged if ( !$loads || array_sum($loads) == 0 ) { return false; } // return a random representative of the remainder return $this->pickRandom($loads); } /** * Select an element from an array of non-normalised probabilities * * @param Array $weights * @return int * @access protected */ protected function pickRandom($weights) { if ( !is_array($weights) || !$weights ) { return false; } $sum = array_sum($weights); if ( $sum == 0 ) { return false; } $max = mt_getrandmax(); $rand = mt_rand(0, $max) / $max * $sum; $index = $sum = 0; foreach ($weights as $index => $weight) { $sum += $weight; if ( $sum >= $rand ) { break; } } return $index; } /** * Get lag time for each server * Results are cached for a short time in memcached, and indefinitely in the process cache * * @return Array * @access protected */ protected function getLagTimes() { if ( $this->serverLagTimes ) { return $this->serverLagTimes; } $expiry = 5; $request_rate = 10; $cache_key = 'lag_times:' . $this->servers[0]['DBHost']; $times = $this->Application->getCache($cache_key); if ( $times ) { // randomly recache with probability rising over $expiry $elapsed = adodb_mktime() - $times['timestamp']; $chance = max(0, ($expiry - $elapsed) * $request_rate); if ( mt_rand(0, $chance) != 0 ) { unset( $times['timestamp'] ); $this->serverLagTimes = $times; return $times; } } // cache key missing or expired $times = Array(); foreach ($this->servers as $index => $server) { if ($index == 0) { $times[$index] = 0; // master } else { $conn =& $this->openConnection($index); if ($conn !== false) { $times[$index] = $conn->getSlaveLag(); } } } // add a timestamp key so we know when it was cached $times['timestamp'] = adodb_mktime(); $this->Application->setCache($cache_key, $times, $expiry); // but don't give the timestamp to the caller unset($times['timestamp']); $this->serverLagTimes = $times; return $this->serverLagTimes; } /** * Determines whatever server should not be used, even, when connection was made * * @param kDBConnection $conn * @param int $threshold * @return int * @access protected */ protected function postConnectionBackoff(&$conn, $threshold) { if ( !$threshold ) { return 0; } $status = $conn->getStatus('Thread%'); return $status['Threads_running'] > $threshold ? $status['Threads_connected'] : 0; } /** * Open a connection to the server given by the specified index * Index must be an actual index into the array. * If the server is already open, returns it. * * On error, returns false. * * @param integer $i Server index * @return kDBConnection|false * @access protected */ protected function &openConnection($i) { if ( isset($this->connections[$i]) ) { $conn =& $this->connections[$i]; $this->lastUsedIndex = $i; } else { $server = $this->servers[$i]; $server['serverIndex'] = $i; $conn =& $this->reallyOpenConnection($server, $i == $this->getMasterIndex()); if ( $conn->connectionOpened ) { $this->connections[$i] =& $conn; $this->lastUsedIndex = $i; } else { $conn = false; } } if ( is_object($conn) ) { $conn->noDebuggingState = $this->noDebuggingState; if ( $this->nextQueryCachable ) { $conn->nextQueryCachable = true; $this->nextQueryCachable = false; } } return $conn; } /** * Checks if previous query execution raised an error. * * @return bool * @access public */ public function hasError() { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->hasError(); } /** * Checks if connection to the server given by the specified index is opened. * * @param integer $i Server index * @return bool * @access public */ public function connectionOpened($i = null) { $conn =& $this->openConnection(isset($i) ? $i : $this->getMasterIndex()); return $conn ? $conn->connectionOpened : false; } /** * Really opens a connection. * Returns a database object whether or not the connection was successful. * * @param Array $server * @param bool $is_master * @return kDBConnection */ protected function &reallyOpenConnection($server, $is_master) { $debug_mode = $this->Application->isDebugMode(); $db_class = $debug_mode ? 'kDBConnectionDebug' : 'kDBConnection'; /** @var kDBConnection $db */ $db = $this->Application->makeClass($db_class, Array ($this->dbType, $this->errorHandler, $server['serverIndex'])); $db->debugMode = $debug_mode; $db->Connect($server['DBHost'], $server['DBUser'], $server['DBUserPassword'], $this->servers[0]['DBName'], !$is_master); return $db; } /** * Returns first field of first line of recordset if query ok or false otherwise. * * @param string $sql * @param int $offset * @return string * @access public */ public function GetOne($sql, $offset = 0) { $conn =& $this->chooseConnection($sql); return $conn->GetOne($sql, $offset); } /** * Returns first row of recordset if query ok, false otherwise. * * @param string $sql * @param int $offset * @return Array * @access public */ public function GetRow($sql, $offset = 0) { $conn =& $this->chooseConnection($sql); return $conn->GetRow($sql, $offset); } /** * Returns 1st column of recordset as one-dimensional array or false otherwise. * * Optional parameter $key_field can be used to set field name to be used as resulting array key. * * @param string $sql * @param string $key_field * @return Array * @access public */ public function GetCol($sql, $key_field = null) { $conn =& $this->chooseConnection($sql); return $conn->GetCol($sql, $key_field); } /** * Returns iterator for 1st column of a recordset or false in case of error. * * Optional parameter $key_field can be used to set field name to be used as resulting array key. * * @param string $sql * @param string $key_field * @return bool|kMySQLQueryCol */ public function GetColIterator($sql, $key_field = null) { $conn =& $this->chooseConnection($sql); return $conn->GetColIterator($sql, $key_field); } /** * Queries db with $sql query supplied and returns rows selected if any, false otherwise. * * Optional parameter $key_field allows to set one of the query fields value as key in string array. * * @param string $sql * @param string $key_field * @param boolean|null $no_debug * @return Array * @access public */ public function Query($sql, $key_field = null, $no_debug = null) { $conn =& $this->chooseConnection($sql); return $conn->Query($sql, $key_field, $no_debug); } /** * Returns iterator to a recordset, produced from running $sql query. * * Queries db with $sql query supplied and returns kMySQLQuery iterator or false in case of error. * Optional parameter $key_field allows to set one of the query fields value as key in string array. * * @param string $sql * @param string $key_field * @param boolean|null $no_debug * @param string $iterator_class * @return kMySQLQuery|bool * @access public */ public function GetIterator($sql, $key_field = null, $no_debug = null, $iterator_class = 'kMySQLQuery') { $conn =& $this->chooseConnection($sql); return $conn->GetIterator($sql, $key_field, $no_debug, $iterator_class); } /** * Free memory used to hold recordset handle. * * @access public */ public function Destroy() { $conn =& $this->openConnection($this->lastUsedIndex); $conn->Destroy(); } /** * Performs sql query, that will change database content. * * @param string $sql * @return bool * @access public */ public function ChangeQuery($sql) { $conn =& $this->chooseConnection($sql); return $conn->ChangeQuery($sql); } /** * Returns auto increment field value from insert like operation if any, zero otherwise. * * @return int * @access public */ public function getInsertID() { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->getInsertID(); } /** * Returns row count affected by last query. * * @return int * @access public */ public function getAffectedRows() { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->getAffectedRows(); } /** * Returns LIMIT sql clause part for specific db. * * @param int $offset * @param int $rows * @return string * @access public */ public function getLimitClause($offset, $rows) { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->getLimitClause($offset, $rows); } /** * If it's a string, adds quotes and backslashes. Otherwise returns as-is. * * @param mixed $string * @return string * @access public */ public function qstr($string) { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->qstr($string); } /** * Calls "qstr" function for each given array element. * * @param Array $array * @param string $function * @return Array */ public function qstrArray($array, $function = 'qstr') { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->qstrArray($array, $function); } /** * Escapes string. * * @param mixed $string * @return string * @access public */ public function escape($string) { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->escape($string); } /** * Returns last error code occurred. * * @return int * @access public */ public function getErrorCode() { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->getErrorCode(); } /** * Returns last error message. * * @return string * @access public */ public function getErrorMsg() { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->getErrorMsg(); } /** * Performs insert of given data (useful with small number of queries) * or stores it to perform multiple insert later (useful with large number of queries). * * @param Array $fields_hash * @param string $table * @param string $type * @param bool $insert_now * @return bool * @access public */ public function doInsert($fields_hash, $table, $type = 'INSERT', $insert_now = true) { $conn =& $this->openConnection( $this->getMasterIndex() ); return $conn->doInsert($fields_hash, $table, $type, $insert_now); } /** * Update given field values to given record using $key_clause. * * @param Array $fields_hash * @param string $table * @param string $key_clause * @return bool * @access public */ public function doUpdate($fields_hash, $table, $key_clause) { $conn =& $this->openConnection( $this->getMasterIndex() ); return $conn->doUpdate($fields_hash, $table, $key_clause); } /** * Allows to detect table's presence in database. * * @param string $table_name * @param bool $force * @return bool * @access public */ public function TableFound($table_name, $force = false) { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->TableFound($table_name, $force); } /** * Returns query processing statistics. * * @return Array * @access public */ public function getQueryStatistics() { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->getQueryStatistics(); } /** * Get status information from SHOW STATUS in an associative array. * * @param string $which * @return Array * @access public */ public function getStatus($which = '%') { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->getStatus($which); } /** * When undefined method is called, then send it directly to last used slave server connection * * @param string $name * @param Array $arguments * @return mixed * @access public */ public function __call($name, $arguments) { $conn =& $this->openConnection($this->lastUsedIndex); return call_user_func_array( Array (&$conn, $name), $arguments ); } /** * Returns appropriate connection based on sql * * @param string $sql * @return kDBConnection * @access protected */ protected function &chooseConnection($sql) { if ( $this->nextQueryFromMaster ) { $this->nextQueryFromMaster = false; $index = $this->getMasterIndex(); } else { $sid = isset($this->Application->Session) ? $this->Application->GetSID() : '9999999999999999999999'; if ( preg_match('/(^[ \t\r\n]*(ALTER|CREATE|DROP|RENAME|DELETE|DO|INSERT|LOAD|REPLACE|TRUNCATE|UPDATE))|ses_' . $sid . '/', $sql) ) { $index = $this->getMasterIndex(); } else { $index = $this->getSlaveIndex(); } } $this->lastUsedIndex = $index; $conn =& $this->openConnection($index); return $conn; } /** * Get slave replication lag. It will only work if the DB user has the PROCESS privilege. * * @return int * @access public */ public function getSlaveLag() { $conn =& $this->openConnection($this->lastUsedIndex); return $conn->getSlaveLag(); } }