Grep.cpp

Go to the documentation of this file.
00001 /* Copyright (C) 2003 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; either version 2 of the License, or
00006    (at your option) any later version.
00007 
00008    This program is distributed in the hope that it will be useful,
00009    but WITHOUT ANY WARRANTY; without even the implied warranty of
00010    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00011    GNU General Public License for more details.
00012 
00013    You should have received a copy of the GNU General Public License
00014    along with this program; if not, write to the Free Software
00015    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
00016 
00017 #include "Grep.hpp"
00018 #include <ndb_version.h>
00019 
00020 #include <NdbTCP.h>
00021 #include <Bitmask.hpp>
00022 
00023 #include <signaldata/NodeFailRep.hpp>
00024 #include <signaldata/ReadNodesConf.hpp>
00025 #include <signaldata/CheckNodeGroups.hpp>
00026 #include <signaldata/GrepImpl.hpp>
00027 #include <signaldata/RepImpl.hpp>
00028 #include <signaldata/EventReport.hpp>
00029 #include <signaldata/DictTabInfo.hpp>
00030 #include <signaldata/GetTabInfo.hpp>
00031 #include <signaldata/WaitGCP.hpp>
00032 #include <GrepEvent.hpp>
00033 #include <AttributeHeader.hpp>
00034 
// NOTE(review): presumably the delay (ms?) used when re-sending CONTINUEB;
// it is not referenced in the visible part of this file -- confirm.
00035 #define CONTINUEB_DELAY 500
// Block numbers of the replication blocks on a REP node. PSREPBLOCKNO is
// combined with a node id via numberToRef() to address the REP node.
// NOTE(review): SSREPBLOCKNO is not referenced in the visible code -- confirm.
00036 #define SSREPBLOCKNO 2  
00037 #define PSREPBLOCKNO 2
00038 
00039 //#define DEBUG_GREP
00040 //#define DEBUG_GREP_SUBSCRIPTION
00041 //#define DEBUG_GREP_TRANSFER
00042 //#define DEBUG_GREP_APPLY
00043 //#define DEBUG_GREP_DELETE
00044 
00045 /**************************************************************************
00046  * ------------------------------------------------------------------------
00047  *  MODULE:    STARTUP of GREP Block, etc
00048  * ------------------------------------------------------------------------
00049  **************************************************************************/
// Start type delivered with STTOR; stored in start phase 3 and consulted
// again in start phases 5 and 7 (see Grep::execSTTOR).
00050 static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;
00051 void
00052 Grep::getNodeGroupMembers(Signal* signal) {
00053   jam();
00057   CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
00058   sd->blockRef = reference();
00059   sd->requestType =
00060     CheckNodeGroups::Direct |
00061     CheckNodeGroups::GetNodeGroupMembers;
00062   sd->nodeId = getOwnNodeId();
00063   EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal, 
00064                  CheckNodeGroups::SignalLength);
00065   jamEntry();
00066   
00067   c_nodeGroup = sd->output;
00068   c_noNodesInGroup = 0;
00069   for (int i = 0; i < MAX_NDB_NODES; i++) {
00070     if (sd->mask.get(i)) {
00071       if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup;
00072       c_nodesInGroup[c_noNodesInGroup] = i;
00073       c_noNodesInGroup++;
00074     }
00075   }
00076   ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup
00077 
00078 #ifdef NODEFAIL_DEBUG
00079   for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
00080     ndbout_c ("Grep: NodeGroup %u, me %u, me in group %u, member[%u] %u",
00081               c_nodeGroup, getOwnNodeId(), c_idInNodeGroup,
00082               i, c_nodesInGroup[i]);
00083   }
00084 #endif
00085 }
00086 
00087 
00088 void
00089 Grep::execSTTOR(Signal* signal) 
00090 {
00091   jamEntry();                            
00092   const Uint32 startphase  = signal->theData[1];
00093   const Uint32 typeOfStart = signal->theData[7];
00094   if (startphase == 3) 
00095   {
00096     jam();
00097     signal->theData[0] = reference();
00098     g_TypeOfStart = typeOfStart;
00099     sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
00100     return;
00101   }
00102   if(startphase == 5) {
00103     jam();
00108     if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
00109       jam();
00110       pspart.m_recoveryMode =  true;
00111       getNodeGroupMembers(signal);
00112       for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
00113         Uint32 ref =numberToRef(GREP, c_nodesInGroup[i]);
00114         if (ref != reference())
00115           sendSignal(ref, GSN_GREP_START_ME, signal,
00116                      1 /*SumaStartMe::SignalLength*/, JBB);
00117       }
00118     } else  pspart.m_recoveryMode =  false;
00119 
00120   }
00121  
00122   if(startphase == 7) {
00123       jam();
00124     if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
00125       pspart.m_recoveryMode =  false;
00126     }
00127   }
00128   
00129   sendSTTORRY(signal);
00130 }
00131 
00132 
00133 void 
00134 Grep::PSPart::execSTART_ME(Signal* signal)
00135 {
00136   jamEntry();
00137   GrepStartMe *   me =(GrepStartMe*)signal->getDataPtr();
00138   BlockReference ref = me->senderRef;
00139   GrepAddSubReq* const addReq = (GrepAddSubReq *)signal->getDataPtr();  
00140 
00141 
00142   SubscriptionPtr subPtr;
00143   c_subscriptions.first(c_subPtr);
00144   for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) {
00145     jam();
00146     subPtr.i = c_subPtr.curr.i;
00147     subPtr.p = c_subscriptions.getPtr(subPtr.i);
00148     addReq->subscriptionId   = subPtr.p->m_subscriptionId;
00149     addReq->subscriptionKey  = subPtr.p->m_subscriptionKey;
00150     addReq->subscriberData   = subPtr.p->m_subscriberData;
00151     addReq->subscriptionType = subPtr.p->m_subscriptionType;
00152     addReq->senderRef        = subPtr.p->m_coordinatorRef;
00153     addReq->subscriberRef    =subPtr.p->m_subscriberRef;
00154 
00155     sendSignal(ref, 
00156                GSN_GREP_ADD_SUB_REQ, 
00157                signal, 
00158                GrepAddSubReq::SignalLength,
00159                JBB);
00160   }
00161   
00162   addReq->subscriptionId   = 0;
00163   addReq->subscriptionKey  = 0;
00164   addReq->subscriberData   = 0;
00165   addReq->subscriptionType = 0;
00166   addReq->senderRef        = 0;
00167   addReq->subscriberRef    = 0;
00168 
00169   sendSignal(ref, 
00170              GSN_GREP_ADD_SUB_REQ, 
00171              signal, 
00172              GrepAddSubReq::SignalLength,
00173              JBB);
00174 }
00175 
00176 void 
00177 Grep::PSPart::execGREP_ADD_SUB_REQ(Signal* signal)
00178 {
00179   jamEntry();
00180   GrepAddSubReq * const grepReq = (GrepAddSubReq *)signal->getDataPtr();
00181   const Uint32 subId          = grepReq->subscriptionId;
00182   const Uint32 subKey         = grepReq->subscriptionKey;
00183   const Uint32 subData        = grepReq->subscriberData;
00184   const Uint32 subType        = grepReq->subscriptionType;
00185   const Uint32 coordinatorRef = grepReq->senderRef;
00186 
00190   const Uint32 subRef         = grepReq->subscriberRef;
00191 
00192   if(subId!=0 && subKey!=0) {
00193     jam();
00194     SubscriptionPtr subPtr;
00195     ndbrequire( c_subscriptionPool.seize(subPtr));
00196     subPtr.p->m_coordinatorRef    = coordinatorRef;
00197     subPtr.p->m_subscriptionId    = subId;
00198     subPtr.p->m_subscriptionKey   = subKey;
00199     subPtr.p->m_subscriberRef     = subRef;
00200     subPtr.p->m_subscriberData    = subData;
00201     subPtr.p->m_subscriptionType  = subType;
00202     
00203     c_subscriptions.add(subPtr);
00204   }
00205   else  {
00206     jam();
00207     GrepAddSubConf * conf = (GrepAddSubConf *)grepReq;
00208     conf->noOfSub = 
00209       c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree();
00210     sendSignal(signal->getSendersBlockRef(),
00211                GSN_GREP_ADD_SUB_CONF, 
00212                signal, 
00213                GrepAddSubConf::SignalLength, 
00214                JBB);
00215   }
00216 }
00217 
00218 void 
00219 Grep::PSPart::execGREP_ADD_SUB_REF(Signal* signal)
00220 {
00224 }
00225 
00226 void 
00227 Grep::PSPart::execGREP_ADD_SUB_CONF(Signal* signal)
00228 {
00229   jamEntry();
00230   GrepAddSubConf* const conf = (GrepAddSubConf *)signal->getDataPtr();
00231   Uint32 noOfSubscriptions = conf->noOfSub;
00232   Uint32 noOfRestoredSubscriptions = 
00233     c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree();
00234   if(noOfSubscriptions!=noOfRestoredSubscriptions) {
00235     jam();
00239     ndbrequire(false);
00240   }
00241 }
00242 
00243 void
00244 Grep::execREAD_NODESCONF(Signal* signal) 
00245 {
00246   jamEntry();
00247   ReadNodesConf * conf = (ReadNodesConf *)signal->getDataPtr();
00248   
00249 #if 0
00250   ndbout_c("Grep: Recd READ_NODESCONF");
00251 #endif
00252   
00253   /******************************
00254    * Check which REP nodes exist
00255    ******************************/
00256   Uint32 i;
00257   for (i = 1; i < MAX_NODES; i++) 
00258   {
00259     jam();
00260 #if 0
00261     ndbout_c("Grep: Found node %d of type %d", i, getNodeInfo(i).getType());
00262 #endif
00263     if (getNodeInfo(i).getType() == NodeInfo::REP)
00264     {
00265       jam();
00269       pscoord.m_repRef = numberToRef(PSREPBLOCKNO, i);
00270       pspart.m_repRef = numberToRef(PSREPBLOCKNO, i);
00271 #if 0
00272       ndbout_c("Grep: REP node %d detected", i);
00273 #endif
00274     }
00275   }
00276   
00277   /*****************************
00278    * Check which DB nodes exist
00279    *****************************/
00280   m_aliveNodes.clear();
00281 
00282   Uint32 count = 0;
00283   for(i = 0; i<MAX_NDB_NODES; i++) 
00284   {
00285     if (NodeBitmask::get(conf->allNodes, i)) 
00286     {
00287       jam();
00288       count++;
00289 
00290       NodePtr node;
00291       ndbrequire(m_nodes.seize(node));
00292       
00293       node.p->nodeId = i;
00294       if (NodeBitmask::get(conf->inactiveNodes, i)) 
00295       {
00296         node.p->alive = 0;
00297       } 
00298       else 
00299       {
00300         node.p->alive = 1;
00301         m_aliveNodes.set(i);
00302       }
00303     }
00304   }
00305   m_masterNodeId = conf->masterNodeId;
00306   ndbrequire(count == conf->noOfNodes);
00307   sendSTTORRY(signal);
00308 }
00309 
00310 void
00311 Grep::sendSTTORRY(Signal* signal) 
00312 {
00313   signal->theData[0] = 0;
00314   signal->theData[3] = 1;
00315   signal->theData[4] = 3;
00316   signal->theData[5] = 5;
00317   signal->theData[6] = 7;
00318   signal->theData[7] = 255; // No more start phases from missra
00319   sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);
00320 }
00321 
00322 void
00323 Grep::execNDB_STTOR(Signal* signal) 
00324 {
00325   jamEntry();                            
00326 }
00327 
00328 void
00329 Grep::execDUMP_STATE_ORD(Signal* signal) 
00330 {
00331   jamEntry();
00332   //Uint32 tCase = signal->theData[0];
00333 
00334 #if 0
00335   if(sscoord.m_repRef == 0) 
00336   {
00337     ndbout << "Grep: Recd DUMP signal but has no connection with REP node"
00338            << endl;
00339     return;
00340   }
00341 #endif 
00342 
00343   /*
00344   switch (tCase) 
00345   {
00346   case 8100: sscoord.grepReq(signal, GrepReq::START_SUBSCR); break;
00347   case 8102: sscoord.grepReq(signal, GrepReq::START_METALOG); break;
00348   case 8104: sscoord.grepReq(signal, GrepReq::START_METASCAN); break;
00349   case 8106: sscoord.grepReq(signal, GrepReq::START_DATALOG); break;
00350   case 8108: sscoord.grepReq(signal, GrepReq::START_DATASCAN); break;
00351   case 8110: sscoord.grepReq(signal, GrepReq::STOP_SUBSCR); break;
00352   case 8500: sscoord.grepReq(signal, GrepReq::REMOVE_BUFFERS); break;
00353   case 8300: sscoord.grepReq(signal, GrepReq::SLOWSTOP); break;
00354   case 8400: sscoord.grepReq(signal, GrepReq::FASTSTOP); break;
00355   case 8600: sscoord.grepReq(signal, GrepReq::CREATE_SUBSCR); break;
00356   case 8700: sscoord.dropTable(signal,(Uint32)signal->theData[1]);break;
00357   default: break;
00358   }
00359   */
00360 }
00361 
00365 void 
00366 Grep::execAPI_FAILREQ(Signal* signal) 
00367 {
00368   jamEntry();
00369   //Uint32          failedApiNode = signal->theData[0];
00370   //BlockReference  retRef = signal->theData[1];
00371   
00377 #if 0
00378   ndbout_c("Grep: API_FAILREQ received for API node %d.", failedApiNode);
00379 #endif
00380   
00386 }
00387 
00388 /**************************************************************************
00389  * ------------------------------------------------------------------------
00390  *  MODULE:    GREP Control
00391  * ------------------------------------------------------------------------
00392  **************************************************************************/
00393 void
00394 Grep::execGREP_REQ(Signal* signal) 
00395 {
00396   jamEntry();
00397   
00398   //GrepReq * req = (GrepReq *)signal->getDataPtr();
00399   
00405   ndbout_c("Warning! REP commands can only be executed at REP SERVER prompt!");
00406 }
00407 
00408 
00409 /**************************************************************************
00410  * ------------------------------------------------------------------------
00411  *  MODULE:    NODE STATE HANDLING
00412  * ------------------------------------------------------------------------
00413  **************************************************************************/
00414 void
00415 Grep::execNODE_FAILREP(Signal* signal) 
00416 {
00417   jamEntry();
00418   NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
00419   bool changed = false;
00420 
00421   NodePtr nodePtr;
00422   for(m_nodes.first(nodePtr); nodePtr.i != RNIL; m_nodes.next(nodePtr)) 
00423   {
00424     jam();
00425     if (NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)) 
00426     {
00427       jam();
00428       
00429       if (nodePtr.p->alive) 
00430       {
00431         jam();
00432         ndbassert(m_aliveNodes.get(nodePtr.p->nodeId));
00433         changed = true;
00434       } 
00435       else 
00436       {
00437         ndbassert(!m_aliveNodes.get(nodePtr.p->nodeId));
00438       }
00439       
00440       nodePtr.p->alive = 0;
00441       m_aliveNodes.clear(nodePtr.p->nodeId);
00442     }
00443   }
00444 
00445 
00460 }
00461 
00462 void
00463 Grep::execINCL_NODEREQ(Signal* signal) 
00464 {
00465   jamEntry();
00466   
00467   //const Uint32 senderRef = signal->theData[0];
00468   const Uint32 inclNode  = signal->theData[1];
00469 
00470   NodePtr node;
00471   for(m_nodes.first(node); node.i != RNIL; m_nodes.next(node)) 
00472   {
00473     jam();
00474     const Uint32 nodeId = node.p->nodeId;
00475     if (inclNode == nodeId) {
00476       jam();
00477       
00478       ndbrequire(node.p->alive == 0);
00479       ndbassert(!m_aliveNodes.get(nodeId));
00480       
00481       node.p->alive = 1;
00482       m_aliveNodes.set(nodeId);
00483       
00484       break;
00485     }
00486   }
00487 
00491 #if 0 
00492   signal->theData[0] = reference();
00493   
00494   sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);
00495 #endif  
00496 }
00497 
00498 
00502 void 
00503 Grep::PSCoord::prepareOperationRec(SubCoordinatorPtr subPtr, 
00504                                    BlockReference subscriber,
00505                                    Uint32 subId,
00506                                    Uint32 subKey,
00507                                    Uint32 request) 
00508 {
00509   subPtr.p->m_coordinatorRef     = reference();
00510   subPtr.p->m_subscriberRef      = subscriber;
00511   subPtr.p->m_subscriberData     = subPtr.i;
00512   subPtr.p->m_subscriptionId     = subId;
00513   subPtr.p->m_subscriptionKey    = subKey;
00514   subPtr.p->m_outstandingRequest = request;
00515 }
00516 
00517 
00518 /**************************************************************************
00519  * ------------------------------------------------------------------------
00520  *  MODULE:    CREATE SUBSCRIPTION ID
00521  * ------------------------------------------------------------------------
00522  * 
00523  *  Requests SUMA to create a unique subscription id 
00524  **************************************************************************/
00525 
00526 void 
00527 Grep::PSCoord::execGREP_CREATE_SUBID_REQ(Signal* signal) 
00528 {
00529   jamEntry();
00530 
00531   CreateSubscriptionIdReq * req = 
00532     (CreateSubscriptionIdReq*)signal->getDataPtr();
00533   BlockReference ref = signal->getSendersBlockRef();
00534   
00535   SubCoordinatorPtr subPtr;
00536   if( !c_subCoordinatorPool.seize(subPtr)) {
00537     jam();
00538     SubCoordinator sub;
00539     sub.m_subscriberRef   = ref;
00540     sub.m_subscriptionId  = 0;
00541     sub.m_subscriptionKey = 0;
00542     sendRefToSS(signal, sub, GrepError::SUBSCRIPTION_ID_NOMEM );
00543     return;
00544   }
00545   prepareOperationRec(subPtr,
00546                       ref,
00547                       0,0,
00548                       GSN_CREATE_SUBID_REQ);
00549 
00550   
00551   ndbout_c("SUBID_REQ  Ref %d",ref);
00552   req->senderData=subPtr.p->m_subscriberData;
00553 
00554   sendSignal(SUMA_REF, GSN_CREATE_SUBID_REQ, signal, 
00555              SubCreateReq::SignalLength, JBB);    
00556 
00557 #if 1 //def DEBUG_GREP_SUBSCRIPTION
00558   ndbout_c("Grep::PSCoord: Sent CREATE_SUBID_REQ to SUMA");
00559 #endif
00560 }
00561 
00562 void 
00563 Grep::PSCoord::execCREATE_SUBID_CONF(Signal* signal) 
00564 {
00565   jamEntry();
00566   CreateSubscriptionIdConf const * conf = 
00567     (CreateSubscriptionIdConf *)signal->getDataPtr();
00568   Uint32 subId    = conf->subscriptionId;
00569   Uint32 subKey   = conf->subscriptionKey;
00570   Uint32 subData  = conf->subscriberData;
00571 
00572 #if 1 //def DEBUG_GREP_SUBSCRIPTION
00573   ndbout_c("Grep::PSCoord: Recd GREP_SUBID_CONF (subId:%d, subKey:%d)", 
00574            subId, subKey);
00575 #endif
00576 
00577   SubCoordinatorPtr subPtr;
00578   c_subCoordinatorPool.getPtr(subPtr, subData);
00579   BlockReference repRef = subPtr.p->m_subscriberRef;
00580   
00581   { // Check that id/key is unique
00582     SubCoordinator key;
00583     SubCoordinatorPtr tmp;
00584     key.m_subscriptionId  = subId;
00585     key.m_subscriptionKey = subKey;
00586     if(c_runningSubscriptions.find(tmp, key)){
00587       jam();
00588       SubCoordinator sub;
00589       sub.m_subscriberRef=repRef;
00590       sub.m_subscriptionId = subId;
00591       sub.m_subscriptionKey = subKey;
00592       sendRefToSS(signal,sub, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE );
00593       return;
00594     }
00595   }
00596   
00597   sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_CREATE_SUBID_CONF, signal, 
00598              CreateSubscriptionIdConf::SignalLength, JBB);
00599   c_subCoordinatorPool.release(subData);
00600   
00601   m_grep->sendEventRep(signal,
00602                          NDB_LE_GrepSubscriptionInfo, 
00603                          GrepEvent::GrepPS_CreateSubIdConf,
00604                          subId,
00605                          subKey,
00606                          (Uint32)GrepError::GE_NO_ERROR);   
00607 }
00608 
00609 void 
00610 Grep::PSCoord::execCREATE_SUBID_REF(Signal* signal) {
00611   jamEntry();
00612   CreateSubscriptionIdRef const * ref = 
00613     (CreateSubscriptionIdRef *)signal->getDataPtr();
00614   Uint32 subData = ref->subscriberData;
00615   GrepError::GE_Code err;
00616   
00617   Uint32 sendersBlockRef = signal->getSendersBlockRef();
00618   if(sendersBlockRef == SUMA_REF) 
00619   {
00620     jam();
00621     err = GrepError::SUBSCRIPTION_ID_SUMA_FAILED_CREATE;
00622   } else {
00623     jam();
00624     ndbrequire(false); /* Added since errorcode err unhandled
00625                         * TODO: fix correct errorcode
00626                         */
00627     err= GrepError::GE_NO_ERROR; // remove compiler warning
00628   }
00629 
00630   SubCoordinatorPtr subPtr;
00631   c_runningSubscriptions.getPtr(subPtr, subData);
00632   BlockReference repref = subPtr.p->m_subscriberRef;
00633   
00634   SubCoordinator sub;
00635   sub.m_subscriberRef   = repref;
00636   sub.m_subscriptionId  = 0;
00637   sub.m_subscriptionKey = 0;
00638   sendRefToSS(signal,sub, err);
00639 
00640 }
00641 
00642 
00643 /**************************************************************************
00644  * ------------------------------------------------------------------------
00645  *  MODULE:    CREATE SUBSCRIPTION
00646  * ------------------------------------------------------------------------
00647  * 
00648  *  Creates a subscription for every GREP to its local SUMA.
00649  *  GREP node that executes createSubscription becomes the GREP Coord.
00650  **************************************************************************/
00651 
00655 void
00656 Grep::PSCoord::execGREP_SUB_CREATE_REQ(Signal* signal) 
00657 {
00658   jamEntry();
00659   GrepSubCreateReq const * grepReq = (GrepSubCreateReq *)signal->getDataPtr();
00660   Uint32 subId           = grepReq->subscriptionId;
00661   Uint32 subKey          = grepReq->subscriptionKey;
00662   Uint32 subType         = grepReq->subscriptionType;
00663   BlockReference rep     = signal->getSendersBlockRef();
00664 
00665   GrepCreateReq * req    =(GrepCreateReq*)grepReq;
00666 
00667   SubCoordinatorPtr subPtr;
00668 
00669   if( !c_subCoordinatorPool.seize(subPtr)) {
00670     jam();
00671     SubCoordinator sub;
00672     sub.m_subscriberRef = rep;
00673     sub.m_subscriptionId = 0;
00674     sub.m_subscriptionKey = 0;
00675     sub.m_outstandingRequest = GSN_GREP_CREATE_REQ;
00676     sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
00677     return;
00678   }
00679   prepareOperationRec(subPtr,
00680                       numberToRef(PSREPBLOCKNO, refToNode(rep)), subId, subKey,
00681                       GSN_GREP_CREATE_REQ);
00682 
00683   /* Get the payload of the signal.
00684    */
00685   SegmentedSectionPtr selectedTablesPtr;
00686   if(subType == SubCreateReq::SelectiveTableSnapshot) {
00687     jam();
00688     ndbrequire(signal->getNoOfSections()==1);    
00689     signal->getSection(selectedTablesPtr,0);
00690     signal->header.m_noOfSections = 0;
00691   }
00695   subPtr.p->m_subscriptionType = subType;
00696   req->senderRef        = reference();
00697   req->subscriberRef    = numberToRef(PSREPBLOCKNO, refToNode(rep));
00698   req->subscriberData   = subPtr.p->m_subscriberData;
00699   req->subscriptionId   = subId; 
00700   req->subscriptionKey  = subKey; 
00701   req->subscriptionType = subType;
00702 
00703   /*add payload if it is a selectivetablesnap*/
00704   if(subType == SubCreateReq::SelectiveTableSnapshot) {
00705     jam();
00706     signal->setSection(selectedTablesPtr, 0);
00707   }
00708 
00709   /******************************
00710    * Send to all PS participants
00711    ******************************/
00712   NodeReceiverGroup rg(GREP,  m_grep->m_aliveNodes);
00713   subPtr.p->m_outstandingParticipants = rg;
00714   sendSignal(rg,
00715              GSN_GREP_CREATE_REQ, signal, 
00716              GrepCreateReq::SignalLength, JBB);
00717 
00718 
00719 #ifdef DEBUG_GREP_SUBSCRIPTION
00720   ndbout_c("Grep::PSCoord: Sent GREP_CREATE_REQ "
00721            "(subId:%d, subKey:%d, subData:%d, subType:%d) to parts",
00722            subId, subKey, subPtr.p->m_subscriberData, subType);
00723 #endif
00724 }
00725 
00726 void 
00727 Grep::PSPart::execGREP_CREATE_REQ(Signal* signal) 
00728 {
00729   jamEntry();
00730   GrepCreateReq * const grepReq = (GrepCreateReq *)signal->getDataPtr();
00731   const Uint32 subId          = grepReq->subscriptionId;
00732   const Uint32 subKey         = grepReq->subscriptionKey;
00733   const Uint32 subData        = grepReq->subscriberData;
00734   const Uint32 subType        = grepReq->subscriptionType;
00735   const Uint32 coordinatorRef = grepReq->senderRef;
00736   const Uint32 subRef         = grepReq->subscriberRef; //this is ref to the
00737                                                         //REP node for this 
00738                                                         //subscription.
00739 
00740   SubscriptionPtr subPtr;
00741   ndbrequire( c_subscriptionPool.seize(subPtr));
00742   subPtr.p->m_coordinatorRef     = coordinatorRef;
00743   subPtr.p->m_subscriptionId     = subId;
00744   subPtr.p->m_subscriptionKey    = subKey;
00745   subPtr.p->m_subscriberRef      = subRef;
00746   subPtr.p->m_subscriberData     = subPtr.i;
00747   subPtr.p->m_subscriptionType   = subType;
00748   subPtr.p->m_outstandingRequest = GSN_GREP_CREATE_REQ; 
00749   subPtr.p->m_operationPtrI      = subData;
00750 
00751   c_subscriptions.add(subPtr);
00752 
00753   SegmentedSectionPtr selectedTablesPtr;
00754   if(subType == SubCreateReq::SelectiveTableSnapshot) {
00755     jam();
00756     ndbrequire(signal->getNoOfSections()==1);
00757     signal->getSection(selectedTablesPtr,0);// SubCreateReq::TABLE_LIST);
00758     signal->header.m_noOfSections = 0;
00759   }
00760 
00764   SubCreateReq * sumaReq = (SubCreateReq *)grepReq;
00765   sumaReq->subscriberRef    = GREP_REF;
00766   sumaReq->subscriberData   = subPtr.p->m_subscriberData;
00767   sumaReq->subscriptionId   = subPtr.p->m_subscriptionId; 
00768   sumaReq->subscriptionKey  = subPtr.p->m_subscriptionKey;
00769   sumaReq->subscriptionType = subPtr.p->m_subscriptionType;
00770   /*add payload if it is a selectivetablesnap*/
00771   if(subType == SubCreateReq::SelectiveTableSnapshot) {
00772     jam();
00773     signal->setSection(selectedTablesPtr, 0);
00774   }  
00775   sendSignal(SUMA_REF, 
00776              GSN_SUB_CREATE_REQ, 
00777              signal, 
00778              SubCreateReq::SignalLength, 
00779              JBB);
00780 }
00781 
00782 void
00783 Grep::PSPart::execSUB_CREATE_CONF(Signal* signal) 
00784 {
00785   jamEntry();  
00786 
00787   SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();
00788   Uint32 subData             = conf->subscriberData;
00789 
00790   SubscriptionPtr subPtr;
00791   c_subscriptions.getPtr(subPtr, subData);
00798 #ifdef DEBUG_GREP_SUBSCRIPTION
00799   ndbout_c("Grep::PSPart: Recd SUB_CREATE_CONF "
00800            "(subId:%d, subKey:%d) from SUMA",
00801            conf->subscriptionId, conf->subscriptionKey);
00802 #endif
00803 
00804   /*********************
00805    * Send conf to coord
00806    *********************/
00807   GrepCreateConf * grepConf = (GrepCreateConf*)conf;
00808   grepConf->senderNodeId = getOwnNodeId();
00809   grepConf->senderData = subPtr.p->m_operationPtrI;
00810   sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_CREATE_CONF, signal, 
00811              GrepCreateConf::SignalLength, JBB);    
00812   subPtr.p->m_outstandingRequest = 0; 
00813 }
00814 
00821 void 
00822 Grep::PSPart::execSUB_CREATE_REF(Signal* signal) 
00823 {
00824   jamEntry();
00825   SubCreateRef * const ref = (SubCreateRef *)signal->getDataPtr();
00826   Uint32 subData           = ref->subscriberData;
00827   GrepError::GE_Code err      = (GrepError::GE_Code)ref->err;
00828   SubscriptionPtr subPtr;
00829   c_subscriptions.getPtr(subPtr, subData);
00830   sendRefToPSCoord(signal, *subPtr.p, err /*error*/);
00831   subPtr.p->m_outstandingRequest = 0;
00832 }
00833 
00834 void 
00835 Grep::PSCoord::execGREP_CREATE_CONF(Signal* signal) 
00836 {
00837   jamEntry();
00838   GrepCreateConf const * conf = (GrepCreateConf *)signal->getDataPtr();
00839   Uint32 subData       = conf->senderData;
00840   Uint32 nodeId        = conf->senderNodeId;
00841 
00842   SubCoordinatorPtr subPtr;
00843   c_subCoordinatorPool.getPtr(subPtr, subData);
00844   
00845   ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_CREATE_REQ);
00846   
00847   subPtr.p->m_outstandingParticipants.clearWaitingFor(nodeId);
00848   
00849   if(!subPtr.p->m_outstandingParticipants.done()) return;
00850   /********************************
00851    * All participants have CONF:ed
00852    ********************************/
00853   Uint32 subId = subPtr.p->m_subscriptionId;
00854   Uint32 subKey = subPtr.p->m_subscriptionKey;
00855     
00856   GrepSubCreateConf * grepConf = (GrepSubCreateConf *)signal->getDataPtr();
00857   grepConf->subscriptionId  = subId;
00858   grepConf->subscriptionKey = subKey;
00859   sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_CREATE_CONF, signal, 
00860              GrepSubCreateConf::SignalLength, JBB);
00861 
00865   m_grep->sendEventRep(signal,
00866                        NDB_LE_GrepSubscriptionInfo,
00867                        GrepEvent::GrepPS_SubCreateConf,
00868                        subId,
00869                        subKey,
00870                        (Uint32)GrepError::GE_NO_ERROR);
00871 
00872   c_subCoordinatorPool.release(subPtr);
00873 
00874 }
00875 
00882 void 
00883 Grep::PSCoord::execGREP_CREATE_REF(Signal* signal) 
00884 {
00885   jamEntry();
00886   GrepCreateRef * const ref = (GrepCreateRef *)signal->getDataPtr();
00887   Uint32 subData = ref->senderData;
00888   Uint32 err     = ref->err;
00889   SubCoordinatorPtr subPtr;
00890   c_runningSubscriptions.getPtr(subPtr, subData);  
00891  
00892   sendRefToSS(signal, *subPtr.p, (GrepError::GE_Code)err /*error*/);
00893 }
00894 
00895 
00896 /**************************************************************************
00897  * ------------------------------------------------------------------------
00898  *  MODULE:    START SUBSCRIPTION
00899  * ------------------------------------------------------------------------
00900  * 
00901  *  Starts a subscription at SUMA.  
00902  *  Each participant starts its own subscription.
00903  **************************************************************************/
00904 
00908 void
00909 Grep::PSCoord::execGREP_SUB_START_REQ(Signal* signal) 
00910 {
00911   jamEntry();
00912   GrepSubStartReq * const subReq = (GrepSubStartReq *)signal->getDataPtr();
00913   SubscriptionData::Part part    = (SubscriptionData::Part) subReq->part;
00914   Uint32 subId                   = subReq->subscriptionId;
00915   Uint32 subKey                  = subReq->subscriptionKey;
00916   BlockReference rep             = signal->getSendersBlockRef();
00917 
00918   SubCoordinatorPtr subPtr;
00919 
00920   if(!c_subCoordinatorPool.seize(subPtr)) {
00921     jam();
00922     SubCoordinator sub;
00923     sub.m_subscriberRef = rep;
00924     sub.m_subscriptionId = 0;
00925     sub.m_subscriptionKey = 0;
00926     sub.m_outstandingRequest = GSN_GREP_START_REQ;
00927     sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
00928     return;
00929   }
00930   
00931   prepareOperationRec(subPtr,
00932                       numberToRef(PSREPBLOCKNO, refToNode(rep)), 
00933                       subId, subKey,
00934                       GSN_GREP_START_REQ);
00935  
00936   GrepStartReq * const req    = (GrepStartReq *) subReq;
00937   req->part                   = (Uint32) part;
00938   req->subscriptionId         = subPtr.p->m_subscriptionId;
00939   req->subscriptionKey        = subPtr.p->m_subscriptionKey;
00940   req->senderData             = subPtr.p->m_subscriberData;
00941 
00942   /***************************
00943    * Send to all participants
00944    ***************************/
00945   NodeReceiverGroup rg(GREP,  m_grep->m_aliveNodes);
00946   subPtr.p->m_outstandingParticipants = rg;
00947   sendSignal(rg,
00948              GSN_GREP_START_REQ, 
00949              signal, 
00950              GrepStartReq::SignalLength, JBB);
00951 
00952 #ifdef DEBUG_GREP_SUBSCRIPTION
00953   ndbout_c("Grep::PSCoord: Sent GREP_START_REQ "
00954            "(subId:%d, subKey:%d, senderData:%d, part:%d) to all participants",
00955            req->subscriptionId, req->subscriptionKey, req->senderData, part);
00956 #endif
00957 }
00958 
00959 
00960 void 
00961 Grep::PSPart::execGREP_START_REQ(Signal* signal) 
00962 {
00963   jamEntry();
00964   GrepStartReq * const grepReq = (GrepStartReq *) signal->getDataPtr();    
00965   SubscriptionData::Part part  = (SubscriptionData::Part)grepReq->part;
00966   Uint32 subId                 = grepReq->subscriptionId;
00967   Uint32 subKey                = grepReq->subscriptionKey;
00968   Uint32 operationPtrI         = grepReq->senderData;
00969   
00970   Subscription key;
00971   key.m_subscriptionId        = subId;
00972   key.m_subscriptionKey       = subKey;
00973   SubscriptionPtr subPtr;
00974   ndbrequire(c_subscriptions.find(subPtr, key));;
00975   subPtr.p->m_outstandingRequest = GSN_GREP_START_REQ; 
00976   subPtr.p->m_operationPtrI = operationPtrI;
00980   SubStartReq * sumaReq    = (SubStartReq *) grepReq;
00981   sumaReq->subscriptionId  = subId; 
00982   sumaReq->subscriptionKey = subKey;
00983   sumaReq->subscriberData  = subPtr.i;
00984   sumaReq->part            = (Uint32) part;
00985 
00986   sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal, 
00987              SubStartReq::SignalLength, JBB);  
00988 #ifdef DEBUG_GREP_SUBSCRIPTION
00989   ndbout_c("Grep::PSPart: Sent SUB_START_REQ (subId:%d, subKey:%d, part:%d)", 
00990            subId, subKey, (Uint32)part);
00991 #endif
00992 }
00993 
00994 
00995 void
00996 Grep::PSPart::execSUB_START_CONF(Signal* signal) 
00997 {
00998   jamEntry();
00999   
01000   SubStartConf * const conf = (SubStartConf *) signal->getDataPtr();
01001   SubscriptionData::Part part = (SubscriptionData::Part)conf->part;
01002   Uint32 subId                = conf->subscriptionId;
01003   Uint32 subKey               = conf->subscriptionKey;
01004   Uint32 subData              = conf->subscriberData;
01005   Uint32 firstGCI             = conf->firstGCI;
01006 #ifdef DEBUG_GREP_SUBSCRIPTION
01007   ndbout_c("Grep::PSPart: Recd SUB_START_CONF "
01008            "(subId:%d, subKey:%d, subData:%d)",
01009            subId, subKey, subData);
01010 #endif
01011 
01012   SubscriptionPtr subPtr;
01013   c_subscriptions.getPtr(subPtr, subData);
01014   ndbrequire(subPtr.p->m_subscriptionId  == subId);
01015   ndbrequire(subPtr.p->m_subscriptionKey == subKey);
01016 
01017   GrepStartConf * grepConf = (GrepStartConf *)conf;
01018   grepConf->senderData      = subPtr.p->m_operationPtrI;
01019   grepConf->part            = (Uint32) part;
01020   grepConf->subscriptionKey = subKey;
01021   grepConf->subscriptionId  = subId;
01022   grepConf->firstGCI        = firstGCI;
01023   grepConf->senderNodeId    = getOwnNodeId();
01024   sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_START_CONF, signal, 
01025              GrepStartConf::SignalLength, JBB);
01026   subPtr.p->m_outstandingRequest = 0; 
01027 
01028 #ifdef DEBUG_GREP_SUBSCRIPTION
01029   ndbout_c("Grep::PSPart: Sent GREP_START_CONF "
01030            "(subId:%d, subKey:%d, subData:%d, part:%d)",
01031            subId, subKey, subData, part);
01032 #endif
01033 }
01034 
01035 
01044 void 
01045 Grep::PSPart::execSUB_START_REF(Signal* signal) 
01046 {
01047   SubStartRef * const ref = (SubStartRef *)signal->getDataPtr();
01048   Uint32 subData          = ref->subscriberData;
01049   GrepError::GE_Code err     = (GrepError::GE_Code)ref->err;
01050   SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
01051   SubscriptionPtr subPtr;
01052   c_subscriptions.getPtr(subPtr, subData);
01053   sendRefToPSCoord(signal, *subPtr.p, err /*error*/, part);
01054   subPtr.p->m_outstandingRequest = 0;
01055 }
01056 
01057 
01061 void 
01062 Grep::PSCoord::execGREP_START_CONF(Signal* signal) 
01063 {
01064   jamEntry();  
01065 
01066   GrepStartConf * const conf = (GrepStartConf *) signal->getDataPtr();
01067   Uint32 subData              = conf->senderData;
01068   SubscriptionData::Part part = (SubscriptionData::Part)conf->part;
01069   Uint32 subId                = conf->subscriptionId;
01070   Uint32 subKey               = conf->subscriptionKey;
01071   Uint32 firstGCI             = conf->firstGCI;
01072 
01073   SubCoordinatorPtr subPtr;
01074   c_subCoordinatorPool.getPtr(subPtr, subData);
01075   ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_START_REQ);
01076 
01077   subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId);
01078 
01079   if(!subPtr.p->m_outstandingParticipants.done()) return;
01080   jam();
01081   
01082   /*************************
01083    * All participants ready 
01084    *************************/
01085   GrepSubStartConf * grepConf = (GrepSubStartConf *) conf;
01086   grepConf->part              = part;
01087   grepConf->subscriptionId    = subId;
01088   grepConf->subscriptionKey   = subKey;
01089   grepConf->firstGCI          = firstGCI;
01090 
01091   bool ok = false;
01092   switch(part) {
01093   case SubscriptionData::MetaData:
01094     ok = true;
01095     sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal, 
01096                GrepSubStartConf::SignalLength, JBB);
01097     
01101     m_grep->sendEventRep(signal,
01102                          NDB_LE_GrepSubscriptionInfo,
01103                          GrepEvent::GrepPS_SubStartMetaConf,
01104                          subId, subKey,
01105                          (Uint32)GrepError::GE_NO_ERROR);
01106     
01107     c_subCoordinatorPool.release(subPtr);
01108     break;
01109   case SubscriptionData::TableData:
01110     ok = true;
01111     sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal, 
01112                GrepSubStartConf::SignalLength, JBB);
01113 
01117     m_grep->sendEventRep(signal,
01118                          NDB_LE_GrepSubscriptionInfo,
01119                          GrepEvent::GrepPS_SubStartDataConf,
01120                          subId, subKey,
01121                          (Uint32)GrepError::GE_NO_ERROR);
01122     
01123 
01124     c_subCoordinatorPool.release(subPtr);
01125     break;
01126   }
01127   ndbrequire(ok);
01128 
01129 #ifdef DEBUG_GREP_SUBSCRIPTION
01130   ndbout_c("Grep::PSCoord: Recd SUB_START_CONF (subId:%d, subKey:%d, part:%d) "
01131            "from all slaves",
01132            subId, subKey, (Uint32)part); 
01133 #endif
01134 }
01135 
01142 void 
01143 Grep::PSCoord::execGREP_START_REF(Signal* signal) 
01144 {
01145   jamEntry();
01146   GrepStartRef * const ref = (GrepStartRef *)signal->getDataPtr();
01147   Uint32 subData           = ref->senderData;
01148   GrepError::GE_Code err      = (GrepError::GE_Code)ref->err;
01149   SubscriptionData::Part part  = (SubscriptionData::Part)ref->part;
01150 
01151   SubCoordinatorPtr subPtr;
01152   c_runningSubscriptions.getPtr(subPtr, subData);  
01153   sendRefToSS(signal, *subPtr.p, err /*error*/, part);
01154 }
01155  
01156 /**************************************************************************
01157  * ------------------------------------------------------------------------
01158  *  MODULE:    REMOVE SUBSCRIPTION
01159  * ------------------------------------------------------------------------
01160  * 
01161  *  Remove a subscription at SUMA.  
01162  *  Each participant removes its own subscription.
01163  *  We start by deleting the subscription inside the requestor,
01164  *  since we don't know whether nodes (REP nodes or DB nodes) 
01165  *  have disconnected after we sent out this request; 
01166  *  if we don't delete the subscription in the requestor now, 
01167  *  we won't be able to create a new subscription.
01168  **************************************************************************/
01169 
01173 void
01174 Grep::PSCoord::execGREP_SUB_REMOVE_REQ(Signal* signal) 
01175 {
01176   jamEntry();
01177   GrepSubRemoveReq * const subReq = (GrepSubRemoveReq *)signal->getDataPtr();
01178   Uint32 subId           = subReq->subscriptionId;
01179   Uint32 subKey          = subReq->subscriptionKey;
01180   BlockReference rep     = signal->getSendersBlockRef(); 
01181 
01182   SubCoordinatorPtr subPtr;
01183   if( !c_subCoordinatorPool.seize(subPtr)) {
01184     jam();
01185     SubCoordinator sub;
01186     sub.m_subscriberRef = rep;
01187     sub.m_subscriptionId = 0;
01188     sub.m_subscriptionKey = 0;
01189     sub.m_outstandingRequest = GSN_GREP_REMOVE_REQ;
01190     sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
01191     return;
01192   }
01193 
01194 
01195   prepareOperationRec(subPtr,
01196                       numberToRef(PSREPBLOCKNO, refToNode(rep)), 
01197                       subId, subKey,
01198                       GSN_GREP_REMOVE_REQ);
01199 
01200   c_runningSubscriptions.add(subPtr);
01201 
01202   GrepRemoveReq * req         = (GrepRemoveReq *) subReq;
01203   req->subscriptionId         = subPtr.p->m_subscriptionId;
01204   req->subscriptionKey        = subPtr.p->m_subscriptionKey;
01205   req->senderData             = subPtr.p->m_subscriberData;
01206   req->senderRef              = subPtr.p->m_coordinatorRef;
01207 
01208   /***************************
01209    * Send to all participants
01210    ***************************/
01211   NodeReceiverGroup rg(GREP,  m_grep->m_aliveNodes);
01212   subPtr.p->m_outstandingParticipants = rg;
01213   sendSignal(rg,
01214              GSN_GREP_REMOVE_REQ, signal, 
01215              GrepRemoveReq::SignalLength, JBB);
01216 }
01217 
01218 
01219 void 
01220 Grep::PSPart::execGREP_REMOVE_REQ(Signal* signal)
01221 {
01222   jamEntry();
01223   GrepRemoveReq * const grepReq = (GrepRemoveReq *) signal->getDataPtr();    
01224   Uint32 subId        = grepReq->subscriptionId;
01225   Uint32 subKey       = grepReq->subscriptionKey;
01226   Uint32 subData      = grepReq->senderData;
01227   Uint32 coordinator  = grepReq->senderRef;
01228 
01229   Subscription key;
01230   key.m_subscriptionId        = subId;
01231   key.m_subscriptionKey       = subKey;
01232   SubscriptionPtr subPtr;
01233   
01234   if(!c_subscriptions.find(subPtr, key))
01235     {
01240       GrepRemoveConf * grepConf = (GrepRemoveConf *)grepReq;
01241       grepConf->subscriptionKey = subKey;
01242       grepConf->subscriptionId  = subId;
01243       grepConf->senderData      = subData;
01244       grepConf->senderNodeId    = getOwnNodeId();
01245       sendSignal(coordinator, GSN_GREP_REMOVE_CONF, signal, 
01246                  GrepRemoveConf::SignalLength, JBB);
01247       return;      
01248     }
01249 
01250   subPtr.p->m_operationPtrI = subData;
01251   subPtr.p->m_coordinatorRef = coordinator;
01252   subPtr.p->m_outstandingRequest = GSN_GREP_REMOVE_REQ; 
01253 
01257   SubRemoveReq * sumaReq   = (SubRemoveReq *) grepReq;
01258   sumaReq->subscriptionId  = subId; 
01259   sumaReq->subscriptionKey = subKey;
01260   sumaReq->senderData  = subPtr.i;
01261   sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal, 
01262              SubStartReq::SignalLength, JBB);  
01263 }
01264 
01265 
01269 void
01270 Grep::PSPart::execSUB_REMOVE_CONF(Signal* signal) 
01271 {
01272   jamEntry();
01273   SubRemoveConf * const conf = (SubRemoveConf *) signal->getDataPtr();
01274   Uint32 subId     = conf->subscriptionId;
01275   Uint32 subKey    = conf->subscriptionKey;
01276   Uint32 subData   = conf->subscriberData;
01277 
01278   SubscriptionPtr subPtr;
01279   c_subscriptions.getPtr(subPtr, subData);
01280   ndbrequire(subPtr.p->m_subscriptionId  == subId);
01281   ndbrequire(subPtr.p->m_subscriptionKey == subKey);
01282   subPtr.p->m_outstandingRequest = 0; 
01283   GrepRemoveConf * grepConf = (GrepRemoveConf *)conf;
01284   grepConf->subscriptionKey = subKey;
01285   grepConf->subscriptionId  = subId;
01286   grepConf->senderData      = subPtr.p->m_operationPtrI;
01287   grepConf->senderNodeId    = getOwnNodeId();
01288   sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_REMOVE_CONF, signal, 
01289              GrepRemoveConf::SignalLength, JBB);
01290   c_subscriptions.release(subPtr);  
01291 
01292 }
01293 
01294 
01298 void
01299 Grep::PSPart::execSUB_REMOVE_REF(Signal* signal) 
01300 {
01301   jamEntry();
01302   SubRemoveRef * const ref = (SubRemoveRef *)signal->getDataPtr();
01303   Uint32 subData           = ref->subscriberData;
01304   /*  GrepError::GE_Code err      = (GrepError::GE_Code)ref->err;*/
01305   SubscriptionPtr subPtr;
01306   c_subscriptions.getPtr(subPtr, subData);
01307   
01308   //sendSubRemoveRef_PSCoord(signal, *subPtr.p, err /*error*/);
01309 }
01310 
01311 
01315 void 
01316 Grep::PSCoord::execGREP_REMOVE_CONF(Signal* signal) 
01317 {
01318   jamEntry();
01319   GrepRemoveConf * const conf = (GrepRemoveConf *) signal->getDataPtr();
01320   Uint32 subId                = conf->subscriptionId;
01321   Uint32 subKey               = conf->subscriptionKey;
01322   Uint32 senderNodeId         = conf->senderNodeId;
01323   Uint32 subData              = conf->senderData;
01324   SubCoordinatorPtr subPtr;
01325   c_subCoordinatorPool.getPtr(subPtr, subData);
01326 
01327   ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_REMOVE_REQ);
01328   
01329   subPtr.p->m_outstandingParticipants.clearWaitingFor(senderNodeId);
01330 
01331   if(!subPtr.p->m_outstandingParticipants.done()) { 
01332     jam();
01333     return;
01334   }
01335   jam();
01336   
01337   /*************************
01338    * All participants ready 
01339    *************************/
01340 
01341   m_grep->sendEventRep(signal,
01342                        NDB_LE_GrepSubscriptionInfo,
01343                        GrepEvent::GrepPS_SubRemoveConf,
01344                        subId, subKey,
01345                        GrepError::GE_NO_ERROR);
01346 
01347   GrepSubRemoveConf * grepConf = (GrepSubRemoveConf *) conf;
01348   grepConf->subscriptionId = subId;
01349   grepConf->subscriptionKey = subKey;
01350   sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_REMOVE_CONF, signal, 
01351              GrepSubRemoveConf::SignalLength, JBB);
01352   
01353   c_subCoordinatorPool.release(subPtr);
01354 }
01355 
01356 
01357 
01358 void 
01359 Grep::PSCoord::execGREP_REMOVE_REF(Signal* signal) 
01360 {
01361   jamEntry();
01362   GrepRemoveRef * const ref = (GrepRemoveRef *)signal->getDataPtr();
01363   Uint32 subData = ref->senderData;
01364   Uint32 err     = ref->err;
01365   SubCoordinatorPtr subPtr;
01366 
01371   for( c_runningSubscriptions.first(c_subPtr); 
01372        !c_subPtr.isNull(); c_runningSubscriptions.next(c_subPtr)) {
01373     jam();
01374     subPtr.i = c_subPtr.curr.i;
01375     subPtr.p = c_runningSubscriptions.getPtr(subPtr.i);
01376     if(subData == subPtr.i) 
01377       {
01378       sendRefToSS(signal, *subPtr.p, (GrepError::GE_Code)err /*error*/);
01379       c_runningSubscriptions.release(subPtr);
01380     return;
01381     }
01382   }
01383   return;
01384 }
01385 
01386 
01387 /**************************************************************************
01388  * ------------------------------------------------------------------------
01389  *  MODULE:       LOG RECORDS (COMING IN FROM LOCAL SUMA)
01390  * ------------------------------------------------------------------------
01391  * 
01392  *  After the subscription is started, we get log records from SUMA.
01393  *  Both table data and meta data log records are received.
01394  *
01395  *  TODO:
01396  *  @todo Changes in meta data are currently not 
01397  *        allowed during global replication
01398  **************************************************************************/
01399 
01400 void
01401 Grep::PSPart::execSUB_META_DATA(Signal* signal) 
01402 {
01403   jamEntry();
01404   if(m_recoveryMode) {
01405     jam();
01406     return;
01407   }
01411   SubMetaData * data = (SubMetaData *) signal->getDataPtrSend();
01412   SubscriptionPtr subPtr;  
01413   c_subscriptions.getPtr(subPtr, data->subscriberData);
01414 
01415   /***************************
01416    * Forward data to REP node
01417    ***************************/
01418   sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_META_DATA, signal, 
01419              SubMetaData::SignalLength, JBB);
01420 #ifdef DEBUG_GREP_SUBSCRIPTION
01421   ndbout_c("Grep::PSPart: Sent SUB_META_DATA to REP "
01422            "(TableId: %d, SenderData: %d, GCI: %d)",
01423            data->tableId, data->senderData, data->gci);
01424 #endif
01425 }
01426 
01430 void
01431 Grep::PSPart::execSUB_TABLE_DATA(Signal* signal) 
01432 {
01433   jamEntry();
01434   if(m_recoveryMode) {
01435     jam();
01436     return;
01437   }
01438   ndbrequire(m_repRef!=0);
01439   
01440   if(!assembleFragments(signal)) { jam(); return; }
01441   
01445   if(signal->getNoOfSections() == 2)
01446   {
01447     jam();
01451     if(m_firstScanGCI == 1 && m_lastScanGCI == 0) {
01452       m_firstScanGCI = m_latestSeenGCI;
01453       m_lastScanGCI = m_latestSeenGCI;
01454     }
01455     SubTableData * data = (SubTableData*)signal->getDataPtrSend();
01456     Uint32 subData      = data->senderData;
01457     data->gci           = m_latestSeenGCI;  
01458     data->logType       = SubTableData::SCAN;
01459     
01460     SubscriptionPtr subPtr;
01461     c_subscriptions.getPtr(subPtr, subData);
01462     sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA, signal, 
01463                SubTableData::SignalLength, JBB);
01464 #ifdef DEBUG_GREP
01465     ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Scan, GCI: %d)", 
01466              data->gci);
01467 #endif
01468   } 
01469   else 
01470   {
01471     jam();
01475     SubTableData * data = (SubTableData*)signal->getDataPtrSend();
01476     data->logType       = SubTableData::LOG;
01477     Uint32 subData      = data->senderData;
01478     if (data->gci > m_latestSeenGCI) m_latestSeenGCI = data->gci;
01479 
01480     // Reformat to sections and send to replication node.
01481     LinearSectionPtr ptr[3];
01482     ptr[0].p  =  signal->theData + 25;
01483     ptr[0].sz =  data->noOfAttributes;
01484     ptr[1].p  =  signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
01485     ptr[1].sz =  data->dataSize;
01486 
01487     SubscriptionPtr subPtr;
01488     c_subscriptions.getPtr(subPtr, subData);
01489     sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA,
01490                signal, SubTableData::SignalLength, JBB, ptr, 2);       
01491 #ifdef DEBUG_GREP
01492     ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Log, GCI: %d)", 
01493              data->gci);
01494 #endif
01495   }
01496 }
01497 
01498 
01499 /**************************************************************************
01500  * ------------------------------------------------------------------------
01501  *  MODULE:    START SYNCHRONIZATION
01502  * ------------------------------------------------------------------------
01503  * 
01504  *  
01505  **************************************************************************/
01506 
01510 void
01511 Grep::PSCoord::execGREP_SUB_SYNC_REQ(Signal* signal) 
01512 {
01513   jamEntry();
01514   GrepSubSyncReq * const subReq = (GrepSubSyncReq*)signal->getDataPtr();
01515   SubscriptionData::Part part   = (SubscriptionData::Part) subReq->part;
01516   Uint32 subId                  = subReq->subscriptionId;
01517   Uint32 subKey                 = subReq->subscriptionKey;
01518   BlockReference rep            = signal->getSendersBlockRef(); 
01519 
01520   SubCoordinatorPtr subPtr;
01521   if( !c_subCoordinatorPool.seize(subPtr)) {
01522     jam();
01523     SubCoordinator sub;
01524     sub.m_subscriberRef = rep;
01525     sub.m_subscriptionId = 0;
01526     sub.m_subscriptionKey = 0;
01527     sub.m_outstandingRequest = GSN_GREP_SYNC_REQ;
01528     sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
01529     return;
01530   }
01531 
01532   prepareOperationRec(subPtr,
01533                       numberToRef(PSREPBLOCKNO, refToNode(rep)), 
01534                       subId, subKey,
01535                       GSN_GREP_SYNC_REQ);
01536 
01537   GrepSyncReq * req = (GrepSyncReq *)subReq;
01538   req->subscriptionId   = subPtr.p->m_subscriptionId;
01539   req->subscriptionKey  = subPtr.p->m_subscriptionKey;
01540   req->senderData       = subPtr.p->m_subscriberData;
01541   req->part             = (Uint32)part;
01542   
01543   /***************************
01544    * Send to all participants
01545    ***************************/
01546   NodeReceiverGroup rg(GREP,  m_grep->m_aliveNodes);
01547   subPtr.p->m_outstandingParticipants = rg;
01548   sendSignal(rg,
01549              GSN_GREP_SYNC_REQ, signal, GrepSyncReq::SignalLength, JBB);
01550 }
01551 
01552 
01556 void 
01557 Grep::PSPart::execGREP_SYNC_REQ(Signal* signal) 
01558 {
01559   jamEntry();
01560   
01561   GrepSyncReq * const grepReq = (GrepSyncReq *) signal->getDataPtr();    
01562   Uint32 part                 = grepReq->part;
01563   Uint32 subId                = grepReq->subscriptionId;
01564   Uint32 subKey               = grepReq->subscriptionKey;
01565   Uint32 subData              = grepReq->senderData;
01566 
01567   Subscription key;
01568   key.m_subscriptionId        = subId;
01569   key.m_subscriptionKey       = subKey;
01570   SubscriptionPtr subPtr;
01571   ndbrequire(c_subscriptions.find(subPtr, key));
01572   subPtr.p->m_operationPtrI   = subData;
01573   subPtr.p->m_outstandingRequest = GSN_GREP_SYNC_REQ; 
01574   /**********************************
01575    * Send SUB_SYNC_REQ to local SUMA
01576    **********************************/
01577   SubSyncReq * sumaReq      = (SubSyncReq *)grepReq;    
01578   sumaReq->subscriptionId   = subId; 
01579   sumaReq->subscriptionKey  = subKey;
01580   sumaReq->subscriberData   = subPtr.i;
01581   sumaReq->part             = part;
01582   sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, signal, 
01583              SubSyncReq::SignalLength, JBB);
01584 }
01585 
01586 
01590 void 
01591 Grep::PSPart::execSUB_SYNC_CONF(Signal* signal) 
01592 {
01593   jamEntry();
01594   
01595   SubSyncConf * const conf = (SubSyncConf *) signal->getDataPtr();
01596   Uint32 part              = conf->part;
01597   Uint32 subId             = conf->subscriptionId;
01598   Uint32 subKey            = conf->subscriptionKey;
01599   Uint32 subData           = conf->subscriberData;
01600 
01601   SubscriptionPtr subPtr;
01602   c_subscriptions.getPtr(subPtr, subData);
01603 
01604   ndbrequire(subPtr.p->m_subscriptionId  == subId);
01605   ndbrequire(subPtr.p->m_subscriptionKey == subKey);
01606   
01607   GrepSyncConf * grepConf     = (GrepSyncConf *)conf;
01608   grepConf->senderNodeId      = getOwnNodeId();
01609   grepConf->part              = part;
01610   grepConf->firstGCI          = m_firstScanGCI;
01611   grepConf->lastGCI           = m_lastScanGCI;
01612   grepConf->subscriptionId    = subId;
01613   grepConf->subscriptionKey   = subKey;
01614   grepConf->senderData        = subPtr.p->m_operationPtrI;
01615   sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_SYNC_CONF, signal, 
01616              GrepSyncConf::SignalLength, JBB);
01617 
01618   m_firstScanGCI = 1;
01619   m_lastScanGCI = 0;
01620   subPtr.p->m_outstandingRequest = 0;
01621 }
01622 
01631 void 
01632 Grep::PSPart::execSUB_SYNC_REF(Signal* signal) {
01633   jamEntry();
01634   SubSyncRef * const ref = (SubSyncRef *)signal->getDataPtr();
01635   Uint32 subData              = ref->subscriberData;
01636   GrepError::GE_Code err     = (GrepError::GE_Code)ref->err;
01637   SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
01638   
01639   SubscriptionPtr subPtr;
01640   c_subscriptions.getPtr(subPtr, subData);
01641   sendRefToPSCoord(signal, *subPtr.p, err /*error*/ ,part);
01642   subPtr.p->m_outstandingRequest = 0;
01643 }
01644 
01648 void 
01649 Grep::PSCoord::execGREP_SYNC_CONF(Signal* signal) 
01650 {
01651   jamEntry();
01652 
01653   GrepSyncConf const * conf = (GrepSyncConf *)signal->getDataPtr();
01654   Uint32 part               = conf->part;
01655   Uint32 firstGCI           = conf->firstGCI;
01656   Uint32 lastGCI            = conf->lastGCI;
01657   Uint32 subId              = conf->subscriptionId;
01658   Uint32 subKey             = conf->subscriptionKey;
01659   Uint32 subData            = conf->senderData;
01660   
01661   SubCoordinatorPtr subPtr;
01662   c_subCoordinatorPool.getPtr(subPtr, subData);
01663   ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_SYNC_REQ);
01664   
01665   subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId);
01666   if(!subPtr.p->m_outstandingParticipants.done()) return;
01667 
01671   GrepEvent::Subscription event;
01672   if(part == SubscriptionData::MetaData) 
01673     event = GrepEvent::GrepPS_SubSyncMetaConf;
01674   else
01675     event = GrepEvent::GrepPS_SubSyncDataConf;
01676   
01677   /* @todo Johan: Add firstGCI here. /Lars */
01678   m_grep->sendEventRep(signal, NDB_LE_GrepSubscriptionInfo,
01679                        event, subId, subKey,
01680                        (Uint32)GrepError::GE_NO_ERROR,
01681                        lastGCI);
01682 
01683   /*************************
01684    * All participants ready 
01685    *************************/
01686   GrepSubSyncConf * grepConf = (GrepSubSyncConf *)conf;
01687   grepConf->part             = part;
01688   grepConf->firstGCI         = firstGCI;
01689   grepConf->lastGCI          = lastGCI;
01690   grepConf->subscriptionId   = subId;
01691   grepConf->subscriptionKey  = subKey;
01692 
01693   sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_SYNC_CONF, signal, 
01694              GrepSubSyncConf::SignalLength, JBB);
01695   c_subCoordinatorPool.release(subPtr);
01696 }
01697 
01704 void 
01705 Grep::PSCoord::execGREP_SYNC_REF(Signal* signal) {
01706   jamEntry();
01707   GrepSyncRef * const ref = (GrepSyncRef *)signal->getDataPtr();
01708   Uint32 subData              = ref->senderData;
01709   SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
01710   GrepError::GE_Code err         = (GrepError::GE_Code)ref->err;
01711   SubCoordinatorPtr subPtr;
01712   c_runningSubscriptions.getPtr(subPtr, subData);  
01713   sendRefToSS(signal, *subPtr.p, err /*error*/, part);
01714 }
01715 
01716 
01717 
01718 void
01719 Grep::PSCoord::sendRefToSS(Signal * signal, 
01720                            SubCoordinator sub,
01721                            GrepError::GE_Code err,
01722                            SubscriptionData::Part part) {
01734   jam();
01735   GrepEvent::Subscription event;
01736   switch(sub.m_outstandingRequest) {
01737   case GSN_GREP_CREATE_SUBID_REQ: 
01738     {
01739       jam();
01740       CreateSubscriptionIdRef * ref = 
01741         (CreateSubscriptionIdRef*)signal->getDataPtrSend();
01742       ref->err             = (Uint32)err;
01743       ref->subscriptionId  = sub.m_subscriptionId;
01744       ref->subscriptionKey = sub.m_subscriptionKey;
01745       sendSignal(sub.m_subscriberRef, 
01746                  GSN_GREP_CREATE_SUBID_REF,
01747                  signal,
01748                  CreateSubscriptionIdRef::SignalLength,
01749                  JBB);
01750       event = GrepEvent::GrepPS_CreateSubIdRef;
01751     }
01752     break;
01753   case GSN_GREP_CREATE_REQ: 
01754     {
01755       jam();
01756       GrepSubCreateRef * ref = (GrepSubCreateRef*)signal->getDataPtrSend();
01757       ref->err = (Uint32)err;
01758       ref->subscriptionId  = sub.m_subscriptionId;
01759       ref->subscriptionKey = sub.m_subscriptionKey;
01760       sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_CREATE_REF, signal,
01761                  GrepSubCreateRef::SignalLength, JBB);
01762       event = GrepEvent::GrepPS_SubCreateRef;
01763     }
01764     break;
01765   case GSN_GREP_SYNC_REQ: 
01766     {
01767       jam();
01768       GrepSubSyncRef * ref = (GrepSubSyncRef*)signal->getDataPtrSend(); 
01769       ref->err = (Uint32)err;
01770       ref->subscriptionId  = sub.m_subscriptionId;
01771       ref->subscriptionKey = sub.m_subscriptionKey;
01772       ref->part            = (SubscriptionData::Part) part;
01773       sendSignal(sub.m_subscriberRef, 
01774                  GSN_GREP_SUB_SYNC_REF,
01775                  signal,
01776                  GrepSubSyncRef::SignalLength,
01777                  JBB);
01778       if(part == SubscriptionData::MetaData) 
01779         event = GrepEvent::GrepPS_SubSyncMetaRef;
01780       else
01781         event = GrepEvent::GrepPS_SubSyncDataRef;
01782     }
01783     break;
01784   case GSN_GREP_START_REQ:  
01785     {
01786       jam();
01787       GrepSubStartRef * ref = (GrepSubStartRef*)signal->getDataPtrSend();
01788       ref->err = (Uint32)err;
01789       ref->subscriptionId  = sub.m_subscriptionId;
01790       ref->subscriptionKey = sub.m_subscriptionKey;
01791       
01792       sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_START_REF,
01793                  signal, GrepSubStartRef::SignalLength, JBB);
01794       if(part == SubscriptionData::MetaData) 
01795         event = GrepEvent::GrepPS_SubStartMetaRef;
01796       else
01797         event = GrepEvent::GrepPS_SubStartDataRef;  
01801       m_grep->sendEventRep(signal,
01802                            NDB_LE_GrepSubscriptionAlert,
01803                            event,
01804                            sub.m_subscriptionId,
01805                            sub.m_subscriptionKey,
01806                            (Uint32)err);
01807     }
01808     break;
01809   case GSN_GREP_REMOVE_REQ:
01810     {
01811       jam();
01812       GrepSubRemoveRef * ref = (GrepSubRemoveRef*)signal->getDataPtrSend();
01813       ref->subscriptionId  = sub.m_subscriptionId;
01814       ref->subscriptionKey = sub.m_subscriptionKey;
01815       ref->err             = (Uint32)err;
01816       
01817       sendSignal(sub.m_subscriberRef, 
01818                  GSN_GREP_SUB_REMOVE_REF,
01819                  signal,
01820                  GrepSubRemoveRef::SignalLength,
01821                  JBB);
01822       
01823       event = GrepEvent::GrepPS_SubRemoveRef;   
01824     }
01825     break;
01826   default:
01827     ndbrequire(false);
01828     event= GrepEvent::Rep_Disconnect; // remove compiler warning
01829   }  
01833   m_grep->sendEventRep(signal,
01834                        NDB_LE_GrepSubscriptionAlert,
01835                        event,
01836                        sub.m_subscriptionId,
01837                        sub.m_subscriptionKey,
01838                        err);
01839  
01840 }
01841 
01842 
01843 void
01844 Grep::PSPart::sendRefToPSCoord(Signal * signal, 
01845                                Subscription sub,
01846                                GrepError::GE_Code err,
01847                                SubscriptionData::Part part) {
01848 
01849   jam();
01850   GrepEvent::Subscription event;
01851   switch(sub.m_outstandingRequest) {
01852     
01853   case GSN_GREP_CREATE_REQ: 
01854     {
01855       GrepCreateRef * ref = (GrepCreateRef*)signal->getDataPtrSend();
01856       ref->senderData = sub.m_subscriberData;
01857       ref->subscriptionId = sub.m_subscriptionId;
01858       ref->subscriptionKey = sub.m_subscriptionKey;
01859       ref->err = err;
01860       sendSignal(sub.m_coordinatorRef, GSN_GREP_CREATE_REF, signal, 
01861                  GrepCreateRef::SignalLength, JBB);    
01862       
01863       event =  GrepEvent::GrepPS_SubCreateRef;
01864     }
01865     break;
01866   case GSN_GREP_SYNC_REQ: 
01867     {
01868       GrepSyncRef * ref = (GrepSyncRef*)signal->getDataPtrSend();
01869       ref->senderData = sub.m_subscriberData;
01870       ref->subscriptionId = sub.m_subscriptionId;
01871       ref->subscriptionKey = sub.m_subscriptionKey;
01872       ref->part = part;
01873       ref->err = err;
01874       sendSignal(sub.m_coordinatorRef, 
01875                  GSN_GREP_SYNC_REF, signal, 
01876                  GrepSyncRef::SignalLength, JBB);    
01877       if(part == SubscriptionData::MetaData) 
01878         event = GrepEvent::GrepPS_SubSyncMetaRef;
01879       else
01880         event = GrepEvent::GrepPS_SubSyncDataRef;    
01881     }
01882     break;
01883   case GSN_GREP_START_REQ:  
01884     {
01885       jam();
01886       GrepStartRef * ref = (GrepStartRef*)signal->getDataPtrSend();
01887       ref->senderData = sub.m_subscriberData;
01888       ref->subscriptionId = sub.m_subscriptionId;
01889       ref->subscriptionKey = sub.m_subscriptionKey;
01890       ref->part = (Uint32) part;
01891       ref->err = err;
01892       sendSignal(sub.m_coordinatorRef, GSN_GREP_START_REF, signal, 
01893                  GrepStartRef::SignalLength, JBB);                          
01894       if(part == SubscriptionData::MetaData) 
01895         event = GrepEvent::GrepPS_SubStartMetaRef;
01896       else 
01897         event = GrepEvent::GrepPS_SubStartDataRef;    
01898     }
01899     break;
01900 
01901   case GSN_GREP_REMOVE_REQ:
01902     {
01903       jamEntry();
01904       GrepRemoveRef * ref  = (GrepRemoveRef*)signal->getDataPtrSend();
01905       ref->senderData      = sub.m_operationPtrI;
01906       ref->subscriptionId  = sub.m_subscriptionId;
01907       ref->subscriptionKey = sub.m_subscriptionKey;
01908       ref->err             = err;
01909       sendSignal(sub.m_coordinatorRef, GSN_GREP_REMOVE_REF, signal, 
01910                  GrepCreateRef::SignalLength, JBB);    
01911       
01912     }
01913     break;
01914   default:
01915     ndbrequire(false);
01916     event= GrepEvent::Rep_Disconnect; // remove compiler warning
01917   }
01918   
01922   m_grep->sendEventRep(signal,
01923                        NDB_LE_GrepSubscriptionAlert,
01924                        event,
01925                        sub.m_subscriptionId,
01926                        sub.m_subscriptionKey,
01927                        err);
01928  
01929 }
01930 
01931 /**************************************************************************
01932  * ------------------------------------------------------------------------
01933  *  MODULE:    GREP PS Coordinator GCP 
01934  * ------------------------------------------------------------------------
01935  * 
01936  *  
01937  **************************************************************************/
01938 
01939 void 
01940 Grep::PSPart::execSUB_GCP_COMPLETE_REP(Signal* signal) 
01941 {
01942   jamEntry();
01943   if(m_recoveryMode) {
01944     jam();
01945     return;
01946   }
01947   SubGcpCompleteRep * rep  = (SubGcpCompleteRep *)signal->getDataPtrSend();
01948   rep->senderRef           = reference();
01949 
01950   if (rep->gci > m_latestSeenGCI) m_latestSeenGCI = rep->gci;
01951   SubscriptionPtr subPtr;
01952   c_subscriptions.first(c_subPtr);
01953   for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) {
01954     
01955     subPtr.i = c_subPtr.curr.i;
01956     subPtr.p = c_subscriptions.getPtr(subPtr.i);  
01957     sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_GCP_COMPLETE_REP, signal, 
01958                SubGcpCompleteRep::SignalLength, JBB);
01959   }
01960 
01961 #ifdef DEBUG_GREP
01962   ndbout_c("Grep::PSPart: Recd SUB_GCP_COMPLETE_REP "
01963            "(GCI: %d, nodeId: %d) from SUMA", 
01964            rep->gci, refToNode(rep->senderRef));
01965 #endif
01966 }
01967 
01968 
01969 void
01970 Grep::PSPart::execSUB_SYNC_CONTINUE_REQ(Signal* signal) 
01971 {
01972   jamEntry();
01973   SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtr();
01974   Uint32 subData                 = req->subscriberData;
01975 
01976   SubscriptionPtr subPtr;
01977   c_subscriptions.getPtr(subPtr,subData);
01978   
01982   SubSyncContinueConf * conf = (SubSyncContinueConf*)req;
01983   conf->subscriptionId       = subPtr.p->m_subscriptionId;
01984   conf->subscriptionKey      = subPtr.p->m_subscriptionKey;
01985   sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, 
01986              SubSyncContinueConf::SignalLength, JBB);  
01987 }
01988 
01989 void
01990 Grep::sendEventRep(Signal * signal,
01991                    Ndb_logevent_type type, 
01992                    GrepEvent::Subscription event,
01993                    Uint32 subId,
01994                    Uint32 subKey,
01995                    Uint32 err,
01996                    Uint32 other) {
01997   jam();
01998   signal->theData[0] = type;
01999   signal->theData[1] = event;
02000   signal->theData[2] = subId;
02001   signal->theData[3] = subKey; 
02002   signal->theData[4] = err;
02003   
02004   if(other==0)
02005     sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5 ,JBB);        
02006   else {
02007     signal->theData[5] = other;
02008     sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 6 ,JBB);        
02009   }
02010 }

Generated on Wed Jul 20 21:05:32 2005 for MySQL 5.0.9 Beta by  doxygen 1.4.3