ndb_cluster_connection.cpp

Go to the documentation of this file.
00001 /* Copyright (C) 2003 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; either version 2 of the License, or
00006    (at your option) any later version.
00007 
00008    This program is distributed in the hope that it will be useful,
00009    but WITHOUT ANY WARRANTY; without even the implied warranty of
00010    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00011    GNU General Public License for more details.
00012 
00013    You should have received a copy of the GNU General Public License
00014    along with this program; if not, write to the Free Software
00015    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
00016 
00017 #include <ndb_global.h>
00018 #include <my_pthread.h>
00019 #include <my_sys.h>
00020 
00021 #include "ndb_cluster_connection_impl.hpp"
00022 #include <mgmapi_configuration.hpp>
00023 #include <mgmapi_config_parameters.h>
00024 #include <TransporterFacade.hpp>
00025 #include <NdbOut.hpp>
00026 #include <NdbSleep.h>
00027 #include <NdbThread.h>
00028 #include <ndb_limits.h>
00029 #include <ConfigRetriever.hpp>
00030 #include <ndb_version.h>
00031 #include <mgmapi_debug.h>
00032 #include <mgmapi_internal.h>
00033 #include <md5_hash.hpp>
00034 
00035 #include <EventLogger.hpp>
// Event logger object defined at file scope. The Ndb_cluster_connection_impl
// constructor in this file attaches a console handler to it, sets its
// category to "NdbApi" and enables the LL_ON..LL_ERROR levels.
00036 EventLogger g_eventLogger;
00037 
// Run flag for the background connect thread. It is set non-zero when that
// thread is started, polled by the retry loop in connect_thread(), and
// cleared by ~Ndb_cluster_connection_impl() and by connect_thread() itself
// after a hard connect error.
// NOTE(review): plain int shared between threads with no visible
// synchronization; it is also shared by every connection object in the
// process -- confirm this is acceptable.
00038 static int g_run_connect_thread= 0;
00039 
00040 #include <NdbMutex.h>
// Global mutex pointers. Both are created lazily (only while still NULL) by
// the Ndb_cluster_connection_impl constructor and destroyed and reset to
// NULL by its destructor. ndb_print_state_mutex exists only in VM_TRACE
// builds.
00041 NdbMutex *ndb_global_event_buffer_mutex= NULL;
00042 #ifdef VM_TRACE
00043 NdbMutex *ndb_print_state_mutex= NULL;
00044 #endif
00045 
00046 /*
00047  * Ndb_cluster_connection
00048  */
00049 
00050 Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
00051   : m_impl(* new Ndb_cluster_connection_impl(connect_string))
00052 {
00053 }
00054 
00055 Ndb_cluster_connection::Ndb_cluster_connection
00056 (Ndb_cluster_connection_impl& impl) : m_impl(impl)
00057 {
00058 }
00059 
00060 Ndb_cluster_connection::~Ndb_cluster_connection()
00061 {
00062   Ndb_cluster_connection_impl *tmp = &m_impl;
00063   if (this != tmp)
00064     delete tmp;
00065 }
00066 
00067 int Ndb_cluster_connection::get_connected_port() const
00068 {
00069   if (m_impl.m_config_retriever)
00070     return m_impl.m_config_retriever->get_mgmd_port();
00071   return -1;
00072 }
00073 
00074 const char *Ndb_cluster_connection::get_connected_host() const
00075 {
00076   if (m_impl.m_config_retriever)
00077     return m_impl.m_config_retriever->get_mgmd_host();
00078   return 0;
00079 }
00080 
00081 const char *Ndb_cluster_connection::get_connectstring(char *buf,
00082                                                       int buf_sz) const
00083 {
00084   if (m_impl.m_config_retriever)
00085     return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
00086   return 0;
00087 }
00088 
00089 extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
00090 {
00091   g_run_connect_thread= 1;
00092   ((Ndb_cluster_connection_impl*) me)->connect_thread();
00093   return me;
00094 }
00095 
00096 int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
00097 {
00098   int r;
00099   DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
00100   m_impl.m_connect_callback= connect_callback;
00101   if ((r = connect(0,0,0)) == 1)
00102   {
00103     DBUG_PRINT("info",("starting thread"));
00104     m_impl.m_connect_thread= 
00105       NdbThread_Create(run_ndb_cluster_connection_connect_thread,
00106                        (void**)&m_impl, 32768, "ndb_cluster_connection",
00107                        NDB_THREAD_PRIO_LOW);
00108   }
00109   else if (r < 0)
00110   {
00111     DBUG_RETURN(-1);
00112   }
00113   else if (m_impl.m_connect_callback)
00114   { 
00115     (*m_impl.m_connect_callback)();
00116   }
00117   DBUG_RETURN(0);
00118 }
00119 
00120 void Ndb_cluster_connection::set_optimized_node_selection(int val)
00121 {
00122   m_impl.m_optimized_node_selection= val;
00123 }
00124 
00125 void
00126 Ndb_cluster_connection_impl::init_get_next_node
00127 (Ndb_cluster_connection_node_iter &iter)
00128 {
00129   if (iter.scan_state != (Uint8)~0)
00130     iter.cur_pos= iter.scan_state;
00131   if (iter.cur_pos >= no_db_nodes())
00132     iter.cur_pos= 0;
00133   iter.init_pos= iter.cur_pos;
00134   iter.scan_state= 0;
00135   //  fprintf(stderr,"[init %d]",iter.init_pos);
00136   return;
00137 }
00138 
00139 Uint32
00140 Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
00141 {
00142   Uint32 cur_pos= iter.cur_pos;
00143   if (cur_pos >= no_db_nodes())
00144     return 0;
00145 
00146   Ndb_cluster_connection_impl::Node *nodes= m_impl.m_all_nodes.getBase();
00147   Ndb_cluster_connection_impl::Node &node=  nodes[cur_pos];
00148 
00149   if (iter.scan_state != (Uint8)~0)
00150   {
00151     assert(iter.scan_state < no_db_nodes());
00152     if (nodes[iter.scan_state].group == node.group)
00153       iter.scan_state= ~0;
00154     else
00155       return nodes[iter.scan_state++].id;
00156   }
00157 
00158   //  fprintf(stderr,"[%d]",node.id);
00159 
00160   cur_pos++;
00161   Uint32 init_pos= iter.init_pos;
00162   if (cur_pos == node.next_group)
00163   {
00164     cur_pos= nodes[init_pos].this_group;
00165   }
00166 
00167   //  fprintf(stderr,"[cur_pos %d]",cur_pos);
00168   if (cur_pos != init_pos)
00169     iter.cur_pos= cur_pos;
00170   else
00171   {
00172     iter.cur_pos= node.next_group;
00173     iter.init_pos= node.next_group;
00174   }
00175   return node.id;
00176 }
00177 
00178 unsigned
00179 Ndb_cluster_connection::no_db_nodes()
00180 {
00181   return m_impl.m_all_nodes.size();
00182 }
00183 
00184 unsigned
00185 Ndb_cluster_connection::node_id()
00186 {
00187   return m_impl.m_transporter_facade->ownId();
00188 }
00189 
00190 
00191 int
00192 Ndb_cluster_connection::wait_until_ready(int timeout,
00193                                          int timeout_after_first_alive)
00194 {
00195   DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
00196   TransporterFacade *tp = TransporterFacade::instance();
00197   if (tp == 0)
00198   {
00199     DBUG_RETURN(-1);
00200   }
00201   if (tp->ownId() == 0)
00202   {
00203     DBUG_RETURN(-1);
00204   }
00205   int secondsCounter = 0;
00206   int milliCounter = 0;
00207   int noChecksSinceFirstAliveFound = 0;
00208   do {
00209     unsigned int foundAliveNode = 0;
00210     tp->lock_mutex();
00211     for(unsigned i= 0; i < no_db_nodes(); i++)
00212     {
00213       //************************************************
00214       // If any node is answering, ndb is answering
00215       //************************************************
00216       if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) {
00217         foundAliveNode++;
00218       }
00219     }
00220     tp->unlock_mutex();
00221 
00222     if (foundAliveNode == no_db_nodes())
00223     {
00224       DBUG_RETURN(0);
00225     }
00226     else if (foundAliveNode > 0)
00227     {
00228       noChecksSinceFirstAliveFound++;
00229       // 100 ms delay -> 10*
00230       if (noChecksSinceFirstAliveFound > 10*timeout_after_first_alive)
00231         DBUG_RETURN(1);
00232     }
00233     else if (secondsCounter >= timeout)
00234     { // no alive nodes and timed out
00235       DBUG_RETURN(-1);
00236     }
00237     NdbSleep_MilliSleep(100);
00238     milliCounter += 100;
00239     if (milliCounter >= 1000) {
00240       secondsCounter++;
00241       milliCounter = 0;
00242     }//if
00243   } while (1);
00244 }
00245 
00246 
00247 
00248 /*
00249  * Ndb_cluster_connection_impl
00250  */
00251 
00252 Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
00253                                                          connect_string)
00254   : Ndb_cluster_connection(*this),
00255     m_optimized_node_selection(1)
00256 {
00257   DBUG_ENTER("Ndb_cluster_connection");
00258   DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
00259 
00260   g_eventLogger.createConsoleHandler();
00261   g_eventLogger.setCategory("NdbApi");
00262   g_eventLogger.enable(Logger::LL_ON, Logger::LL_ERROR);
00263 
00264   m_connect_thread= 0;
00265   m_connect_callback= 0;
00266 
00267   if (ndb_global_event_buffer_mutex == NULL)
00268     ndb_global_event_buffer_mutex= NdbMutex_Create();
00269 
00270 #ifdef VM_TRACE
00271   if (ndb_print_state_mutex == NULL)
00272     ndb_print_state_mutex= NdbMutex_Create();
00273 #endif
00274   m_config_retriever=
00275     new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
00276   if (m_config_retriever->hasError())
00277   {
00278     printf("Could not connect initialize handle to management server: %s",
00279            m_config_retriever->getErrorString());
00280     delete m_config_retriever;
00281     m_config_retriever= 0;
00282   }
00283 
00284   m_transporter_facade=
00285     TransporterFacade::theFacadeInstance= 
00286     new TransporterFacade();
00287   
00288   DBUG_VOID_RETURN;
00289 }
00290 
00291 Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
00292 {
00293   DBUG_ENTER("~Ndb_cluster_connection");
00294   TransporterFacade::stop_instance();
00295   if (m_connect_thread)
00296   {
00297     void *status;
00298     g_run_connect_thread= 0;
00299     NdbThread_WaitFor(m_connect_thread, &status);
00300     NdbThread_Destroy(&m_connect_thread);
00301     m_connect_thread= 0;
00302   }
00303   if (m_transporter_facade != 0)
00304   {
00305     delete m_transporter_facade;
00306     if (m_transporter_facade != TransporterFacade::theFacadeInstance)
00307       abort();
00308     TransporterFacade::theFacadeInstance= 0;
00309   }
00310   if (m_config_retriever)
00311   {
00312     delete m_config_retriever;
00313     m_config_retriever= NULL;
00314   }
00315   if (ndb_global_event_buffer_mutex != NULL)
00316   {
00317     NdbMutex_Destroy(ndb_global_event_buffer_mutex);
00318     ndb_global_event_buffer_mutex= NULL;
00319   }
00320 #ifdef VM_TRACE
00321   if (ndb_print_state_mutex != NULL)
00322   {
00323     NdbMutex_Destroy(ndb_print_state_mutex);
00324     ndb_print_state_mutex= NULL;
00325   }
00326 #endif
00327   DBUG_VOID_RETURN;
00328 }
00329 
00330 void
00331 Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
00332                                                const ndb_mgm_configuration 
00333                                                &config)
00334 {
00335   DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector");
00336   ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
00337   
00338   for(iter.first(); iter.valid(); iter.next())
00339   {
00340     Uint32 nodeid1, nodeid2, remoteNodeId, group= 5;
00341     const char * remoteHostName= 0, * localHostName= 0;
00342     if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue;
00343     if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue;
00344 
00345     if(nodeid1 != nodeid && nodeid2 != nodeid) continue;
00346     remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
00347 
00348     iter.get(CFG_CONNECTION_GROUP, &group);
00349 
00350     {
00351       const char * host1= 0, * host2= 0;
00352       iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
00353       iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
00354       localHostName  = (nodeid == nodeid1 ? host1 : host2);
00355       remoteHostName = (nodeid == nodeid1 ? host2 : host1);
00356     }
00357 
00358     Uint32 type = ~0;
00359     if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
00360 
00361     switch(type){
00362     case CONNECTION_TYPE_SHM:{
00363       break;
00364     }
00365     case CONNECTION_TYPE_SCI:{
00366       break;
00367     }
00368     case CONNECTION_TYPE_TCP:{
00369       // connecting through localhost
00370       // check if config_hostname is local
00371       if (SocketServer::tryBind(0,remoteHostName))
00372         group--; // upgrade group value
00373       break;
00374     }
00375     case CONNECTION_TYPE_OSE:{
00376       break;
00377     }
00378     }
00379     m_impl.m_all_nodes.push_back(Node(group,remoteNodeId));
00380     DBUG_PRINT("info",("saved %d %d", group,remoteNodeId));
00381     for (int i= m_impl.m_all_nodes.size()-2;
00382          i >= 0 && m_impl.m_all_nodes[i].group > m_impl.m_all_nodes[i+1].group;
00383          i--)
00384     {
00385       Node tmp= m_impl.m_all_nodes[i];
00386       m_impl.m_all_nodes[i]= m_impl.m_all_nodes[i+1];
00387       m_impl.m_all_nodes[i+1]= tmp;
00388     }
00389   }
00390 
00391   int i;
00392   Uint32 cur_group, i_group= 0;
00393   cur_group= ~0;
00394   for (i= (int)m_impl.m_all_nodes.size()-1; i >= 0; i--)
00395   {
00396     if (m_impl.m_all_nodes[i].group != cur_group)
00397     {
00398       cur_group= m_impl.m_all_nodes[i].group;
00399       i_group= i+1;
00400     }
00401     m_impl.m_all_nodes[i].next_group= i_group;
00402   }
00403   cur_group= ~0;
00404   for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
00405   {
00406     if (m_impl.m_all_nodes[i].group != cur_group)
00407     {
00408       cur_group= m_impl.m_all_nodes[i].group;
00409       i_group= i;
00410     }
00411     m_impl.m_all_nodes[i].this_group= i_group;
00412   }
00413 #if 0
00414   for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
00415   {
00416     fprintf(stderr, "[%d] %d %d %d %d\n",
00417            i,
00418            m_impl.m_all_nodes[i].id,
00419            m_impl.m_all_nodes[i].group,
00420            m_impl.m_all_nodes[i].this_group,
00421            m_impl.m_all_nodes[i].next_group);
00422   }
00423 
00424   do_test();
00425 #endif
00426   DBUG_VOID_RETURN;
00427 }
00428 
00429 void
00430 Ndb_cluster_connection_impl::do_test()
00431 {
00432   Ndb_cluster_connection_node_iter iter;
00433   int n= no_db_nodes()+5;
00434   Uint32 *nodes= new Uint32[n+1];
00435 
00436   for (int g= 0; g < n; g++)
00437   {
00438     for (int h= 0; h < n; h++)
00439     {
00440       Uint32 id;
00441       Ndb_cluster_connection_node_iter iter2;
00442       {
00443         for (int j= 0; j < g; j++)
00444         {
00445           nodes[j]= get_next_node(iter2);
00446         }
00447       }
00448 
00449       for (int i= 0; i < n; i++)
00450       {
00451         init_get_next_node(iter);
00452         fprintf(stderr, "%d dead:(", g);
00453         id= 0;
00454         while (id == 0)
00455         {
00456           if ((id= get_next_node(iter)) == 0)
00457             break;
00458           for (int j= 0; j < g; j++)
00459           {
00460             if (nodes[j] == id)
00461             {
00462               fprintf(stderr, " %d", id);
00463               id= 0;
00464               break;
00465             }
00466           }
00467         }
00468         fprintf(stderr, ")");
00469         if (id == 0)
00470         {
00471           break;
00472         }
00473         fprintf(stderr, " %d\n", id);
00474       }
00475       fprintf(stderr, "\n");
00476     }
00477   }
00478   delete [] nodes;
00479 }
00480 
00481 int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds,
00482                                     int verbose)
00483 {
00484   struct ndb_mgm_reply mgm_reply;
00485 
00486   DBUG_ENTER("Ndb_cluster_connection::connect");
00487   const char* error = 0;
00488   do {
00489     if (m_impl.m_config_retriever == 0)
00490       DBUG_RETURN(-1);
00491     if (m_impl.m_config_retriever->do_connect(no_retries,
00492                                               retry_delay_in_seconds,
00493                                               verbose))
00494       DBUG_RETURN(1); // mgmt server not up yet
00495 
00496     Uint32 nodeId = m_impl.m_config_retriever->allocNodeId(4/*retries*/,
00497                                                            3/*delay*/);
00498     if(nodeId == 0)
00499       break;
00500     ndb_mgm_configuration * props = m_impl.m_config_retriever->getConfig();
00501     if(props == 0)
00502       break;
00503 
00504     m_impl.m_transporter_facade->start_instance(nodeId, props);
00505     m_impl.init_nodes_vector(nodeId, *props);
00506 
00507     for(unsigned i=0;
00508         i<m_impl.m_transporter_facade->get_registry()->m_transporter_interface.size();
00509         i++)
00510       ndb_mgm_set_connection_int_parameter(m_impl.m_config_retriever->get_mgmHandle(),
00511                                            nodeId,
00512                                            m_impl.m_transporter_facade->get_registry()
00513                                              ->m_transporter_interface[i]
00514                                              .m_remote_nodeId,
00515                                            CFG_CONNECTION_SERVER_PORT,
00516                                            m_impl.m_transporter_facade->get_registry()
00517                                              ->m_transporter_interface[i]
00518                                              .m_s_service_port,
00519                                            &mgm_reply);
00520 
00521     ndb_mgm_destroy_configuration(props);
00522     m_impl.m_transporter_facade->connected();
00523     DBUG_RETURN(0);
00524   } while(0);
00525   
00526   ndbout << "Configuration error: ";
00527   const char* erString = m_impl.m_config_retriever->getErrorString();
00528   if (erString == 0) {
00529     erString = "No error specified!";
00530   }
00531   ndbout << erString << endl;
00532   DBUG_RETURN(-1);
00533 }
00534 
00535 void Ndb_cluster_connection_impl::connect_thread()
00536 {
00537   DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread");
00538   int r;
00539   do {
00540     NdbSleep_SecSleep(1);
00541     if ((r = connect(0,0,0)) == 0)
00542       break;
00543     if (r == -1) {
00544       printf("Ndb_cluster_connection::connect_thread error\n");
00545       DBUG_ASSERT(false);
00546       g_run_connect_thread= 0;
00547     } else {
00548       // Wait before making a new connect attempt
00549       NdbSleep_SecSleep(1);
00550     }
00551   } while (g_run_connect_thread);
00552   if (m_connect_callback)
00553     (*m_connect_callback)();
00554   DBUG_VOID_RETURN;
00555 }
00556 
// Explicit instantiation of the Vector template for the Node element type
// (presumably the type of m_all_nodes, which this file uses through
// push_back(), size(), getBase() and operator[] -- confirm in the header).
00557 template class Vector<Ndb_cluster_connection_impl::Node>;
00558 

Generated on Wed Jul 20 21:05:44 2005 for MySQL 5.0.9 Beta by  doxygen 1.4.3