/*
 * Promela model of two communicating endpoints (Initiator, Noninitiator)
 * that exchange data messages, agree on checkpoints over the data channel,
 * and after a move or a failed send run a resume / resume_ok handshake over
 * control channels and roll back to a checkpoint.
 *
 * Each preprocessor directive must sit on its own physical line: a #define
 * takes everything up to end-of-line as its replacement text.
 */
#define bufTrigger 2
#define bufFactor 2
#define wrap 9
#define queueSize 2

mtype = {cp,data,resume,resume_ok};

/* message carried on the data channels (plain data or checkpoint marker) */
typedef dataMsg {
    mtype type;
    byte cpId;
    byte cpPos;
}

/* message carried on the control channels (resume / resume_ok) */
typedef ctrlMsg {
    mtype type;
    chan addr;
    byte cpId;
}

/* acknowledged checkpoint per endpoint: index 0 = Initiator, 1 = Noninitiator */
byte ackedId[2];
byte ackedSent[2];
byte ackedRecd[2];

/* current control-receive channel per endpoint (same indexing as above) */
chan recvCtrl[2];

/*
 * Helper macros. Promela inlines are expanded textually at the call site, so
 * they deliberately refer to the caller's locals (move, ns, sendCtrl, sent,
 * recd, pendId, pendSent, pendRecd, recvCpAck, sendCpDataMsg, sendCtrlMsg,
 * recvDataMsg, recvData, sendData, openData). The parameter `me` is the
 * endpoint index used for the global arrays.
 */

/* Take a new control channel from the mover, then swap addresses with the
 * name server: publish our new channel and learn where to send control
 * messages. Executable when the move channel offers a value. */
inline relocate(me) {
    atomic {
        move?recvCtrl[me];
        /* moved to new network, register with name server */
        ns!recvCtrl[me];
        ns?sendCtrl;
    }
}

/* Overwrite the old acked checkpoint with the pending one and advance the
 * absolute counters (modulo wrap). Must run while pendSent / pendRecd still
 * hold the pending values. */
inline commitPending(me) {
    ackedId[me] = pendId;
    ackedSent[me] = ((ackedSent[me] + pendSent) % wrap);
    ackedRecd[me] = ((ackedRecd[me] + pendRecd) % wrap);
}

/* Reset the relative stream positions and the pending positions to zero. */
inline resetCounters() {
    sent = 0;
    recd = 0;
    pendSent = sent;
    pendRecd = recd;
}

/* Fill sendCtrlMsg with a resume request carrying our acked checkpoint id. */
inline prepareResume(me) {
    d_step {
        sendCtrlMsg.type = resume;
        sendCtrlMsg.addr = recvCtrl[me];
        sendCtrlMsg.cpId = ackedId[me];
    }
}

/* Initiator only: promote the pending checkpoint to acked, open a new pending
 * checkpoint at the current position, and prepare the checkpoint message. */
inline openCheckpoint() {
    d_step {
        /* move the reference points */
        sent = sent - pendSent;
        recd = recd - pendRecd;
        /* overwrite old acked and update absolute counters */
        commitPending(0);
        /* new pend */
        pendId = (pendId + 1) % 3;
        pendSent = sent;
        pendRecd = recd;
        /* prepare message */
        sendCpDataMsg.cpId = pendId;
        sendCpDataMsg.cpPos = pendSent;
    }
}

/* Accept the peer's data-connection request, check that both endpoints
 * resume from the same checkpoint, and drain both data queues. Executable
 * when the peer offers openData!data. */
inline acceptDataConnection() {
    atomic {
        openData?data;
        /* processes should resume from the same place */
        assert(ackedId[0] == ackedId[1] && ackedSent[0] == ackedRecd[1] && ackedRecd[0] == ackedSent[1]);
        d_step {
            /* empty data channel buffers */
            do
            :: recvData?recvDataMsg;
            :: empty(recvData); break;
            od;
            do
            :: sendData?recvDataMsg;
            :: empty(sendData); break;
            od;
            skip;
        };
    }
}

/*
 * Creates all channels, starts the three processes, and then acts as the
 * mover: it nondeterministically hands either endpoint the other of its two
 * control channels, alternating the index on every move.
 */
init {
    /* control channels */
    chan point1CtrlRecv[2] = [0] of {ctrlMsg};
    chan point2CtrlRecv[2] = [0] of {ctrlMsg};
    recvCtrl[0] = point1CtrlRecv[0];
    recvCtrl[1] = point2CtrlRecv[0];

    /* data channels */
    chan point1DataRecv = [queueSize] of {dataMsg};
    chan point2DataRecv = [queueSize] of {dataMsg};

    /* nameserver channels */
    chan point1Name = [0] of {chan};
    chan point2Name = [0] of {chan};

    /* move channels */
    chan point1Move = [0] of {chan};
    chan point2Move = [0] of {chan};

    chan openData = [0] of {mtype};

    /* movement indicators */
    byte point1Idx = 1;
    byte point2Idx = 1;

    /* start processes */
    run Nameserver(point1Name, point2Name, point1CtrlRecv[0], point2CtrlRecv[0]);
    run Initiator(point1Move, point1Name, point2CtrlRecv[0], point1DataRecv, point2DataRecv, openData);
    run Noninitiator(point2Move, point2Name, point1CtrlRecv[0], point2DataRecv, point1DataRecv, openData);

    do
    :: point1Move!point1CtrlRecv[point1Idx];
       point1Idx = (point1Idx + 1) % 2;
    :: point2Move!point2CtrlRecv[point2Idx];
       point2Idx = (point2Idx + 1) % 2;
    od;
}

/*
 * Name server: remembers the latest control channel of each endpoint. An
 * endpoint registers its new channel on its ns channel and, in the same
 * atomic exchange, receives the peer's last registered channel.
 */
proctype Nameserver(chan ns1, ns2, point1Recv, point2Recv) {
    do
    :: atomic { ns1?point1Recv; ns1!point2Recv; };
    :: atomic { ns2?point2Recv; ns2!point1Recv; };
    od;
}

/*
 * Endpoint 0. Sends and receives data, and is the side that creates
 * checkpoints: once sent or recd reaches bufTrigger (and no checkpoint ack is
 * outstanding) it sends a cp message and waits for the matching cp reply.
 * When sent or recd reaches bufTrigger*bufFactor it stops sending data.
 *
 * Parameters:
 *   move     - receives a new control-receive channel when the endpoint moves
 *   ns       - rendezvous channel to the name server
 *   sendCtrl - channel on which control messages are sent to the peer
 *   recvData - incoming data queue
 *   sendData - outgoing data queue (the peer's incoming queue)
 *   openData - rendezvous used to (re)open the data connection
 */
proctype Initiator(chan move, ns, sendCtrl, recvData, sendData, openData) {
    /* relative positions in the streams */
    byte sent = 0;
    byte recd = 0;

    /* checkpoints */
    ackedId[0] = 0;
    ackedSent[0] = 0;
    ackedRecd[0] = 0;
    byte pendId = 1;
    byte pendSent = 0;
    byte pendRecd = 0;

    /* indicate whether a reply is expected */
    bool recvCpAck = false;

    /* messages */
    dataMsg recvDataMsg;
    dataMsg sendDataDataMsg;
    sendDataDataMsg.type = data;
    sendDataDataMsg.cpId = 0;
    sendDataDataMsg.cpPos = 0;
    dataMsg sendCpDataMsg;
    sendCpDataMsg.type = cp;
    ctrlMsg recvCtrlMsg;
    ctrlMsg sendCtrlMsg;

ACTIVE:
    if
    :: (sent == bufTrigger*bufFactor || recd == bufTrigger*bufFactor) && recvCpAck == true;
        /* buffer limit reached while an ack is outstanding: receive only */
        if
        :: recvData?recvDataMsg;
            /* received data message */
            goto ACTIVE_RECV_DATA;
        :: recvCtrl[0]?recvCtrlMsg;
            /* received control message */
            goto ACTIVE_RECV_CTRL;
        :: relocate(0);
            goto USER_RESUME;
        fi;
    :: (sent == bufTrigger*bufFactor || recd == bufTrigger*bufFactor) && recvCpAck == false;
        /* create checkpoint */
        openCheckpoint();
        if
        :: sendData!sendCpDataMsg;
            recvCpAck = true;
            goto ACTIVE;
        :: true;
            /* failed to send */
            goto READY_RESUME;
        fi;
    :: else;
        if
        :: recvData?recvDataMsg;
            /* received data or checkpoint ack */
            goto ACTIVE_RECV_DATA;
        :: recvCtrl[0]?recvCtrlMsg;
            /* received control message */
            goto ACTIVE_RECV_CTRL;
        :: sendData!sendDataDataMsg;
            /* sent data */
            sent++;
            if
            :: sent >= bufTrigger && recvCpAck == false;
                /* create checkpoint */
                openCheckpoint();
                if
                :: sendData!sendCpDataMsg;
                    recvCpAck = true;
                    goto ACTIVE;
                :: true;
                    /* failed to send */
                    goto READY_RESUME;
                fi;
            :: else;
                goto ACTIVE;
            fi;
        :: true;
            /* failed to send */
            goto READY_RESUME;
        :: relocate(0);
            goto USER_RESUME;
        fi;
    fi;

ACTIVE_RECV_DATA:
    if
    :: recvDataMsg.type == data;
        /* received data */
        recd++;
        if
        :: recd >= bufTrigger && recvCpAck == false;
            /* create checkpoint */
            openCheckpoint();
            if
            :: sendData!sendCpDataMsg;
                recvCpAck = true;
                goto ACTIVE;
            :: true;
                /* failed to send */
                goto READY_RESUME;
            fi;
        :: else;
            goto ACTIVE;
        fi;
    :: recvDataMsg.type == cp && recvDataMsg.cpId == pendId && recvCpAck == true;
        /* this is a reply */
        d_step {
            pendRecd = recvDataMsg.cpPos;
            recvCpAck = false;
        };
        goto ACTIVE;
    :: else;
        assert(false);
    fi;

ACTIVE_RECV_CTRL:
    if
    :: recvCtrlMsg.type == resume;
        sendCtrl = recvCtrlMsg.addr;
        /* received resume, send resume_ok */
        goto SEND_RESUME_OK;
    :: recvCtrlMsg.type == resume_ok;
        /* should not receive resume_ok here */
        assert(false);
    :: else;
        assert(false);
    fi;

USER_RESUME:
    skip;
    prepareResume(0);
    if
    :: sendCtrl!sendCtrlMsg;
        goto SENT_RESUME;
    :: true;
        /* failed to send */
        goto READY_RESUME;
    fi;

SENT_RESUME:
    if
    :: recvCtrl[0]?recvCtrlMsg;
        if
        :: recvCtrlMsg.type == resume;
            sendCtrl = recvCtrlMsg.addr;
            /* received resume, initiator */
            goto USER_RESUME;
        :: recvCtrlMsg.type == resume_ok;
            sendCtrl = recvCtrlMsg.addr;
            /* received resume_ok, rollback */
            assert(recvCtrlMsg.cpId == ackedId[0]);
            d_step {
                /* reset to acked */
                resetCounters();
                recvCpAck = false;
            };
            /* attempt to open data connection */
            if
            :: openData!data;
                goto ACTIVE;
            :: true;
                /* open connection failed */
                goto READY_RESUME;
            fi;
        :: else;
            assert(false);
        fi;
    :: relocate(0);
        goto USER_RESUME;
    fi;

READY_RESUME:
    if
    :: recvCtrl[0]?recvCtrlMsg;
        if
        :: recvCtrlMsg.type == resume;
            sendCtrl = recvCtrlMsg.addr;
            /* send resume_ok */
            goto SEND_RESUME_OK;
        :: recvCtrlMsg.type == resume_ok;
            sendCtrl = recvCtrlMsg.addr;
            /* ignore */
            goto READY_RESUME;
        :: else;
            assert(false);
        fi;
    :: relocate(0);
        goto USER_RESUME;
    fi;

SEND_RESUME_OK:
    /* the peer asks for our acked checkpoint or the one before it */
    assert(((recvCtrlMsg.cpId + 1) % 3) == ackedId[0] || recvCtrlMsg.cpId == ackedId[0]);
    d_step {
        sendCtrlMsg.type = resume_ok;
        sendCtrlMsg.addr = recvCtrl[0];
        sendCtrlMsg.cpId = ackedId[0];
    };
    if
    :: sendCtrl!sendCtrlMsg;
        /* rollback */
        d_step {
            /* reset to acked */
            resetCounters();
            recvCpAck = false;
        };
        /* attempt to receive data connection */
        if
        :: acceptDataConnection();
            goto ACTIVE;
        :: true;
            /* failed to receive data connection */
            goto READY_RESUME;
        :: relocate(0);
            goto USER_RESUME;
        fi;
    :: true;
        /* failed to send */
        goto READY_RESUME;
    fi;
}

/*
 * Endpoint 1. Sends and receives data and answers the Initiator's checkpoint
 * requests; it never creates a checkpoint on its own. On resume it may roll
 * back either to its acked checkpoint or to its pending one, depending on the
 * checkpoint id named in the control message.
 *
 * Parameters: same roles as for Initiator.
 */
proctype Noninitiator(chan move, ns, sendCtrl, recvData, sendData, openData) {
    /* relative positions in the streams */
    byte sent = 0;
    byte recd = 0;

    /* checkpoints */
    ackedId[1] = 0;
    ackedSent[1] = 0;
    ackedRecd[1] = 0;
    byte pendId = 1;
    byte pendSent = 0;
    byte pendRecd = 0;

    /* messages */
    dataMsg recvDataMsg;
    dataMsg sendDataDataMsg;
    sendDataDataMsg.type = data;
    sendDataDataMsg.cpId = 0;
    sendDataDataMsg.cpPos = 0;
    dataMsg sendCpDataMsg;
    sendCpDataMsg.type = cp;
    ctrlMsg recvCtrlMsg;
    ctrlMsg sendCtrlMsg;

ACTIVE:
    if
    :: sent == bufTrigger*bufFactor || recd == bufTrigger*bufFactor;
        /* buffer limit reached: receive only */
        if
        :: recvData?recvDataMsg;
            /* received data message */
            goto ACTIVE_RECV_DATA;
        :: recvCtrl[1]?recvCtrlMsg;
            /* received control message */
            goto ACTIVE_RECV_CTRL;
        :: relocate(1);
            goto USER_RESUME;
        fi;
    :: else;
        if
        :: recvData?recvDataMsg;
            /* received data message */
            goto ACTIVE_RECV_DATA;
        :: recvCtrl[1]?recvCtrlMsg;
            /* received control message */
            goto ACTIVE_RECV_CTRL;
        :: sendData!sendDataDataMsg;
            /* sent data */
            sent++;
            goto ACTIVE;
        :: true;
            /* failed to send */
            goto READY_RESUME;
        :: relocate(1);
            goto USER_RESUME;
        fi;
    fi;

ACTIVE_RECV_DATA:
    if
    :: recvDataMsg.type == data;
        /* received data */
        recd++;
        goto ACTIVE;
    :: recvDataMsg.type == cp && recvDataMsg.cpId == (pendId + 1) % 3;
        /* received checkpoint req */
        d_step {
            /* move the reference points */
            sent = sent - pendSent;
            recd = recd - pendRecd;
            /* overwrite old acked; update absolute counter */
            commitPending(1);
            /* new pend: id and received position come from the request */
            pendId = recvDataMsg.cpId;
            pendSent = sent;
            pendRecd = recvDataMsg.cpPos;
            /* prepare message */
            sendCpDataMsg.cpId = pendId;
            sendCpDataMsg.cpPos = pendSent;
        };
        /* send checkpoint ack */
        if
        :: sendData!sendCpDataMsg;
            goto ACTIVE;
        :: true;
            /* failed to send */
            goto READY_RESUME;
        fi;
    :: else;
        /* should not happen */
        assert(false);
    fi;

ACTIVE_RECV_CTRL:
    if
    :: recvCtrlMsg.type == resume;
        sendCtrl = recvCtrlMsg.addr;
        /* received resume, try to send resume_ok */
        goto SEND_RESUME_OK;
    :: recvCtrlMsg.type == resume_ok;
        /* should not receive resume_ok here */
        assert(false);
    :: else;
        assert(false);
    fi;

USER_RESUME:
    skip;
    prepareResume(1);
    if
    :: sendCtrl!sendCtrlMsg;
        goto SENT_RESUME;
    :: true;
        /* failed to send */
        goto READY_RESUME;
    fi;

SENT_RESUME:
    if
    :: recvCtrl[1]?recvCtrlMsg;
        if
        :: recvCtrlMsg.type == resume;
            sendCtrl = recvCtrlMsg.addr;
            /* received resume, not initiator */
            goto READY_RESUME;
        :: recvCtrlMsg.type == resume_ok;
            sendCtrl = recvCtrlMsg.addr;
            /* received resume_ok, rollback */
            d_step {
                if
                :: recvCtrlMsg.cpId == ackedId[1];
                    resetCounters();
                :: recvCtrlMsg.cpId == pendId;
                    /* reset */
                    sent = 0;
                    recd = 0;
                    /* overwrite old acked; update absolute counter */
                    commitPending(1);
                    /* new pend */
                    pendId = (pendId + 1) % 3;
                    pendSent = sent;
                    pendRecd = recd;
                :: else;
                    assert(false);
                fi;
            };
            /* attempt to open data connection */
            if
            :: openData!data;
                goto ACTIVE;
            :: true;
                /* open connection failed */
                goto READY_RESUME;
            fi;
        :: else;
            /* should not happen */
            assert(false);
        fi;
    :: relocate(1);
        goto USER_RESUME;
    fi;

READY_RESUME:
    if
    :: recvCtrl[1]?recvCtrlMsg;
        if
        :: recvCtrlMsg.type == resume;
            sendCtrl = recvCtrlMsg.addr;
            /* send resume_ok */
            goto SEND_RESUME_OK;
        :: recvCtrlMsg.type == resume_ok;
            sendCtrl = recvCtrlMsg.addr;
            /* ignore */
            goto READY_RESUME;
        :: else;
            /* should not happen */
            assert(false);
        fi;
    :: relocate(1);
        goto USER_RESUME;
    fi;

SEND_RESUME_OK:
    assert(recvCtrlMsg.cpId == ackedId[1] || recvCtrlMsg.cpId == pendId);
    d_step {
        sendCtrlMsg.type = resume_ok;
        sendCtrlMsg.addr = recvCtrl[1];
        /* echo back whichever checkpoint id the peer asked for */
        if
        :: recvCtrlMsg.cpId == pendId;
            sendCtrlMsg.cpId = pendId;
        :: else;
            sendCtrlMsg.cpId = ackedId[1];
        fi;
    };
    if
    :: sendCtrl!sendCtrlMsg;
        /* rollback */
        d_step {
            if
            :: sendCtrlMsg.cpId == ackedId[1];
                resetCounters();
            :: sendCtrlMsg.cpId == pendId;
                /* reset */
                sent = 0;
                recd = 0;
                /* overwrite old acked; update absolute counter */
                commitPending(1);
                /* new pend */
                pendId = (pendId + 1) % 3;
                pendSent = sent;
                pendRecd = recd;
            fi;
        };
        /* attempt to receive data connection */
        if
        :: acceptDataConnection();
            goto ACTIVE;
        :: true;
            /* failed to receive data connection */
            goto READY_RESUME;
        :: relocate(1);
            goto USER_RESUME;
        fi;
    :: true;
        /* failed to send */
        goto READY_RESUME;
    fi;
}