00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "Grep.hpp"
00018 #include <ndb_version.h>
00019
00020 #include <NdbTCP.h>
00021 #include <Bitmask.hpp>
00022
00023 #include <signaldata/NodeFailRep.hpp>
00024 #include <signaldata/ReadNodesConf.hpp>
00025 #include <signaldata/CheckNodeGroups.hpp>
00026 #include <signaldata/GrepImpl.hpp>
00027 #include <signaldata/RepImpl.hpp>
00028 #include <signaldata/EventReport.hpp>
00029 #include <signaldata/DictTabInfo.hpp>
00030 #include <signaldata/GetTabInfo.hpp>
00031 #include <signaldata/WaitGCP.hpp>
00032 #include <GrepEvent.hpp>
00033 #include <AttributeHeader.hpp>
00034
00035 #define CONTINUEB_DELAY 500
00036 #define SSREPBLOCKNO 2
00037 #define PSREPBLOCKNO 2
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
// Start type of this node. Initialised to NodeState::ST_ILLEGAL_TYPE and
// overwritten by Grep::execSTTOR in start phase 3 with word 7 of the STTOR
// signal; start phases 5 and 7 compare it with NodeState::ST_NODE_RESTART.
00050 static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;
00051 void
00052 Grep::getNodeGroupMembers(Signal* signal) {
00053 jam();
00057 CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
00058 sd->blockRef = reference();
00059 sd->requestType =
00060 CheckNodeGroups::Direct |
00061 CheckNodeGroups::GetNodeGroupMembers;
00062 sd->nodeId = getOwnNodeId();
00063 EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal,
00064 CheckNodeGroups::SignalLength);
00065 jamEntry();
00066
00067 c_nodeGroup = sd->output;
00068 c_noNodesInGroup = 0;
00069 for (int i = 0; i < MAX_NDB_NODES; i++) {
00070 if (sd->mask.get(i)) {
00071 if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup;
00072 c_nodesInGroup[c_noNodesInGroup] = i;
00073 c_noNodesInGroup++;
00074 }
00075 }
00076 ndbrequire(c_noNodesInGroup > 0);
00077
00078 #ifdef NODEFAIL_DEBUG
00079 for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
00080 ndbout_c ("Grep: NodeGroup %u, me %u, me in group %u, member[%u] %u",
00081 c_nodeGroup, getOwnNodeId(), c_idInNodeGroup,
00082 i, c_nodesInGroup[i]);
00083 }
00084 #endif
00085 }
00086
00087
00088 void
00089 Grep::execSTTOR(Signal* signal)
00090 {
00091 jamEntry();
00092 const Uint32 startphase = signal->theData[1];
00093 const Uint32 typeOfStart = signal->theData[7];
00094 if (startphase == 3)
00095 {
00096 jam();
00097 signal->theData[0] = reference();
00098 g_TypeOfStart = typeOfStart;
00099 sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
00100 return;
00101 }
00102 if(startphase == 5) {
00103 jam();
00108 if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
00109 jam();
00110 pspart.m_recoveryMode = true;
00111 getNodeGroupMembers(signal);
00112 for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
00113 Uint32 ref =numberToRef(GREP, c_nodesInGroup[i]);
00114 if (ref != reference())
00115 sendSignal(ref, GSN_GREP_START_ME, signal,
00116 1 , JBB);
00117 }
00118 } else pspart.m_recoveryMode = false;
00119
00120 }
00121
00122 if(startphase == 7) {
00123 jam();
00124 if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
00125 pspart.m_recoveryMode = false;
00126 }
00127 }
00128
00129 sendSTTORRY(signal);
00130 }
00131
00132
00133 void
00134 Grep::PSPart::execSTART_ME(Signal* signal)
00135 {
00136 jamEntry();
00137 GrepStartMe * me =(GrepStartMe*)signal->getDataPtr();
00138 BlockReference ref = me->senderRef;
00139 GrepAddSubReq* const addReq = (GrepAddSubReq *)signal->getDataPtr();
00140
00141
00142 SubscriptionPtr subPtr;
00143 c_subscriptions.first(c_subPtr);
00144 for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) {
00145 jam();
00146 subPtr.i = c_subPtr.curr.i;
00147 subPtr.p = c_subscriptions.getPtr(subPtr.i);
00148 addReq->subscriptionId = subPtr.p->m_subscriptionId;
00149 addReq->subscriptionKey = subPtr.p->m_subscriptionKey;
00150 addReq->subscriberData = subPtr.p->m_subscriberData;
00151 addReq->subscriptionType = subPtr.p->m_subscriptionType;
00152 addReq->senderRef = subPtr.p->m_coordinatorRef;
00153 addReq->subscriberRef =subPtr.p->m_subscriberRef;
00154
00155 sendSignal(ref,
00156 GSN_GREP_ADD_SUB_REQ,
00157 signal,
00158 GrepAddSubReq::SignalLength,
00159 JBB);
00160 }
00161
00162 addReq->subscriptionId = 0;
00163 addReq->subscriptionKey = 0;
00164 addReq->subscriberData = 0;
00165 addReq->subscriptionType = 0;
00166 addReq->senderRef = 0;
00167 addReq->subscriberRef = 0;
00168
00169 sendSignal(ref,
00170 GSN_GREP_ADD_SUB_REQ,
00171 signal,
00172 GrepAddSubReq::SignalLength,
00173 JBB);
00174 }
00175
00176 void
00177 Grep::PSPart::execGREP_ADD_SUB_REQ(Signal* signal)
00178 {
00179 jamEntry();
00180 GrepAddSubReq * const grepReq = (GrepAddSubReq *)signal->getDataPtr();
00181 const Uint32 subId = grepReq->subscriptionId;
00182 const Uint32 subKey = grepReq->subscriptionKey;
00183 const Uint32 subData = grepReq->subscriberData;
00184 const Uint32 subType = grepReq->subscriptionType;
00185 const Uint32 coordinatorRef = grepReq->senderRef;
00186
00190 const Uint32 subRef = grepReq->subscriberRef;
00191
00192 if(subId!=0 && subKey!=0) {
00193 jam();
00194 SubscriptionPtr subPtr;
00195 ndbrequire( c_subscriptionPool.seize(subPtr));
00196 subPtr.p->m_coordinatorRef = coordinatorRef;
00197 subPtr.p->m_subscriptionId = subId;
00198 subPtr.p->m_subscriptionKey = subKey;
00199 subPtr.p->m_subscriberRef = subRef;
00200 subPtr.p->m_subscriberData = subData;
00201 subPtr.p->m_subscriptionType = subType;
00202
00203 c_subscriptions.add(subPtr);
00204 }
00205 else {
00206 jam();
00207 GrepAddSubConf * conf = (GrepAddSubConf *)grepReq;
00208 conf->noOfSub =
00209 c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree();
00210 sendSignal(signal->getSendersBlockRef(),
00211 GSN_GREP_ADD_SUB_CONF,
00212 signal,
00213 GrepAddSubConf::SignalLength,
00214 JBB);
00215 }
00216 }
00217
00218 void
00219 Grep::PSPart::execGREP_ADD_SUB_REF(Signal* signal)
00220 {
00224 }
00225
00226 void
00227 Grep::PSPart::execGREP_ADD_SUB_CONF(Signal* signal)
00228 {
00229 jamEntry();
00230 GrepAddSubConf* const conf = (GrepAddSubConf *)signal->getDataPtr();
00231 Uint32 noOfSubscriptions = conf->noOfSub;
00232 Uint32 noOfRestoredSubscriptions =
00233 c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree();
00234 if(noOfSubscriptions!=noOfRestoredSubscriptions) {
00235 jam();
00239 ndbrequire(false);
00240 }
00241 }
00242
00243 void
00244 Grep::execREAD_NODESCONF(Signal* signal)
00245 {
00246 jamEntry();
00247 ReadNodesConf * conf = (ReadNodesConf *)signal->getDataPtr();
00248
00249 #if 0
00250 ndbout_c("Grep: Recd READ_NODESCONF");
00251 #endif
00252
00253
00254
00255
00256 Uint32 i;
00257 for (i = 1; i < MAX_NODES; i++)
00258 {
00259 jam();
00260 #if 0
00261 ndbout_c("Grep: Found node %d of type %d", i, getNodeInfo(i).getType());
00262 #endif
00263 if (getNodeInfo(i).getType() == NodeInfo::REP)
00264 {
00265 jam();
00269 pscoord.m_repRef = numberToRef(PSREPBLOCKNO, i);
00270 pspart.m_repRef = numberToRef(PSREPBLOCKNO, i);
00271 #if 0
00272 ndbout_c("Grep: REP node %d detected", i);
00273 #endif
00274 }
00275 }
00276
00277
00278
00279
00280 m_aliveNodes.clear();
00281
00282 Uint32 count = 0;
00283 for(i = 0; i<MAX_NDB_NODES; i++)
00284 {
00285 if (NodeBitmask::get(conf->allNodes, i))
00286 {
00287 jam();
00288 count++;
00289
00290 NodePtr node;
00291 ndbrequire(m_nodes.seize(node));
00292
00293 node.p->nodeId = i;
00294 if (NodeBitmask::get(conf->inactiveNodes, i))
00295 {
00296 node.p->alive = 0;
00297 }
00298 else
00299 {
00300 node.p->alive = 1;
00301 m_aliveNodes.set(i);
00302 }
00303 }
00304 }
00305 m_masterNodeId = conf->masterNodeId;
00306 ndbrequire(count == conf->noOfNodes);
00307 sendSTTORRY(signal);
00308 }
00309
00310 void
00311 Grep::sendSTTORRY(Signal* signal)
00312 {
00313 signal->theData[0] = 0;
00314 signal->theData[3] = 1;
00315 signal->theData[4] = 3;
00316 signal->theData[5] = 5;
00317 signal->theData[6] = 7;
00318 signal->theData[7] = 255;
00319 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);
00320 }
00321
00322 void
00323 Grep::execNDB_STTOR(Signal* signal)
00324 {
00325 jamEntry();
00326 }
00327
00328 void
00329 Grep::execDUMP_STATE_ORD(Signal* signal)
00330 {
00331 jamEntry();
00332
00333
00334 #if 0
00335 if(sscoord.m_repRef == 0)
00336 {
00337 ndbout << "Grep: Recd DUMP signal but has no connection with REP node"
00338 << endl;
00339 return;
00340 }
00341 #endif
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360 }
00361
00365 void
00366 Grep::execAPI_FAILREQ(Signal* signal)
00367 {
00368 jamEntry();
00369
00370
00371
00377 #if 0
00378 ndbout_c("Grep: API_FAILREQ received for API node %d.", failedApiNode);
00379 #endif
00380
00386 }
00387
00388
00389
00390
00391
00392
00393 void
00394 Grep::execGREP_REQ(Signal* signal)
00395 {
00396 jamEntry();
00397
00398
00399
00405 ndbout_c("Warning! REP commands can only be executed at REP SERVER prompt!");
00406 }
00407
00408
00409
00410
00411
00412
00413
00414 void
00415 Grep::execNODE_FAILREP(Signal* signal)
00416 {
00417 jamEntry();
00418 NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
00419 bool changed = false;
00420
00421 NodePtr nodePtr;
00422 for(m_nodes.first(nodePtr); nodePtr.i != RNIL; m_nodes.next(nodePtr))
00423 {
00424 jam();
00425 if (NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId))
00426 {
00427 jam();
00428
00429 if (nodePtr.p->alive)
00430 {
00431 jam();
00432 ndbassert(m_aliveNodes.get(nodePtr.p->nodeId));
00433 changed = true;
00434 }
00435 else
00436 {
00437 ndbassert(!m_aliveNodes.get(nodePtr.p->nodeId));
00438 }
00439
00440 nodePtr.p->alive = 0;
00441 m_aliveNodes.clear(nodePtr.p->nodeId);
00442 }
00443 }
00444
00445
00460 }
00461
00462 void
00463 Grep::execINCL_NODEREQ(Signal* signal)
00464 {
00465 jamEntry();
00466
00467
00468 const Uint32 inclNode = signal->theData[1];
00469
00470 NodePtr node;
00471 for(m_nodes.first(node); node.i != RNIL; m_nodes.next(node))
00472 {
00473 jam();
00474 const Uint32 nodeId = node.p->nodeId;
00475 if (inclNode == nodeId) {
00476 jam();
00477
00478 ndbrequire(node.p->alive == 0);
00479 ndbassert(!m_aliveNodes.get(nodeId));
00480
00481 node.p->alive = 1;
00482 m_aliveNodes.set(nodeId);
00483
00484 break;
00485 }
00486 }
00487
00491 #if 0
00492 signal->theData[0] = reference();
00493
00494 sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);
00495 #endif
00496 }
00497
00498
00502 void
00503 Grep::PSCoord::prepareOperationRec(SubCoordinatorPtr subPtr,
00504 BlockReference subscriber,
00505 Uint32 subId,
00506 Uint32 subKey,
00507 Uint32 request)
00508 {
00509 subPtr.p->m_coordinatorRef = reference();
00510 subPtr.p->m_subscriberRef = subscriber;
00511 subPtr.p->m_subscriberData = subPtr.i;
00512 subPtr.p->m_subscriptionId = subId;
00513 subPtr.p->m_subscriptionKey = subKey;
00514 subPtr.p->m_outstandingRequest = request;
00515 }
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526 void
00527 Grep::PSCoord::execGREP_CREATE_SUBID_REQ(Signal* signal)
00528 {
00529 jamEntry();
00530
00531 CreateSubscriptionIdReq * req =
00532 (CreateSubscriptionIdReq*)signal->getDataPtr();
00533 BlockReference ref = signal->getSendersBlockRef();
00534
00535 SubCoordinatorPtr subPtr;
00536 if( !c_subCoordinatorPool.seize(subPtr)) {
00537 jam();
00538 SubCoordinator sub;
00539 sub.m_subscriberRef = ref;
00540 sub.m_subscriptionId = 0;
00541 sub.m_subscriptionKey = 0;
00542 sendRefToSS(signal, sub, GrepError::SUBSCRIPTION_ID_NOMEM );
00543 return;
00544 }
00545 prepareOperationRec(subPtr,
00546 ref,
00547 0,0,
00548 GSN_CREATE_SUBID_REQ);
00549
00550
00551 ndbout_c("SUBID_REQ Ref %d",ref);
00552 req->senderData=subPtr.p->m_subscriberData;
00553
00554 sendSignal(SUMA_REF, GSN_CREATE_SUBID_REQ, signal,
00555 SubCreateReq::SignalLength, JBB);
00556
00557 #if 1 //def DEBUG_GREP_SUBSCRIPTION
00558 ndbout_c("Grep::PSCoord: Sent CREATE_SUBID_REQ to SUMA");
00559 #endif
00560 }
00561
00562 void
00563 Grep::PSCoord::execCREATE_SUBID_CONF(Signal* signal)
00564 {
00565 jamEntry();
00566 CreateSubscriptionIdConf const * conf =
00567 (CreateSubscriptionIdConf *)signal->getDataPtr();
00568 Uint32 subId = conf->subscriptionId;
00569 Uint32 subKey = conf->subscriptionKey;
00570 Uint32 subData = conf->subscriberData;
00571
00572 #if 1 //def DEBUG_GREP_SUBSCRIPTION
00573 ndbout_c("Grep::PSCoord: Recd GREP_SUBID_CONF (subId:%d, subKey:%d)",
00574 subId, subKey);
00575 #endif
00576
00577 SubCoordinatorPtr subPtr;
00578 c_subCoordinatorPool.getPtr(subPtr, subData);
00579 BlockReference repRef = subPtr.p->m_subscriberRef;
00580
00581 {
00582 SubCoordinator key;
00583 SubCoordinatorPtr tmp;
00584 key.m_subscriptionId = subId;
00585 key.m_subscriptionKey = subKey;
00586 if(c_runningSubscriptions.find(tmp, key)){
00587 jam();
00588 SubCoordinator sub;
00589 sub.m_subscriberRef=repRef;
00590 sub.m_subscriptionId = subId;
00591 sub.m_subscriptionKey = subKey;
00592 sendRefToSS(signal,sub, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE );
00593 return;
00594 }
00595 }
00596
00597 sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_CREATE_SUBID_CONF, signal,
00598 CreateSubscriptionIdConf::SignalLength, JBB);
00599 c_subCoordinatorPool.release(subData);
00600
00601 m_grep->sendEventRep(signal,
00602 NDB_LE_GrepSubscriptionInfo,
00603 GrepEvent::GrepPS_CreateSubIdConf,
00604 subId,
00605 subKey,
00606 (Uint32)GrepError::GE_NO_ERROR);
00607 }
00608
00609 void
00610 Grep::PSCoord::execCREATE_SUBID_REF(Signal* signal) {
00611 jamEntry();
00612 CreateSubscriptionIdRef const * ref =
00613 (CreateSubscriptionIdRef *)signal->getDataPtr();
00614 Uint32 subData = ref->subscriberData;
00615 GrepError::GE_Code err;
00616
00617 Uint32 sendersBlockRef = signal->getSendersBlockRef();
00618 if(sendersBlockRef == SUMA_REF)
00619 {
00620 jam();
00621 err = GrepError::SUBSCRIPTION_ID_SUMA_FAILED_CREATE;
00622 } else {
00623 jam();
00624 ndbrequire(false);
00625
00626
00627 err= GrepError::GE_NO_ERROR;
00628 }
00629
00630 SubCoordinatorPtr subPtr;
00631 c_runningSubscriptions.getPtr(subPtr, subData);
00632 BlockReference repref = subPtr.p->m_subscriberRef;
00633
00634 SubCoordinator sub;
00635 sub.m_subscriberRef = repref;
00636 sub.m_subscriptionId = 0;
00637 sub.m_subscriptionKey = 0;
00638 sendRefToSS(signal,sub, err);
00639
00640 }
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00655 void
00656 Grep::PSCoord::execGREP_SUB_CREATE_REQ(Signal* signal)
00657 {
00658 jamEntry();
00659 GrepSubCreateReq const * grepReq = (GrepSubCreateReq *)signal->getDataPtr();
00660 Uint32 subId = grepReq->subscriptionId;
00661 Uint32 subKey = grepReq->subscriptionKey;
00662 Uint32 subType = grepReq->subscriptionType;
00663 BlockReference rep = signal->getSendersBlockRef();
00664
00665 GrepCreateReq * req =(GrepCreateReq*)grepReq;
00666
00667 SubCoordinatorPtr subPtr;
00668
00669 if( !c_subCoordinatorPool.seize(subPtr)) {
00670 jam();
00671 SubCoordinator sub;
00672 sub.m_subscriberRef = rep;
00673 sub.m_subscriptionId = 0;
00674 sub.m_subscriptionKey = 0;
00675 sub.m_outstandingRequest = GSN_GREP_CREATE_REQ;
00676 sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
00677 return;
00678 }
00679 prepareOperationRec(subPtr,
00680 numberToRef(PSREPBLOCKNO, refToNode(rep)), subId, subKey,
00681 GSN_GREP_CREATE_REQ);
00682
00683
00684
00685 SegmentedSectionPtr selectedTablesPtr;
00686 if(subType == SubCreateReq::SelectiveTableSnapshot) {
00687 jam();
00688 ndbrequire(signal->getNoOfSections()==1);
00689 signal->getSection(selectedTablesPtr,0);
00690 signal->header.m_noOfSections = 0;
00691 }
00695 subPtr.p->m_subscriptionType = subType;
00696 req->senderRef = reference();
00697 req->subscriberRef = numberToRef(PSREPBLOCKNO, refToNode(rep));
00698 req->subscriberData = subPtr.p->m_subscriberData;
00699 req->subscriptionId = subId;
00700 req->subscriptionKey = subKey;
00701 req->subscriptionType = subType;
00702
00703
00704 if(subType == SubCreateReq::SelectiveTableSnapshot) {
00705 jam();
00706 signal->setSection(selectedTablesPtr, 0);
00707 }
00708
00709
00710
00711
00712 NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes);
00713 subPtr.p->m_outstandingParticipants = rg;
00714 sendSignal(rg,
00715 GSN_GREP_CREATE_REQ, signal,
00716 GrepCreateReq::SignalLength, JBB);
00717
00718
00719 #ifdef DEBUG_GREP_SUBSCRIPTION
00720 ndbout_c("Grep::PSCoord: Sent GREP_CREATE_REQ "
00721 "(subId:%d, subKey:%d, subData:%d, subType:%d) to parts",
00722 subId, subKey, subPtr.p->m_subscriberData, subType);
00723 #endif
00724 }
00725
00726 void
00727 Grep::PSPart::execGREP_CREATE_REQ(Signal* signal)
00728 {
00729 jamEntry();
00730 GrepCreateReq * const grepReq = (GrepCreateReq *)signal->getDataPtr();
00731 const Uint32 subId = grepReq->subscriptionId;
00732 const Uint32 subKey = grepReq->subscriptionKey;
00733 const Uint32 subData = grepReq->subscriberData;
00734 const Uint32 subType = grepReq->subscriptionType;
00735 const Uint32 coordinatorRef = grepReq->senderRef;
00736 const Uint32 subRef = grepReq->subscriberRef;
00737
00738
00739
00740 SubscriptionPtr subPtr;
00741 ndbrequire( c_subscriptionPool.seize(subPtr));
00742 subPtr.p->m_coordinatorRef = coordinatorRef;
00743 subPtr.p->m_subscriptionId = subId;
00744 subPtr.p->m_subscriptionKey = subKey;
00745 subPtr.p->m_subscriberRef = subRef;
00746 subPtr.p->m_subscriberData = subPtr.i;
00747 subPtr.p->m_subscriptionType = subType;
00748 subPtr.p->m_outstandingRequest = GSN_GREP_CREATE_REQ;
00749 subPtr.p->m_operationPtrI = subData;
00750
00751 c_subscriptions.add(subPtr);
00752
00753 SegmentedSectionPtr selectedTablesPtr;
00754 if(subType == SubCreateReq::SelectiveTableSnapshot) {
00755 jam();
00756 ndbrequire(signal->getNoOfSections()==1);
00757 signal->getSection(selectedTablesPtr,0);
00758 signal->header.m_noOfSections = 0;
00759 }
00760
00764 SubCreateReq * sumaReq = (SubCreateReq *)grepReq;
00765 sumaReq->subscriberRef = GREP_REF;
00766 sumaReq->subscriberData = subPtr.p->m_subscriberData;
00767 sumaReq->subscriptionId = subPtr.p->m_subscriptionId;
00768 sumaReq->subscriptionKey = subPtr.p->m_subscriptionKey;
00769 sumaReq->subscriptionType = subPtr.p->m_subscriptionType;
00770
00771 if(subType == SubCreateReq::SelectiveTableSnapshot) {
00772 jam();
00773 signal->setSection(selectedTablesPtr, 0);
00774 }
00775 sendSignal(SUMA_REF,
00776 GSN_SUB_CREATE_REQ,
00777 signal,
00778 SubCreateReq::SignalLength,
00779 JBB);
00780 }
00781
00782 void
00783 Grep::PSPart::execSUB_CREATE_CONF(Signal* signal)
00784 {
00785 jamEntry();
00786
00787 SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();
00788 Uint32 subData = conf->subscriberData;
00789
00790 SubscriptionPtr subPtr;
00791 c_subscriptions.getPtr(subPtr, subData);
00798 #ifdef DEBUG_GREP_SUBSCRIPTION
00799 ndbout_c("Grep::PSPart: Recd SUB_CREATE_CONF "
00800 "(subId:%d, subKey:%d) from SUMA",
00801 conf->subscriptionId, conf->subscriptionKey);
00802 #endif
00803
00804
00805
00806
00807 GrepCreateConf * grepConf = (GrepCreateConf*)conf;
00808 grepConf->senderNodeId = getOwnNodeId();
00809 grepConf->senderData = subPtr.p->m_operationPtrI;
00810 sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_CREATE_CONF, signal,
00811 GrepCreateConf::SignalLength, JBB);
00812 subPtr.p->m_outstandingRequest = 0;
00813 }
00814
00821 void
00822 Grep::PSPart::execSUB_CREATE_REF(Signal* signal)
00823 {
00824 jamEntry();
00825 SubCreateRef * const ref = (SubCreateRef *)signal->getDataPtr();
00826 Uint32 subData = ref->subscriberData;
00827 GrepError::GE_Code err = (GrepError::GE_Code)ref->err;
00828 SubscriptionPtr subPtr;
00829 c_subscriptions.getPtr(subPtr, subData);
00830 sendRefToPSCoord(signal, *subPtr.p, err );
00831 subPtr.p->m_outstandingRequest = 0;
00832 }
00833
00834 void
00835 Grep::PSCoord::execGREP_CREATE_CONF(Signal* signal)
00836 {
00837 jamEntry();
00838 GrepCreateConf const * conf = (GrepCreateConf *)signal->getDataPtr();
00839 Uint32 subData = conf->senderData;
00840 Uint32 nodeId = conf->senderNodeId;
00841
00842 SubCoordinatorPtr subPtr;
00843 c_subCoordinatorPool.getPtr(subPtr, subData);
00844
00845 ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_CREATE_REQ);
00846
00847 subPtr.p->m_outstandingParticipants.clearWaitingFor(nodeId);
00848
00849 if(!subPtr.p->m_outstandingParticipants.done()) return;
00850
00851
00852
00853 Uint32 subId = subPtr.p->m_subscriptionId;
00854 Uint32 subKey = subPtr.p->m_subscriptionKey;
00855
00856 GrepSubCreateConf * grepConf = (GrepSubCreateConf *)signal->getDataPtr();
00857 grepConf->subscriptionId = subId;
00858 grepConf->subscriptionKey = subKey;
00859 sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_CREATE_CONF, signal,
00860 GrepSubCreateConf::SignalLength, JBB);
00861
00865 m_grep->sendEventRep(signal,
00866 NDB_LE_GrepSubscriptionInfo,
00867 GrepEvent::GrepPS_SubCreateConf,
00868 subId,
00869 subKey,
00870 (Uint32)GrepError::GE_NO_ERROR);
00871
00872 c_subCoordinatorPool.release(subPtr);
00873
00874 }
00875
00882 void
00883 Grep::PSCoord::execGREP_CREATE_REF(Signal* signal)
00884 {
00885 jamEntry();
00886 GrepCreateRef * const ref = (GrepCreateRef *)signal->getDataPtr();
00887 Uint32 subData = ref->senderData;
00888 Uint32 err = ref->err;
00889 SubCoordinatorPtr subPtr;
00890 c_runningSubscriptions.getPtr(subPtr, subData);
00891
00892 sendRefToSS(signal, *subPtr.p, (GrepError::GE_Code)err );
00893 }
00894
00895
00896
00897
00898
00899
00900
00901
00902
00903
00904
00908 void
00909 Grep::PSCoord::execGREP_SUB_START_REQ(Signal* signal)
00910 {
00911 jamEntry();
00912 GrepSubStartReq * const subReq = (GrepSubStartReq *)signal->getDataPtr();
00913 SubscriptionData::Part part = (SubscriptionData::Part) subReq->part;
00914 Uint32 subId = subReq->subscriptionId;
00915 Uint32 subKey = subReq->subscriptionKey;
00916 BlockReference rep = signal->getSendersBlockRef();
00917
00918 SubCoordinatorPtr subPtr;
00919
00920 if(!c_subCoordinatorPool.seize(subPtr)) {
00921 jam();
00922 SubCoordinator sub;
00923 sub.m_subscriberRef = rep;
00924 sub.m_subscriptionId = 0;
00925 sub.m_subscriptionKey = 0;
00926 sub.m_outstandingRequest = GSN_GREP_START_REQ;
00927 sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
00928 return;
00929 }
00930
00931 prepareOperationRec(subPtr,
00932 numberToRef(PSREPBLOCKNO, refToNode(rep)),
00933 subId, subKey,
00934 GSN_GREP_START_REQ);
00935
00936 GrepStartReq * const req = (GrepStartReq *) subReq;
00937 req->part = (Uint32) part;
00938 req->subscriptionId = subPtr.p->m_subscriptionId;
00939 req->subscriptionKey = subPtr.p->m_subscriptionKey;
00940 req->senderData = subPtr.p->m_subscriberData;
00941
00942
00943
00944
00945 NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes);
00946 subPtr.p->m_outstandingParticipants = rg;
00947 sendSignal(rg,
00948 GSN_GREP_START_REQ,
00949 signal,
00950 GrepStartReq::SignalLength, JBB);
00951
00952 #ifdef DEBUG_GREP_SUBSCRIPTION
00953 ndbout_c("Grep::PSCoord: Sent GREP_START_REQ "
00954 "(subId:%d, subKey:%d, senderData:%d, part:%d) to all participants",
00955 req->subscriptionId, req->subscriptionKey, req->senderData, part);
00956 #endif
00957 }
00958
00959
00960 void
00961 Grep::PSPart::execGREP_START_REQ(Signal* signal)
00962 {
00963 jamEntry();
00964 GrepStartReq * const grepReq = (GrepStartReq *) signal->getDataPtr();
00965 SubscriptionData::Part part = (SubscriptionData::Part)grepReq->part;
00966 Uint32 subId = grepReq->subscriptionId;
00967 Uint32 subKey = grepReq->subscriptionKey;
00968 Uint32 operationPtrI = grepReq->senderData;
00969
00970 Subscription key;
00971 key.m_subscriptionId = subId;
00972 key.m_subscriptionKey = subKey;
00973 SubscriptionPtr subPtr;
00974 ndbrequire(c_subscriptions.find(subPtr, key));;
00975 subPtr.p->m_outstandingRequest = GSN_GREP_START_REQ;
00976 subPtr.p->m_operationPtrI = operationPtrI;
00980 SubStartReq * sumaReq = (SubStartReq *) grepReq;
00981 sumaReq->subscriptionId = subId;
00982 sumaReq->subscriptionKey = subKey;
00983 sumaReq->subscriberData = subPtr.i;
00984 sumaReq->part = (Uint32) part;
00985
00986 sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal,
00987 SubStartReq::SignalLength, JBB);
00988 #ifdef DEBUG_GREP_SUBSCRIPTION
00989 ndbout_c("Grep::PSPart: Sent SUB_START_REQ (subId:%d, subKey:%d, part:%d)",
00990 subId, subKey, (Uint32)part);
00991 #endif
00992 }
00993
00994
00995 void
00996 Grep::PSPart::execSUB_START_CONF(Signal* signal)
00997 {
00998 jamEntry();
00999
01000 SubStartConf * const conf = (SubStartConf *) signal->getDataPtr();
01001 SubscriptionData::Part part = (SubscriptionData::Part)conf->part;
01002 Uint32 subId = conf->subscriptionId;
01003 Uint32 subKey = conf->subscriptionKey;
01004 Uint32 subData = conf->subscriberData;
01005 Uint32 firstGCI = conf->firstGCI;
01006 #ifdef DEBUG_GREP_SUBSCRIPTION
01007 ndbout_c("Grep::PSPart: Recd SUB_START_CONF "
01008 "(subId:%d, subKey:%d, subData:%d)",
01009 subId, subKey, subData);
01010 #endif
01011
01012 SubscriptionPtr subPtr;
01013 c_subscriptions.getPtr(subPtr, subData);
01014 ndbrequire(subPtr.p->m_subscriptionId == subId);
01015 ndbrequire(subPtr.p->m_subscriptionKey == subKey);
01016
01017 GrepStartConf * grepConf = (GrepStartConf *)conf;
01018 grepConf->senderData = subPtr.p->m_operationPtrI;
01019 grepConf->part = (Uint32) part;
01020 grepConf->subscriptionKey = subKey;
01021 grepConf->subscriptionId = subId;
01022 grepConf->firstGCI = firstGCI;
01023 grepConf->senderNodeId = getOwnNodeId();
01024 sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_START_CONF, signal,
01025 GrepStartConf::SignalLength, JBB);
01026 subPtr.p->m_outstandingRequest = 0;
01027
01028 #ifdef DEBUG_GREP_SUBSCRIPTION
01029 ndbout_c("Grep::PSPart: Sent GREP_START_CONF "
01030 "(subId:%d, subKey:%d, subData:%d, part:%d)",
01031 subId, subKey, subData, part);
01032 #endif
01033 }
01034
01035
01044 void
01045 Grep::PSPart::execSUB_START_REF(Signal* signal)
01046 {
01047 SubStartRef * const ref = (SubStartRef *)signal->getDataPtr();
01048 Uint32 subData = ref->subscriberData;
01049 GrepError::GE_Code err = (GrepError::GE_Code)ref->err;
01050 SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
01051 SubscriptionPtr subPtr;
01052 c_subscriptions.getPtr(subPtr, subData);
01053 sendRefToPSCoord(signal, *subPtr.p, err , part);
01054 subPtr.p->m_outstandingRequest = 0;
01055 }
01056
01057
01061 void
01062 Grep::PSCoord::execGREP_START_CONF(Signal* signal)
01063 {
01064 jamEntry();
01065
01066 GrepStartConf * const conf = (GrepStartConf *) signal->getDataPtr();
01067 Uint32 subData = conf->senderData;
01068 SubscriptionData::Part part = (SubscriptionData::Part)conf->part;
01069 Uint32 subId = conf->subscriptionId;
01070 Uint32 subKey = conf->subscriptionKey;
01071 Uint32 firstGCI = conf->firstGCI;
01072
01073 SubCoordinatorPtr subPtr;
01074 c_subCoordinatorPool.getPtr(subPtr, subData);
01075 ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_START_REQ);
01076
01077 subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId);
01078
01079 if(!subPtr.p->m_outstandingParticipants.done()) return;
01080 jam();
01081
01082
01083
01084
01085 GrepSubStartConf * grepConf = (GrepSubStartConf *) conf;
01086 grepConf->part = part;
01087 grepConf->subscriptionId = subId;
01088 grepConf->subscriptionKey = subKey;
01089 grepConf->firstGCI = firstGCI;
01090
01091 bool ok = false;
01092 switch(part) {
01093 case SubscriptionData::MetaData:
01094 ok = true;
01095 sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal,
01096 GrepSubStartConf::SignalLength, JBB);
01097
01101 m_grep->sendEventRep(signal,
01102 NDB_LE_GrepSubscriptionInfo,
01103 GrepEvent::GrepPS_SubStartMetaConf,
01104 subId, subKey,
01105 (Uint32)GrepError::GE_NO_ERROR);
01106
01107 c_subCoordinatorPool.release(subPtr);
01108 break;
01109 case SubscriptionData::TableData:
01110 ok = true;
01111 sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal,
01112 GrepSubStartConf::SignalLength, JBB);
01113
01117 m_grep->sendEventRep(signal,
01118 NDB_LE_GrepSubscriptionInfo,
01119 GrepEvent::GrepPS_SubStartDataConf,
01120 subId, subKey,
01121 (Uint32)GrepError::GE_NO_ERROR);
01122
01123
01124 c_subCoordinatorPool.release(subPtr);
01125 break;
01126 }
01127 ndbrequire(ok);
01128
01129 #ifdef DEBUG_GREP_SUBSCRIPTION
01130 ndbout_c("Grep::PSCoord: Recd SUB_START_CONF (subId:%d, subKey:%d, part:%d) "
01131 "from all slaves",
01132 subId, subKey, (Uint32)part);
01133 #endif
01134 }
01135
01142 void
01143 Grep::PSCoord::execGREP_START_REF(Signal* signal)
01144 {
01145 jamEntry();
01146 GrepStartRef * const ref = (GrepStartRef *)signal->getDataPtr();
01147 Uint32 subData = ref->senderData;
01148 GrepError::GE_Code err = (GrepError::GE_Code)ref->err;
01149 SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
01150
01151 SubCoordinatorPtr subPtr;
01152 c_runningSubscriptions.getPtr(subPtr, subData);
01153 sendRefToSS(signal, *subPtr.p, err , part);
01154 }
01155
01156
01157
01158
01159
01160
01161
01162
01163
01164
01165
01166
01167
01168
01169
01173 void
01174 Grep::PSCoord::execGREP_SUB_REMOVE_REQ(Signal* signal)
01175 {
01176 jamEntry();
01177 GrepSubRemoveReq * const subReq = (GrepSubRemoveReq *)signal->getDataPtr();
01178 Uint32 subId = subReq->subscriptionId;
01179 Uint32 subKey = subReq->subscriptionKey;
01180 BlockReference rep = signal->getSendersBlockRef();
01181
01182 SubCoordinatorPtr subPtr;
01183 if( !c_subCoordinatorPool.seize(subPtr)) {
01184 jam();
01185 SubCoordinator sub;
01186 sub.m_subscriberRef = rep;
01187 sub.m_subscriptionId = 0;
01188 sub.m_subscriptionKey = 0;
01189 sub.m_outstandingRequest = GSN_GREP_REMOVE_REQ;
01190 sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
01191 return;
01192 }
01193
01194
01195 prepareOperationRec(subPtr,
01196 numberToRef(PSREPBLOCKNO, refToNode(rep)),
01197 subId, subKey,
01198 GSN_GREP_REMOVE_REQ);
01199
01200 c_runningSubscriptions.add(subPtr);
01201
01202 GrepRemoveReq * req = (GrepRemoveReq *) subReq;
01203 req->subscriptionId = subPtr.p->m_subscriptionId;
01204 req->subscriptionKey = subPtr.p->m_subscriptionKey;
01205 req->senderData = subPtr.p->m_subscriberData;
01206 req->senderRef = subPtr.p->m_coordinatorRef;
01207
01208
01209
01210
01211 NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes);
01212 subPtr.p->m_outstandingParticipants = rg;
01213 sendSignal(rg,
01214 GSN_GREP_REMOVE_REQ, signal,
01215 GrepRemoveReq::SignalLength, JBB);
01216 }
01217
01218
01219 void
01220 Grep::PSPart::execGREP_REMOVE_REQ(Signal* signal)
01221 {
01222 jamEntry();
01223 GrepRemoveReq * const grepReq = (GrepRemoveReq *) signal->getDataPtr();
01224 Uint32 subId = grepReq->subscriptionId;
01225 Uint32 subKey = grepReq->subscriptionKey;
01226 Uint32 subData = grepReq->senderData;
01227 Uint32 coordinator = grepReq->senderRef;
01228
01229 Subscription key;
01230 key.m_subscriptionId = subId;
01231 key.m_subscriptionKey = subKey;
01232 SubscriptionPtr subPtr;
01233
01234 if(!c_subscriptions.find(subPtr, key))
01235 {
01240 GrepRemoveConf * grepConf = (GrepRemoveConf *)grepReq;
01241 grepConf->subscriptionKey = subKey;
01242 grepConf->subscriptionId = subId;
01243 grepConf->senderData = subData;
01244 grepConf->senderNodeId = getOwnNodeId();
01245 sendSignal(coordinator, GSN_GREP_REMOVE_CONF, signal,
01246 GrepRemoveConf::SignalLength, JBB);
01247 return;
01248 }
01249
01250 subPtr.p->m_operationPtrI = subData;
01251 subPtr.p->m_coordinatorRef = coordinator;
01252 subPtr.p->m_outstandingRequest = GSN_GREP_REMOVE_REQ;
01253
01257 SubRemoveReq * sumaReq = (SubRemoveReq *) grepReq;
01258 sumaReq->subscriptionId = subId;
01259 sumaReq->subscriptionKey = subKey;
01260 sumaReq->senderData = subPtr.i;
01261 sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
01262 SubStartReq::SignalLength, JBB);
01263 }
01264
01265
01269 void
01270 Grep::PSPart::execSUB_REMOVE_CONF(Signal* signal)
01271 {
01272 jamEntry();
01273 SubRemoveConf * const conf = (SubRemoveConf *) signal->getDataPtr();
01274 Uint32 subId = conf->subscriptionId;
01275 Uint32 subKey = conf->subscriptionKey;
01276 Uint32 subData = conf->subscriberData;
01277
01278 SubscriptionPtr subPtr;
01279 c_subscriptions.getPtr(subPtr, subData);
01280 ndbrequire(subPtr.p->m_subscriptionId == subId);
01281 ndbrequire(subPtr.p->m_subscriptionKey == subKey);
01282 subPtr.p->m_outstandingRequest = 0;
01283 GrepRemoveConf * grepConf = (GrepRemoveConf *)conf;
01284 grepConf->subscriptionKey = subKey;
01285 grepConf->subscriptionId = subId;
01286 grepConf->senderData = subPtr.p->m_operationPtrI;
01287 grepConf->senderNodeId = getOwnNodeId();
01288 sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_REMOVE_CONF, signal,
01289 GrepRemoveConf::SignalLength, JBB);
01290 c_subscriptions.release(subPtr);
01291
01292 }
01293
01294
01298 void
01299 Grep::PSPart::execSUB_REMOVE_REF(Signal* signal)
01300 {
01301 jamEntry();
01302 SubRemoveRef * const ref = (SubRemoveRef *)signal->getDataPtr();
01303 Uint32 subData = ref->subscriberData;
01304
01305 SubscriptionPtr subPtr;
01306 c_subscriptions.getPtr(subPtr, subData);
01307
01308
01309 }
01310
01311
01315 void
01316 Grep::PSCoord::execGREP_REMOVE_CONF(Signal* signal)
01317 {
01318 jamEntry();
01319 GrepRemoveConf * const conf = (GrepRemoveConf *) signal->getDataPtr();
01320 Uint32 subId = conf->subscriptionId;
01321 Uint32 subKey = conf->subscriptionKey;
01322 Uint32 senderNodeId = conf->senderNodeId;
01323 Uint32 subData = conf->senderData;
01324 SubCoordinatorPtr subPtr;
01325 c_subCoordinatorPool.getPtr(subPtr, subData);
01326
01327 ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_REMOVE_REQ);
01328
01329 subPtr.p->m_outstandingParticipants.clearWaitingFor(senderNodeId);
01330
01331 if(!subPtr.p->m_outstandingParticipants.done()) {
01332 jam();
01333 return;
01334 }
01335 jam();
01336
01337
01338
01339
01340
01341 m_grep->sendEventRep(signal,
01342 NDB_LE_GrepSubscriptionInfo,
01343 GrepEvent::GrepPS_SubRemoveConf,
01344 subId, subKey,
01345 GrepError::GE_NO_ERROR);
01346
01347 GrepSubRemoveConf * grepConf = (GrepSubRemoveConf *) conf;
01348 grepConf->subscriptionId = subId;
01349 grepConf->subscriptionKey = subKey;
01350 sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_REMOVE_CONF, signal,
01351 GrepSubRemoveConf::SignalLength, JBB);
01352
01353 c_subCoordinatorPool.release(subPtr);
01354 }
01355
01356
01357
01358 void
01359 Grep::PSCoord::execGREP_REMOVE_REF(Signal* signal)
01360 {
01361 jamEntry();
01362 GrepRemoveRef * const ref = (GrepRemoveRef *)signal->getDataPtr();
01363 Uint32 subData = ref->senderData;
01364 Uint32 err = ref->err;
01365 SubCoordinatorPtr subPtr;
01366
01371 for( c_runningSubscriptions.first(c_subPtr);
01372 !c_subPtr.isNull(); c_runningSubscriptions.next(c_subPtr)) {
01373 jam();
01374 subPtr.i = c_subPtr.curr.i;
01375 subPtr.p = c_runningSubscriptions.getPtr(subPtr.i);
01376 if(subData == subPtr.i)
01377 {
01378 sendRefToSS(signal, *subPtr.p, (GrepError::GE_Code)err );
01379 c_runningSubscriptions.release(subPtr);
01380 return;
01381 }
01382 }
01383 return;
01384 }
01385
01386
01387
01388
01389
01390
01391
01392
01393
01394
01395
01396
01397
01398
01399
01400 void
01401 Grep::PSPart::execSUB_META_DATA(Signal* signal)
01402 {
01403 jamEntry();
01404 if(m_recoveryMode) {
01405 jam();
01406 return;
01407 }
01411 SubMetaData * data = (SubMetaData *) signal->getDataPtrSend();
01412 SubscriptionPtr subPtr;
01413 c_subscriptions.getPtr(subPtr, data->subscriberData);
01414
01415
01416
01417
01418 sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_META_DATA, signal,
01419 SubMetaData::SignalLength, JBB);
01420 #ifdef DEBUG_GREP_SUBSCRIPTION
01421 ndbout_c("Grep::PSPart: Sent SUB_META_DATA to REP "
01422 "(TableId: %d, SenderData: %d, GCI: %d)",
01423 data->tableId, data->senderData, data->gci);
01424 #endif
01425 }
01426
01430 void
01431 Grep::PSPart::execSUB_TABLE_DATA(Signal* signal)
01432 {
01433 jamEntry();
01434 if(m_recoveryMode) {
01435 jam();
01436 return;
01437 }
01438 ndbrequire(m_repRef!=0);
01439
01440 if(!assembleFragments(signal)) { jam(); return; }
01441
01445 if(signal->getNoOfSections() == 2)
01446 {
01447 jam();
01451 if(m_firstScanGCI == 1 && m_lastScanGCI == 0) {
01452 m_firstScanGCI = m_latestSeenGCI;
01453 m_lastScanGCI = m_latestSeenGCI;
01454 }
01455 SubTableData * data = (SubTableData*)signal->getDataPtrSend();
01456 Uint32 subData = data->senderData;
01457 data->gci = m_latestSeenGCI;
01458 data->logType = SubTableData::SCAN;
01459
01460 SubscriptionPtr subPtr;
01461 c_subscriptions.getPtr(subPtr, subData);
01462 sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA, signal,
01463 SubTableData::SignalLength, JBB);
01464 #ifdef DEBUG_GREP
01465 ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Scan, GCI: %d)",
01466 data->gci);
01467 #endif
01468 }
01469 else
01470 {
01471 jam();
01475 SubTableData * data = (SubTableData*)signal->getDataPtrSend();
01476 data->logType = SubTableData::LOG;
01477 Uint32 subData = data->senderData;
01478 if (data->gci > m_latestSeenGCI) m_latestSeenGCI = data->gci;
01479
01480
01481 LinearSectionPtr ptr[3];
01482 ptr[0].p = signal->theData + 25;
01483 ptr[0].sz = data->noOfAttributes;
01484 ptr[1].p = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
01485 ptr[1].sz = data->dataSize;
01486
01487 SubscriptionPtr subPtr;
01488 c_subscriptions.getPtr(subPtr, subData);
01489 sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA,
01490 signal, SubTableData::SignalLength, JBB, ptr, 2);
01491 #ifdef DEBUG_GREP
01492 ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Log, GCI: %d)",
01493 data->gci);
01494 #endif
01495 }
01496 }
01497
01498
01499
01500
01501
01502
01503
01504
01505
01506
01510 void
01511 Grep::PSCoord::execGREP_SUB_SYNC_REQ(Signal* signal)
01512 {
01513 jamEntry();
01514 GrepSubSyncReq * const subReq = (GrepSubSyncReq*)signal->getDataPtr();
01515 SubscriptionData::Part part = (SubscriptionData::Part) subReq->part;
01516 Uint32 subId = subReq->subscriptionId;
01517 Uint32 subKey = subReq->subscriptionKey;
01518 BlockReference rep = signal->getSendersBlockRef();
01519
01520 SubCoordinatorPtr subPtr;
01521 if( !c_subCoordinatorPool.seize(subPtr)) {
01522 jam();
01523 SubCoordinator sub;
01524 sub.m_subscriberRef = rep;
01525 sub.m_subscriptionId = 0;
01526 sub.m_subscriptionKey = 0;
01527 sub.m_outstandingRequest = GSN_GREP_SYNC_REQ;
01528 sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
01529 return;
01530 }
01531
01532 prepareOperationRec(subPtr,
01533 numberToRef(PSREPBLOCKNO, refToNode(rep)),
01534 subId, subKey,
01535 GSN_GREP_SYNC_REQ);
01536
01537 GrepSyncReq * req = (GrepSyncReq *)subReq;
01538 req->subscriptionId = subPtr.p->m_subscriptionId;
01539 req->subscriptionKey = subPtr.p->m_subscriptionKey;
01540 req->senderData = subPtr.p->m_subscriberData;
01541 req->part = (Uint32)part;
01542
01543
01544
01545
01546 NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes);
01547 subPtr.p->m_outstandingParticipants = rg;
01548 sendSignal(rg,
01549 GSN_GREP_SYNC_REQ, signal, GrepSyncReq::SignalLength, JBB);
01550 }
01551
01552
01556 void
01557 Grep::PSPart::execGREP_SYNC_REQ(Signal* signal)
01558 {
01559 jamEntry();
01560
01561 GrepSyncReq * const grepReq = (GrepSyncReq *) signal->getDataPtr();
01562 Uint32 part = grepReq->part;
01563 Uint32 subId = grepReq->subscriptionId;
01564 Uint32 subKey = grepReq->subscriptionKey;
01565 Uint32 subData = grepReq->senderData;
01566
01567 Subscription key;
01568 key.m_subscriptionId = subId;
01569 key.m_subscriptionKey = subKey;
01570 SubscriptionPtr subPtr;
01571 ndbrequire(c_subscriptions.find(subPtr, key));
01572 subPtr.p->m_operationPtrI = subData;
01573 subPtr.p->m_outstandingRequest = GSN_GREP_SYNC_REQ;
01574
01575
01576
01577 SubSyncReq * sumaReq = (SubSyncReq *)grepReq;
01578 sumaReq->subscriptionId = subId;
01579 sumaReq->subscriptionKey = subKey;
01580 sumaReq->subscriberData = subPtr.i;
01581 sumaReq->part = part;
01582 sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, signal,
01583 SubSyncReq::SignalLength, JBB);
01584 }
01585
01586
01590 void
01591 Grep::PSPart::execSUB_SYNC_CONF(Signal* signal)
01592 {
01593 jamEntry();
01594
01595 SubSyncConf * const conf = (SubSyncConf *) signal->getDataPtr();
01596 Uint32 part = conf->part;
01597 Uint32 subId = conf->subscriptionId;
01598 Uint32 subKey = conf->subscriptionKey;
01599 Uint32 subData = conf->subscriberData;
01600
01601 SubscriptionPtr subPtr;
01602 c_subscriptions.getPtr(subPtr, subData);
01603
01604 ndbrequire(subPtr.p->m_subscriptionId == subId);
01605 ndbrequire(subPtr.p->m_subscriptionKey == subKey);
01606
01607 GrepSyncConf * grepConf = (GrepSyncConf *)conf;
01608 grepConf->senderNodeId = getOwnNodeId();
01609 grepConf->part = part;
01610 grepConf->firstGCI = m_firstScanGCI;
01611 grepConf->lastGCI = m_lastScanGCI;
01612 grepConf->subscriptionId = subId;
01613 grepConf->subscriptionKey = subKey;
01614 grepConf->senderData = subPtr.p->m_operationPtrI;
01615 sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_SYNC_CONF, signal,
01616 GrepSyncConf::SignalLength, JBB);
01617
01618 m_firstScanGCI = 1;
01619 m_lastScanGCI = 0;
01620 subPtr.p->m_outstandingRequest = 0;
01621 }
01622
01631 void
01632 Grep::PSPart::execSUB_SYNC_REF(Signal* signal) {
01633 jamEntry();
01634 SubSyncRef * const ref = (SubSyncRef *)signal->getDataPtr();
01635 Uint32 subData = ref->subscriberData;
01636 GrepError::GE_Code err = (GrepError::GE_Code)ref->err;
01637 SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
01638
01639 SubscriptionPtr subPtr;
01640 c_subscriptions.getPtr(subPtr, subData);
01641 sendRefToPSCoord(signal, *subPtr.p, err ,part);
01642 subPtr.p->m_outstandingRequest = 0;
01643 }
01644
01648 void
01649 Grep::PSCoord::execGREP_SYNC_CONF(Signal* signal)
01650 {
01651 jamEntry();
01652
01653 GrepSyncConf const * conf = (GrepSyncConf *)signal->getDataPtr();
01654 Uint32 part = conf->part;
01655 Uint32 firstGCI = conf->firstGCI;
01656 Uint32 lastGCI = conf->lastGCI;
01657 Uint32 subId = conf->subscriptionId;
01658 Uint32 subKey = conf->subscriptionKey;
01659 Uint32 subData = conf->senderData;
01660
01661 SubCoordinatorPtr subPtr;
01662 c_subCoordinatorPool.getPtr(subPtr, subData);
01663 ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_SYNC_REQ);
01664
01665 subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId);
01666 if(!subPtr.p->m_outstandingParticipants.done()) return;
01667
01671 GrepEvent::Subscription event;
01672 if(part == SubscriptionData::MetaData)
01673 event = GrepEvent::GrepPS_SubSyncMetaConf;
01674 else
01675 event = GrepEvent::GrepPS_SubSyncDataConf;
01676
01677
01678 m_grep->sendEventRep(signal, NDB_LE_GrepSubscriptionInfo,
01679 event, subId, subKey,
01680 (Uint32)GrepError::GE_NO_ERROR,
01681 lastGCI);
01682
01683
01684
01685
01686 GrepSubSyncConf * grepConf = (GrepSubSyncConf *)conf;
01687 grepConf->part = part;
01688 grepConf->firstGCI = firstGCI;
01689 grepConf->lastGCI = lastGCI;
01690 grepConf->subscriptionId = subId;
01691 grepConf->subscriptionKey = subKey;
01692
01693 sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_SYNC_CONF, signal,
01694 GrepSubSyncConf::SignalLength, JBB);
01695 c_subCoordinatorPool.release(subPtr);
01696 }
01697
01704 void
01705 Grep::PSCoord::execGREP_SYNC_REF(Signal* signal) {
01706 jamEntry();
01707 GrepSyncRef * const ref = (GrepSyncRef *)signal->getDataPtr();
01708 Uint32 subData = ref->senderData;
01709 SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
01710 GrepError::GE_Code err = (GrepError::GE_Code)ref->err;
01711 SubCoordinatorPtr subPtr;
01712 c_runningSubscriptions.getPtr(subPtr, subData);
01713 sendRefToSS(signal, *subPtr.p, err , part);
01714 }
01715
01716
01717
01718 void
01719 Grep::PSCoord::sendRefToSS(Signal * signal,
01720 SubCoordinator sub,
01721 GrepError::GE_Code err,
01722 SubscriptionData::Part part) {
01734 jam();
01735 GrepEvent::Subscription event;
01736 switch(sub.m_outstandingRequest) {
01737 case GSN_GREP_CREATE_SUBID_REQ:
01738 {
01739 jam();
01740 CreateSubscriptionIdRef * ref =
01741 (CreateSubscriptionIdRef*)signal->getDataPtrSend();
01742 ref->err = (Uint32)err;
01743 ref->subscriptionId = sub.m_subscriptionId;
01744 ref->subscriptionKey = sub.m_subscriptionKey;
01745 sendSignal(sub.m_subscriberRef,
01746 GSN_GREP_CREATE_SUBID_REF,
01747 signal,
01748 CreateSubscriptionIdRef::SignalLength,
01749 JBB);
01750 event = GrepEvent::GrepPS_CreateSubIdRef;
01751 }
01752 break;
01753 case GSN_GREP_CREATE_REQ:
01754 {
01755 jam();
01756 GrepSubCreateRef * ref = (GrepSubCreateRef*)signal->getDataPtrSend();
01757 ref->err = (Uint32)err;
01758 ref->subscriptionId = sub.m_subscriptionId;
01759 ref->subscriptionKey = sub.m_subscriptionKey;
01760 sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_CREATE_REF, signal,
01761 GrepSubCreateRef::SignalLength, JBB);
01762 event = GrepEvent::GrepPS_SubCreateRef;
01763 }
01764 break;
01765 case GSN_GREP_SYNC_REQ:
01766 {
01767 jam();
01768 GrepSubSyncRef * ref = (GrepSubSyncRef*)signal->getDataPtrSend();
01769 ref->err = (Uint32)err;
01770 ref->subscriptionId = sub.m_subscriptionId;
01771 ref->subscriptionKey = sub.m_subscriptionKey;
01772 ref->part = (SubscriptionData::Part) part;
01773 sendSignal(sub.m_subscriberRef,
01774 GSN_GREP_SUB_SYNC_REF,
01775 signal,
01776 GrepSubSyncRef::SignalLength,
01777 JBB);
01778 if(part == SubscriptionData::MetaData)
01779 event = GrepEvent::GrepPS_SubSyncMetaRef;
01780 else
01781 event = GrepEvent::GrepPS_SubSyncDataRef;
01782 }
01783 break;
01784 case GSN_GREP_START_REQ:
01785 {
01786 jam();
01787 GrepSubStartRef * ref = (GrepSubStartRef*)signal->getDataPtrSend();
01788 ref->err = (Uint32)err;
01789 ref->subscriptionId = sub.m_subscriptionId;
01790 ref->subscriptionKey = sub.m_subscriptionKey;
01791
01792 sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_START_REF,
01793 signal, GrepSubStartRef::SignalLength, JBB);
01794 if(part == SubscriptionData::MetaData)
01795 event = GrepEvent::GrepPS_SubStartMetaRef;
01796 else
01797 event = GrepEvent::GrepPS_SubStartDataRef;
01801 m_grep->sendEventRep(signal,
01802 NDB_LE_GrepSubscriptionAlert,
01803 event,
01804 sub.m_subscriptionId,
01805 sub.m_subscriptionKey,
01806 (Uint32)err);
01807 }
01808 break;
01809 case GSN_GREP_REMOVE_REQ:
01810 {
01811 jam();
01812 GrepSubRemoveRef * ref = (GrepSubRemoveRef*)signal->getDataPtrSend();
01813 ref->subscriptionId = sub.m_subscriptionId;
01814 ref->subscriptionKey = sub.m_subscriptionKey;
01815 ref->err = (Uint32)err;
01816
01817 sendSignal(sub.m_subscriberRef,
01818 GSN_GREP_SUB_REMOVE_REF,
01819 signal,
01820 GrepSubRemoveRef::SignalLength,
01821 JBB);
01822
01823 event = GrepEvent::GrepPS_SubRemoveRef;
01824 }
01825 break;
01826 default:
01827 ndbrequire(false);
01828 event= GrepEvent::Rep_Disconnect;
01829 }
01833 m_grep->sendEventRep(signal,
01834 NDB_LE_GrepSubscriptionAlert,
01835 event,
01836 sub.m_subscriptionId,
01837 sub.m_subscriptionKey,
01838 err);
01839
01840 }
01841
01842
01843 void
01844 Grep::PSPart::sendRefToPSCoord(Signal * signal,
01845 Subscription sub,
01846 GrepError::GE_Code err,
01847 SubscriptionData::Part part) {
01848
01849 jam();
01850 GrepEvent::Subscription event;
01851 switch(sub.m_outstandingRequest) {
01852
01853 case GSN_GREP_CREATE_REQ:
01854 {
01855 GrepCreateRef * ref = (GrepCreateRef*)signal->getDataPtrSend();
01856 ref->senderData = sub.m_subscriberData;
01857 ref->subscriptionId = sub.m_subscriptionId;
01858 ref->subscriptionKey = sub.m_subscriptionKey;
01859 ref->err = err;
01860 sendSignal(sub.m_coordinatorRef, GSN_GREP_CREATE_REF, signal,
01861 GrepCreateRef::SignalLength, JBB);
01862
01863 event = GrepEvent::GrepPS_SubCreateRef;
01864 }
01865 break;
01866 case GSN_GREP_SYNC_REQ:
01867 {
01868 GrepSyncRef * ref = (GrepSyncRef*)signal->getDataPtrSend();
01869 ref->senderData = sub.m_subscriberData;
01870 ref->subscriptionId = sub.m_subscriptionId;
01871 ref->subscriptionKey = sub.m_subscriptionKey;
01872 ref->part = part;
01873 ref->err = err;
01874 sendSignal(sub.m_coordinatorRef,
01875 GSN_GREP_SYNC_REF, signal,
01876 GrepSyncRef::SignalLength, JBB);
01877 if(part == SubscriptionData::MetaData)
01878 event = GrepEvent::GrepPS_SubSyncMetaRef;
01879 else
01880 event = GrepEvent::GrepPS_SubSyncDataRef;
01881 }
01882 break;
01883 case GSN_GREP_START_REQ:
01884 {
01885 jam();
01886 GrepStartRef * ref = (GrepStartRef*)signal->getDataPtrSend();
01887 ref->senderData = sub.m_subscriberData;
01888 ref->subscriptionId = sub.m_subscriptionId;
01889 ref->subscriptionKey = sub.m_subscriptionKey;
01890 ref->part = (Uint32) part;
01891 ref->err = err;
01892 sendSignal(sub.m_coordinatorRef, GSN_GREP_START_REF, signal,
01893 GrepStartRef::SignalLength, JBB);
01894 if(part == SubscriptionData::MetaData)
01895 event = GrepEvent::GrepPS_SubStartMetaRef;
01896 else
01897 event = GrepEvent::GrepPS_SubStartDataRef;
01898 }
01899 break;
01900
01901 case GSN_GREP_REMOVE_REQ:
01902 {
01903 jamEntry();
01904 GrepRemoveRef * ref = (GrepRemoveRef*)signal->getDataPtrSend();
01905 ref->senderData = sub.m_operationPtrI;
01906 ref->subscriptionId = sub.m_subscriptionId;
01907 ref->subscriptionKey = sub.m_subscriptionKey;
01908 ref->err = err;
01909 sendSignal(sub.m_coordinatorRef, GSN_GREP_REMOVE_REF, signal,
01910 GrepCreateRef::SignalLength, JBB);
01911
01912 }
01913 break;
01914 default:
01915 ndbrequire(false);
01916 event= GrepEvent::Rep_Disconnect;
01917 }
01918
01922 m_grep->sendEventRep(signal,
01923 NDB_LE_GrepSubscriptionAlert,
01924 event,
01925 sub.m_subscriptionId,
01926 sub.m_subscriptionKey,
01927 err);
01928
01929 }
01930
01931
01932
01933
01934
01935
01936
01937
01938
01939 void
01940 Grep::PSPart::execSUB_GCP_COMPLETE_REP(Signal* signal)
01941 {
01942 jamEntry();
01943 if(m_recoveryMode) {
01944 jam();
01945 return;
01946 }
01947 SubGcpCompleteRep * rep = (SubGcpCompleteRep *)signal->getDataPtrSend();
01948 rep->senderRef = reference();
01949
01950 if (rep->gci > m_latestSeenGCI) m_latestSeenGCI = rep->gci;
01951 SubscriptionPtr subPtr;
01952 c_subscriptions.first(c_subPtr);
01953 for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) {
01954
01955 subPtr.i = c_subPtr.curr.i;
01956 subPtr.p = c_subscriptions.getPtr(subPtr.i);
01957 sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_GCP_COMPLETE_REP, signal,
01958 SubGcpCompleteRep::SignalLength, JBB);
01959 }
01960
01961 #ifdef DEBUG_GREP
01962 ndbout_c("Grep::PSPart: Recd SUB_GCP_COMPLETE_REP "
01963 "(GCI: %d, nodeId: %d) from SUMA",
01964 rep->gci, refToNode(rep->senderRef));
01965 #endif
01966 }
01967
01968
01969 void
01970 Grep::PSPart::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
01971 {
01972 jamEntry();
01973 SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtr();
01974 Uint32 subData = req->subscriberData;
01975
01976 SubscriptionPtr subPtr;
01977 c_subscriptions.getPtr(subPtr,subData);
01978
01982 SubSyncContinueConf * conf = (SubSyncContinueConf*)req;
01983 conf->subscriptionId = subPtr.p->m_subscriptionId;
01984 conf->subscriptionKey = subPtr.p->m_subscriptionKey;
01985 sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal,
01986 SubSyncContinueConf::SignalLength, JBB);
01987 }
01988
01989 void
01990 Grep::sendEventRep(Signal * signal,
01991 Ndb_logevent_type type,
01992 GrepEvent::Subscription event,
01993 Uint32 subId,
01994 Uint32 subKey,
01995 Uint32 err,
01996 Uint32 other) {
01997 jam();
01998 signal->theData[0] = type;
01999 signal->theData[1] = event;
02000 signal->theData[2] = subId;
02001 signal->theData[3] = subKey;
02002 signal->theData[4] = err;
02003
02004 if(other==0)
02005 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5 ,JBB);
02006 else {
02007 signal->theData[5] = other;
02008 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 6 ,JBB);
02009 }
02010 }