00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include <ndb_global.h>
00018 #include <my_pthread.h>
00019 #include <my_sys.h>
00020
00021 #include "ndb_cluster_connection_impl.hpp"
00022 #include <mgmapi_configuration.hpp>
00023 #include <mgmapi_config_parameters.h>
00024 #include <TransporterFacade.hpp>
00025 #include <NdbOut.hpp>
00026 #include <NdbSleep.h>
00027 #include <NdbThread.h>
00028 #include <ndb_limits.h>
00029 #include <ConfigRetriever.hpp>
00030 #include <ndb_version.h>
00031 #include <mgmapi_debug.h>
00032 #include <mgmapi_internal.h>
00033 #include <md5_hash.hpp>
00034
00035 #include <EventLogger.hpp>
// Event logger shared by this translation unit; its console handler,
// category and log levels are set up in the Ndb_cluster_connection_impl
// constructor.
EventLogger g_eventLogger;

// Keep-running flag for the background connect thread: set to 1 by the
// thread entry point, and cleared either by the impl destructor or by the
// thread itself after a fatal connect error.
static int g_run_connect_thread= 0;
00039
00040 #include <NdbMutex.h>
// Global mutex; created by the Ndb_cluster_connection_impl constructor when
// still NULL and destroyed (and reset to NULL) by the impl destructor.
NdbMutex *ndb_global_event_buffer_mutex= NULL;
#ifdef VM_TRACE
// Only present in VM_TRACE builds; created and destroyed alongside
// ndb_global_event_buffer_mutex.
NdbMutex *ndb_print_state_mutex= NULL;
#endif
00045
00046
00047
00048
00049
00050 Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
00051 : m_impl(* new Ndb_cluster_connection_impl(connect_string))
00052 {
00053 }
00054
00055 Ndb_cluster_connection::Ndb_cluster_connection
00056 (Ndb_cluster_connection_impl& impl) : m_impl(impl)
00057 {
00058 }
00059
00060 Ndb_cluster_connection::~Ndb_cluster_connection()
00061 {
00062 Ndb_cluster_connection_impl *tmp = &m_impl;
00063 if (this != tmp)
00064 delete tmp;
00065 }
00066
00067 int Ndb_cluster_connection::get_connected_port() const
00068 {
00069 if (m_impl.m_config_retriever)
00070 return m_impl.m_config_retriever->get_mgmd_port();
00071 return -1;
00072 }
00073
00074 const char *Ndb_cluster_connection::get_connected_host() const
00075 {
00076 if (m_impl.m_config_retriever)
00077 return m_impl.m_config_retriever->get_mgmd_host();
00078 return 0;
00079 }
00080
00081 const char *Ndb_cluster_connection::get_connectstring(char *buf,
00082 int buf_sz) const
00083 {
00084 if (m_impl.m_config_retriever)
00085 return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
00086 return 0;
00087 }
00088
00089 extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
00090 {
00091 g_run_connect_thread= 1;
00092 ((Ndb_cluster_connection_impl*) me)->connect_thread();
00093 return me;
00094 }
00095
00096 int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
00097 {
00098 int r;
00099 DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
00100 m_impl.m_connect_callback= connect_callback;
00101 if ((r = connect(0,0,0)) == 1)
00102 {
00103 DBUG_PRINT("info",("starting thread"));
00104 m_impl.m_connect_thread=
00105 NdbThread_Create(run_ndb_cluster_connection_connect_thread,
00106 (void**)&m_impl, 32768, "ndb_cluster_connection",
00107 NDB_THREAD_PRIO_LOW);
00108 }
00109 else if (r < 0)
00110 {
00111 DBUG_RETURN(-1);
00112 }
00113 else if (m_impl.m_connect_callback)
00114 {
00115 (*m_impl.m_connect_callback)();
00116 }
00117 DBUG_RETURN(0);
00118 }
00119
00120 void Ndb_cluster_connection::set_optimized_node_selection(int val)
00121 {
00122 m_impl.m_optimized_node_selection= val;
00123 }
00124
00125 void
00126 Ndb_cluster_connection_impl::init_get_next_node
00127 (Ndb_cluster_connection_node_iter &iter)
00128 {
00129 if (iter.scan_state != (Uint8)~0)
00130 iter.cur_pos= iter.scan_state;
00131 if (iter.cur_pos >= no_db_nodes())
00132 iter.cur_pos= 0;
00133 iter.init_pos= iter.cur_pos;
00134 iter.scan_state= 0;
00135
00136 return;
00137 }
00138
00139 Uint32
00140 Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
00141 {
00142 Uint32 cur_pos= iter.cur_pos;
00143 if (cur_pos >= no_db_nodes())
00144 return 0;
00145
00146 Ndb_cluster_connection_impl::Node *nodes= m_impl.m_all_nodes.getBase();
00147 Ndb_cluster_connection_impl::Node &node= nodes[cur_pos];
00148
00149 if (iter.scan_state != (Uint8)~0)
00150 {
00151 assert(iter.scan_state < no_db_nodes());
00152 if (nodes[iter.scan_state].group == node.group)
00153 iter.scan_state= ~0;
00154 else
00155 return nodes[iter.scan_state++].id;
00156 }
00157
00158
00159
00160 cur_pos++;
00161 Uint32 init_pos= iter.init_pos;
00162 if (cur_pos == node.next_group)
00163 {
00164 cur_pos= nodes[init_pos].this_group;
00165 }
00166
00167
00168 if (cur_pos != init_pos)
00169 iter.cur_pos= cur_pos;
00170 else
00171 {
00172 iter.cur_pos= node.next_group;
00173 iter.init_pos= node.next_group;
00174 }
00175 return node.id;
00176 }
00177
00178 unsigned
00179 Ndb_cluster_connection::no_db_nodes()
00180 {
00181 return m_impl.m_all_nodes.size();
00182 }
00183
00184 unsigned
00185 Ndb_cluster_connection::node_id()
00186 {
00187 return m_impl.m_transporter_facade->ownId();
00188 }
00189
00190
00191 int
00192 Ndb_cluster_connection::wait_until_ready(int timeout,
00193 int timeout_after_first_alive)
00194 {
00195 DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
00196 TransporterFacade *tp = TransporterFacade::instance();
00197 if (tp == 0)
00198 {
00199 DBUG_RETURN(-1);
00200 }
00201 if (tp->ownId() == 0)
00202 {
00203 DBUG_RETURN(-1);
00204 }
00205 int secondsCounter = 0;
00206 int milliCounter = 0;
00207 int noChecksSinceFirstAliveFound = 0;
00208 do {
00209 unsigned int foundAliveNode = 0;
00210 tp->lock_mutex();
00211 for(unsigned i= 0; i < no_db_nodes(); i++)
00212 {
00213
00214
00215
00216 if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) {
00217 foundAliveNode++;
00218 }
00219 }
00220 tp->unlock_mutex();
00221
00222 if (foundAliveNode == no_db_nodes())
00223 {
00224 DBUG_RETURN(0);
00225 }
00226 else if (foundAliveNode > 0)
00227 {
00228 noChecksSinceFirstAliveFound++;
00229
00230 if (noChecksSinceFirstAliveFound > 10*timeout_after_first_alive)
00231 DBUG_RETURN(1);
00232 }
00233 else if (secondsCounter >= timeout)
00234 {
00235 DBUG_RETURN(-1);
00236 }
00237 NdbSleep_MilliSleep(100);
00238 milliCounter += 100;
00239 if (milliCounter >= 1000) {
00240 secondsCounter++;
00241 milliCounter = 0;
00242 }
00243 } while (1);
00244 }
00245
00246
00247
00248
00249
00250
00251
00252 Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
00253 connect_string)
00254 : Ndb_cluster_connection(*this),
00255 m_optimized_node_selection(1)
00256 {
00257 DBUG_ENTER("Ndb_cluster_connection");
00258 DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
00259
00260 g_eventLogger.createConsoleHandler();
00261 g_eventLogger.setCategory("NdbApi");
00262 g_eventLogger.enable(Logger::LL_ON, Logger::LL_ERROR);
00263
00264 m_connect_thread= 0;
00265 m_connect_callback= 0;
00266
00267 if (ndb_global_event_buffer_mutex == NULL)
00268 ndb_global_event_buffer_mutex= NdbMutex_Create();
00269
00270 #ifdef VM_TRACE
00271 if (ndb_print_state_mutex == NULL)
00272 ndb_print_state_mutex= NdbMutex_Create();
00273 #endif
00274 m_config_retriever=
00275 new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
00276 if (m_config_retriever->hasError())
00277 {
00278 printf("Could not connect initialize handle to management server: %s",
00279 m_config_retriever->getErrorString());
00280 delete m_config_retriever;
00281 m_config_retriever= 0;
00282 }
00283
00284 m_transporter_facade=
00285 TransporterFacade::theFacadeInstance=
00286 new TransporterFacade();
00287
00288 DBUG_VOID_RETURN;
00289 }
00290
00291 Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
00292 {
00293 DBUG_ENTER("~Ndb_cluster_connection");
00294 TransporterFacade::stop_instance();
00295 if (m_connect_thread)
00296 {
00297 void *status;
00298 g_run_connect_thread= 0;
00299 NdbThread_WaitFor(m_connect_thread, &status);
00300 NdbThread_Destroy(&m_connect_thread);
00301 m_connect_thread= 0;
00302 }
00303 if (m_transporter_facade != 0)
00304 {
00305 delete m_transporter_facade;
00306 if (m_transporter_facade != TransporterFacade::theFacadeInstance)
00307 abort();
00308 TransporterFacade::theFacadeInstance= 0;
00309 }
00310 if (m_config_retriever)
00311 {
00312 delete m_config_retriever;
00313 m_config_retriever= NULL;
00314 }
00315 if (ndb_global_event_buffer_mutex != NULL)
00316 {
00317 NdbMutex_Destroy(ndb_global_event_buffer_mutex);
00318 ndb_global_event_buffer_mutex= NULL;
00319 }
00320 #ifdef VM_TRACE
00321 if (ndb_print_state_mutex != NULL)
00322 {
00323 NdbMutex_Destroy(ndb_print_state_mutex);
00324 ndb_print_state_mutex= NULL;
00325 }
00326 #endif
00327 DBUG_VOID_RETURN;
00328 }
00329
00330 void
00331 Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
00332 const ndb_mgm_configuration
00333 &config)
00334 {
00335 DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector");
00336 ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
00337
00338 for(iter.first(); iter.valid(); iter.next())
00339 {
00340 Uint32 nodeid1, nodeid2, remoteNodeId, group= 5;
00341 const char * remoteHostName= 0, * localHostName= 0;
00342 if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue;
00343 if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue;
00344
00345 if(nodeid1 != nodeid && nodeid2 != nodeid) continue;
00346 remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
00347
00348 iter.get(CFG_CONNECTION_GROUP, &group);
00349
00350 {
00351 const char * host1= 0, * host2= 0;
00352 iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
00353 iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
00354 localHostName = (nodeid == nodeid1 ? host1 : host2);
00355 remoteHostName = (nodeid == nodeid1 ? host2 : host1);
00356 }
00357
00358 Uint32 type = ~0;
00359 if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
00360
00361 switch(type){
00362 case CONNECTION_TYPE_SHM:{
00363 break;
00364 }
00365 case CONNECTION_TYPE_SCI:{
00366 break;
00367 }
00368 case CONNECTION_TYPE_TCP:{
00369
00370
00371 if (SocketServer::tryBind(0,remoteHostName))
00372 group--;
00373 break;
00374 }
00375 case CONNECTION_TYPE_OSE:{
00376 break;
00377 }
00378 }
00379 m_impl.m_all_nodes.push_back(Node(group,remoteNodeId));
00380 DBUG_PRINT("info",("saved %d %d", group,remoteNodeId));
00381 for (int i= m_impl.m_all_nodes.size()-2;
00382 i >= 0 && m_impl.m_all_nodes[i].group > m_impl.m_all_nodes[i+1].group;
00383 i--)
00384 {
00385 Node tmp= m_impl.m_all_nodes[i];
00386 m_impl.m_all_nodes[i]= m_impl.m_all_nodes[i+1];
00387 m_impl.m_all_nodes[i+1]= tmp;
00388 }
00389 }
00390
00391 int i;
00392 Uint32 cur_group, i_group= 0;
00393 cur_group= ~0;
00394 for (i= (int)m_impl.m_all_nodes.size()-1; i >= 0; i--)
00395 {
00396 if (m_impl.m_all_nodes[i].group != cur_group)
00397 {
00398 cur_group= m_impl.m_all_nodes[i].group;
00399 i_group= i+1;
00400 }
00401 m_impl.m_all_nodes[i].next_group= i_group;
00402 }
00403 cur_group= ~0;
00404 for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
00405 {
00406 if (m_impl.m_all_nodes[i].group != cur_group)
00407 {
00408 cur_group= m_impl.m_all_nodes[i].group;
00409 i_group= i;
00410 }
00411 m_impl.m_all_nodes[i].this_group= i_group;
00412 }
00413 #if 0
00414 for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
00415 {
00416 fprintf(stderr, "[%d] %d %d %d %d\n",
00417 i,
00418 m_impl.m_all_nodes[i].id,
00419 m_impl.m_all_nodes[i].group,
00420 m_impl.m_all_nodes[i].this_group,
00421 m_impl.m_all_nodes[i].next_group);
00422 }
00423
00424 do_test();
00425 #endif
00426 DBUG_VOID_RETURN;
00427 }
00428
00429 void
00430 Ndb_cluster_connection_impl::do_test()
00431 {
00432 Ndb_cluster_connection_node_iter iter;
00433 int n= no_db_nodes()+5;
00434 Uint32 *nodes= new Uint32[n+1];
00435
00436 for (int g= 0; g < n; g++)
00437 {
00438 for (int h= 0; h < n; h++)
00439 {
00440 Uint32 id;
00441 Ndb_cluster_connection_node_iter iter2;
00442 {
00443 for (int j= 0; j < g; j++)
00444 {
00445 nodes[j]= get_next_node(iter2);
00446 }
00447 }
00448
00449 for (int i= 0; i < n; i++)
00450 {
00451 init_get_next_node(iter);
00452 fprintf(stderr, "%d dead:(", g);
00453 id= 0;
00454 while (id == 0)
00455 {
00456 if ((id= get_next_node(iter)) == 0)
00457 break;
00458 for (int j= 0; j < g; j++)
00459 {
00460 if (nodes[j] == id)
00461 {
00462 fprintf(stderr, " %d", id);
00463 id= 0;
00464 break;
00465 }
00466 }
00467 }
00468 fprintf(stderr, ")");
00469 if (id == 0)
00470 {
00471 break;
00472 }
00473 fprintf(stderr, " %d\n", id);
00474 }
00475 fprintf(stderr, "\n");
00476 }
00477 }
00478 delete [] nodes;
00479 }
00480
00481 int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds,
00482 int verbose)
00483 {
00484 struct ndb_mgm_reply mgm_reply;
00485
00486 DBUG_ENTER("Ndb_cluster_connection::connect");
00487 const char* error = 0;
00488 do {
00489 if (m_impl.m_config_retriever == 0)
00490 DBUG_RETURN(-1);
00491 if (m_impl.m_config_retriever->do_connect(no_retries,
00492 retry_delay_in_seconds,
00493 verbose))
00494 DBUG_RETURN(1);
00495
00496 Uint32 nodeId = m_impl.m_config_retriever->allocNodeId(4,
00497 3);
00498 if(nodeId == 0)
00499 break;
00500 ndb_mgm_configuration * props = m_impl.m_config_retriever->getConfig();
00501 if(props == 0)
00502 break;
00503
00504 m_impl.m_transporter_facade->start_instance(nodeId, props);
00505 m_impl.init_nodes_vector(nodeId, *props);
00506
00507 for(unsigned i=0;
00508 i<m_impl.m_transporter_facade->get_registry()->m_transporter_interface.size();
00509 i++)
00510 ndb_mgm_set_connection_int_parameter(m_impl.m_config_retriever->get_mgmHandle(),
00511 nodeId,
00512 m_impl.m_transporter_facade->get_registry()
00513 ->m_transporter_interface[i]
00514 .m_remote_nodeId,
00515 CFG_CONNECTION_SERVER_PORT,
00516 m_impl.m_transporter_facade->get_registry()
00517 ->m_transporter_interface[i]
00518 .m_s_service_port,
00519 &mgm_reply);
00520
00521 ndb_mgm_destroy_configuration(props);
00522 m_impl.m_transporter_facade->connected();
00523 DBUG_RETURN(0);
00524 } while(0);
00525
00526 ndbout << "Configuration error: ";
00527 const char* erString = m_impl.m_config_retriever->getErrorString();
00528 if (erString == 0) {
00529 erString = "No error specified!";
00530 }
00531 ndbout << erString << endl;
00532 DBUG_RETURN(-1);
00533 }
00534
00535 void Ndb_cluster_connection_impl::connect_thread()
00536 {
00537 DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread");
00538 int r;
00539 do {
00540 NdbSleep_SecSleep(1);
00541 if ((r = connect(0,0,0)) == 0)
00542 break;
00543 if (r == -1) {
00544 printf("Ndb_cluster_connection::connect_thread error\n");
00545 DBUG_ASSERT(false);
00546 g_run_connect_thread= 0;
00547 } else {
00548
00549 NdbSleep_SecSleep(1);
00550 }
00551 } while (g_run_connect_thread);
00552 if (m_connect_callback)
00553 (*m_connect_callback)();
00554 DBUG_VOID_RETURN;
00555 }
00556
// Explicit instantiation of the Vector template for the Node element type
// used by m_all_nodes.
template class Vector<Ndb_cluster_connection_impl::Node>;
00558