Write a C program to implement Birman-Schiper-Stephenson protocol - BITS WILP

Birman-Schiper-Stephenson Protocol 
Distributed Computing Assignment- BITS-WILP 

Write a C program to implement Birman-Schiper-Stephenson protocol

Birman-Schiper-Stephenson protocol - The goal of this protocol is to preserve causal ordering in the delivery of broadcast messages. For example, if send(m1) -> send(m2), then every process that receives both m1 and m2 delivers m1 before m2, i.e. receive(m1) -> receive(m2).

Suppose there are 3 processes

1. Open 3 terminals (Cygwin terminals ) and execute bss.exe
2. On terminal 1 enter 3 in  (Enter Total Number Of Processes (<100):)
3. Enter 0 in (Enter Process Id of Current Process (should be 0 to 2))
4. Enter port of process 0 To Receive Broadcast Message (ex. 1000)
5. Enter port of process 1 To Receive Broadcast Message (ex. 2000)
6. Enter port of process 2 To Receive Broadcast Message (ex. 3000)
7. Repeat steps 2 to 6 on remaining two terminals.

8. Enter something on any one of the terminals to initiate the broadcast message

bss.c
 #include <stdio.h>  
 #include <sys/socket.h>  
 #include <arpa/inet.h>  
 #include <stdlib.h>  
 #include <string.h>  
 #include <unistd.h>  
 #include <pthread.h>  
 /* Backlog passed to listen() on the receiving socket. */
 #define MAXPENDING 100  
 /* Number of processes taking part in the broadcast; read from stdin in main(). */
 int totalProcesses;  
 /* Index of this process; read from stdin in main(). */
 int processId;  
 /* TCP listening port of every process, indexed by process id. */
 unsigned int ports[100];  
 /* NOTE(review): not referenced anywhere in the visible code; looks like a
  * misspelled duplicate of processId - confirm before removing. */
 int proccessId;  
 /* Local vector clock. Slots 0..totalProcesses-1 hold one counter per process;
  * main() stores processId in slot [totalProcesses], and the whole array is
  * what sendBCMsg() puts on the wire. */
 int pclock[100];  
 /* Seconds sendBCMsg() sleeps before connecting to each peer, indexed by
  * process id; filled in by initChannelDelay(). */
 int channelDelay[100];  
 /* NOTE(review): delivBuf and delivBufIndex are not referenced in the visible code. */
 int delivBuf[100][100];  
 int delivBufIndex=0;  
 /* One buffered (not yet deliverable) message in a singly linked list. */
 struct node {  
   /* Timestamp as received: slots 0..totalProcesses-1 are the sender's vector
    * clock, slot [totalProcesses] is the sender's process id. */
   int rclock[100];  
   /* Set to 1 by checkBufferAndDeliver() once the message has been delivered. */
   int isDelivered;  
   struct node *next;  
 };  
 /* Head of the buffer list; addToBuffer() pushes new entries at the front. */
 struct node *head;  
 /*
  * Report a fatal error and terminate the process.
  *
  * errorMessage is handed to perror(), which writes it to stderr followed by
  * the description of the current errno value. The process then exits with
  * status 1; this function never returns.
  */
 void DieWithError(char *errorMessage)
 {
   perror(errorMessage);
   exit(1);
 }
 /*
  * Push a copy of a received timestamp onto the front of the buffer list.
  *
  * rclock must hold at least totalProcesses + 1 ints: the sender's vector
  * clock followed by the sender's process id in slot [totalProcesses].
  * The new entry is marked as not delivered. Terminates the process via
  * DieWithError() if memory cannot be allocated.
  */
 void addToBuffer(int *rclock){
   struct node *entry = malloc(sizeof *entry);
   if (entry == NULL)
     DieWithError("malloc() failed");
   /* <= is deliberate: slot [totalProcesses] carries the sender id. */
   for (int p = 0; p <= totalProcesses; p++) {
     entry->rclock[p] = rclock[p];
   }
   entry->isDelivered = 0;
   entry->next = head;
   head = entry;
 }
 /*
  * Print the first totalProcesses entries of the local vector clock on one
  * line, each followed by a comma, then a blank line.
  */
 void displayLocalClock(){
   int idx = 0;

   printf("Local Clock \n");
   while (idx < totalProcesses) {
     printf("%d,", pclock[idx]);
     idx++;
   }
   printf("\n\n");
 }
 void displayBuffer(){  
   struct node* temp =head;  
   printf("Buffered Messages Clock \n");  
   while(temp !=NULL){  
     for(int p =-1;p<totalProcesses;p++){  
       printf("-");  
     }  
     printf("\n");  
     for(int p =0;p<totalProcesses;p++){  
       printf("%d|",temp->rclock[p]);  
     }  
     printf("\n");  
     for(int p =-1;p<totalProcesses;p++){  
       printf("-");  
     }  
     temp = temp->next;  
   }  
   printf("\n\n");  
 }  
 /*
  * Deliver every buffered message whose delivery condition now holds.
  *
  * A buffered timestamp from sender s is deliverable when
  *   pclock[s] == rclock[s] - 1   (it is the next message from s), and
  *   pclock[c] >= rclock[c] for every other process c.
  * On delivery the local clock is merged (element-wise maximum) with the
  * message timestamp and the entry is unlinked from the list and freed.
  *
  * Unlike a single front-to-back pass that stops at the first delivered
  * entry, this scans the whole list and repeats the scan after any delivery,
  * because delivering one message can unblock another that sits earlier in
  * the list.
  *
  * peerAddr is used only for the log line; it is the address passed in by the
  * caller, not an address stored with the buffered message.
  */
 void checkBufferAndDeliver(struct sockaddr_in peerAddr){
   int progressed;
   do {
     progressed = 0;
     struct node **link = &head;
     while (*link != NULL) {
       struct node *entry = *link;
       int sender = entry->rclock[totalProcesses];
       int deliver = !entry->isDelivered
                     && pclock[sender] == entry->rclock[sender] - 1;
       for (int c = 0; deliver && c < totalProcesses; c++) {
         if (c != sender && pclock[c] < entry->rclock[c]) {
           deliver = 0;
         }
       }
       if (!deliver) {
         link = &entry->next;
         continue;
       }
       printf("\nDelivered BUffered Message Which Was Received From:- %s(%d), Msg :- %d \n", inet_ntoa(peerAddr.sin_addr), ports[sender], entry->rclock[sender]);
       entry->isDelivered = 1;
       for (int c = 0; c < totalProcesses; c++) {
         pclock[c] = pclock[c] > entry->rclock[c] ? pclock[c] : entry->rclock[c];
       }
       /* Unlink and release the delivered entry; *link now names its successor. */
       *link = entry->next;
       free(entry);
       progressed = 1;
     }
   } while (progressed);
 }
 /*
  * Decide whether a freshly received message can be delivered now.
  *
  * rclock holds the sender's vector clock in slots 0..totalProcesses-1 and
  * the sender's process id in slot [totalProcesses]. peerAddr is used for
  * log output and is passed on to checkBufferAndDeliver().
  *
  * The message is delivered when it is the next one expected from its sender
  * and the local clock is not behind the timestamp for any other process.
  * Delivery merges the timestamp into the local clock (element-wise maximum)
  * and then re-examines the buffer. Otherwise the timestamp is buffered and
  * the local clock and buffer are printed.
  */
 void handleDelivery(int *rclock,struct sockaddr_in peerAddr){
   const int sender = rclock[totalProcesses];

   /* An earlier message from the same sender is still missing. */
   if (pclock[sender] != rclock[sender] - 1) {
     addToBuffer(rclock);
     printf("*** All message sent from %s(%d) before this event is not yet received. Putting into buffer *****\n",inet_ntoa(peerAddr.sin_addr), ports[sender]);
     displayLocalClock();
     displayBuffer();
     return;
   }

   /* The sender had seen messages from other processes that we have not. */
   int causallyReady = 1;
   for (int k = 0; k < totalProcesses; k++) {
     if (k != sender && pclock[k] < rclock[k]) {
       causallyReady = 0;
       break;
     }
   }
   if (!causallyReady) {
     addToBuffer(rclock);
     printf("*** All messages sent from other proceeses before this event is not yet received. Putting into buffer *****\n");
     displayLocalClock();
     displayBuffer();
     return;
   }

   printf("\nDelivered Message From:- %s(%d), Msg :- %d \n", inet_ntoa(peerAddr.sin_addr), ports[sender], rclock[sender]);
   for (int k = 0; k < totalProcesses; k++) {
     if (rclock[k] > pclock[k]) {
       pclock[k] = rclock[k];
     }
   }
   displayLocalClock();
   checkBufferAndDeliver(peerAddr);
 }
 /*
  * Read one timestamp from an accepted connection and hand it to
  * handleDelivery(), then close the connection.
  *
  * clntSocket is the connected socket; peerAddr is the peer's address, used
  * for log output. The sender transmits exactly sizeof(rclock) bytes, but a
  * stream socket may hand them over in several pieces, so recv() is repeated
  * until the whole array has arrived. A connection that closes early, or a
  * timestamp whose sender id is outside 0..totalProcesses-1, is reported on
  * stderr and dropped instead of being used as an array index.
  * A recv() error terminates the process via DieWithError().
  */
 void handleTCPClient(int clntSocket,struct sockaddr_in peerAddr){
   int rclock[100];
   char *dst = (char *)rclock;
   size_t got = 0;

   while (got < sizeof(rclock)) {
     ssize_t n = recv(clntSocket, dst + got, sizeof(rclock) - got, 0);
     if (n < 0)
       DieWithError("recv() failed");
     if (n == 0)
       break;  /* peer closed the connection */
     got += (size_t)n;
   }
   if (got != sizeof(rclock)) {
     fprintf(stderr, "Discarding truncated message from %s (%zu of %zu bytes)\n", inet_ntoa(peerAddr.sin_addr), got, sizeof(rclock));
     close(clntSocket);
     return;
   }

   int sender = rclock[totalProcesses];
   if (sender < 0 || sender >= totalProcesses) {
     fprintf(stderr, "Discarding message from %s with invalid sender id %d\n", inet_ntoa(peerAddr.sin_addr), sender);
     close(clntSocket);
     return;
   }

   printf("\nReceived Message From:- %s(%d), Msg :- %d \n", inet_ntoa(peerAddr.sin_addr), ports[sender], rclock[sender]);
   handleDelivery(rclock,peerAddr);
   close(clntSocket);
 }
 /*
  * Thread entry point: accept incoming broadcast connections forever.
  *
  * Opens a TCP socket, binds it to ports[processId] on all local addresses,
  * listens with a backlog of MAXPENDING, and passes every accepted
  * connection to handleTCPClient() in turn. Any socket-call failure ends the
  * process through DieWithError(). The attribute argument is not used, and
  * the function never returns.
  */
 void *recvBCMsg(void *attribute){
   struct sockaddr_in listenAddr;
   struct sockaddr_in clientAddr;
   unsigned int clientLen;
   unsigned int listenPort = ports[processId];
   int clientFd;
   int listenFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);

   if (listenFd < 0) {
     DieWithError("socket () failed\n");
   }

   memset(&listenAddr, 0, sizeof(listenAddr));
   listenAddr.sin_family = AF_INET;
   listenAddr.sin_addr.s_addr = htonl(INADDR_ANY);
   listenAddr.sin_port = htons(listenPort);

   if (bind(listenFd, (struct sockaddr *)&listenAddr, sizeof(listenAddr)) < 0) {
     DieWithError("bind() failed\n");
   }
   if (listen(listenFd, MAXPENDING) < 0) {
     DieWithError("listen() failed\n");
   }
   printf("\nSocket Binded To Recv Messages\n");

   while (1) {
     clientLen = sizeof(clientAddr);
     clientFd = accept(listenFd, (struct sockaddr *)&clientAddr, &clientLen);
     if (clientFd < 0) {
       DieWithError("accept() failed\n");
     }
     handleTCPClient(clientFd, clientAddr);
   }
 }
 /*
  * Broadcast one message to every other process.
  *
  * Increments this process's own entry in the vector clock, takes a snapshot
  * of the clock array, and sends that snapshot over a fresh TCP connection to
  * each peer on 127.0.0.1, starting with process (processId + 1) and wrapping
  * around. Before each connect the call sleeps channelDelay[peer] seconds.
  *
  * The snapshot matters: the sends are spread over several seconds while the
  * receiver thread may merge incoming timestamps into pclock, and every peer
  * must see the same timestamp for the same message.
  *
  * Each socket is closed once its data has been sent, so descriptors are not
  * leaked across broadcasts. Any socket-call failure ends the process through
  * DieWithError().
  *
  * msgsend is accepted for interface compatibility; only the timestamp is
  * transmitted, the text itself is not.
  *
  * NOTE(review): pclock is also written by the receiver thread
  * (handleDelivery / checkBufferAndDeliver) and no lock is taken here.
  */
 void sendBCMsg(char * msgsend){
   int sendSock;
   struct sockaddr_in peerAddr;
   char *peerlP = "127.0.0.1";
   int stamp[sizeof(pclock) / sizeof(pclock[0])];

   (void)msgsend;
   pclock[processId] = pclock[processId] + 1;
   memcpy(stamp, pclock, sizeof(stamp));

   int sent=0;
   for(int p=processId+1;sent<totalProcesses-1;p++){
     int peer = p % totalProcesses;
     if ((sendSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
       DieWithError("socket () failed\n") ;
     memset(&peerAddr, 0, sizeof(peerAddr));
     peerAddr.sin_family = AF_INET;
     peerAddr.sin_addr.s_addr = inet_addr(peerlP);
     peerAddr.sin_port = htons(ports[peer]);
     sleep(channelDelay[peer]);
     if (connect(sendSock, (struct sockaddr *) &peerAddr, sizeof(peerAddr)) < 0)
       DieWithError("connect () failed\n");
     if (send(sendSock, stamp, sizeof(stamp), 0) != (ssize_t)sizeof(stamp))
       DieWithError("send() sent a different number of bytes than expected\n");
     close(sendSock);
     sent++;
   }
 }
 /*
  * Fill channelDelay[] for this process and print it.
  *
  * The delay to this process itself is 0. Walking the other processes in
  * ring order starting at processId + 1, the k-th one gets a delay of 2*k
  * seconds (2, 4, 6, ...).
  */
 void initChannelDelay(){
   channelDelay[processId] = 0;
   for (int step = 1; step < totalProcesses; step++) {
     channelDelay[(processId + step) % totalProcesses] = 2 * step;
   }

   printf("Channel Delays to Process %d to %d ----> ",0,totalProcesses-1);
   int idx = 0;
   while (idx < totalProcesses) {
     printf("%d,", channelDelay[idx]);
     idx++;
   }
   printf("\n");
 }
 /*
  * Program entry point.
  *
  * Reads the number of processes, this process's id and every process's
  * listening port from stdin, initialises the channel delays, starts the
  * receiver thread, and then broadcasts once for every word typed on stdin.
  *
  * All input is validated before it is used as an array index or a port:
  *   - process count must be 1..99 (slot [totalProcesses] of the 100-element
  *     clock array stores the sender id),
  *   - process id must be 0..totalProcesses-1,
  *   - ports must be 1..65535.
  * Invalid input ends the program with exit status 1. End of input on stdin
  * ends the broadcast loop and the program returns 0.
  */
 int main(int argc, const char * argv[])
 {
   (void)argc;
   (void)argv;

   printf("Enter Total Number Of Processes (< 100):\n");
   if (scanf("%d",&totalProcesses) != 1 || totalProcesses < 1 || totalProcesses >= 100) {
     fprintf(stderr, "Total number of processes must be between 1 and 99\n");
     return 1;
   }
   printf("Enter Process Id of Current Process (should be 0 to %d)\n",totalProcesses-1);
   if (scanf("%d",&processId) != 1 || processId < 0 || processId >= totalProcesses) {
     fprintf(stderr, "Process id must be between 0 and %d\n", totalProcesses-1);
     return 1;
   }
   for(int p=0;p<totalProcesses;p++){
     printf("Enter (Procees-%d) Port To Recevice Brodacast Message\n",p);
     /* ports[] is unsigned int, so the matching conversion is %u. */
     if (scanf("%u",&ports[p]) != 1 || ports[p] == 0 || ports[p] > 65535) {
       fprintf(stderr, "Port must be between 1 and 65535\n");
       return 1;
     }
   }
   /* The slot after the last clock entry carries this process's id on the wire. */
   pclock[totalProcesses] = processId;
   printf("This Procees Port No Is : %u\n",ports[processId]);
   initChannelDelay();
   pthread_t t;
   /* pthread_create reports failure through its return value, not errno. */
   int rc = pthread_create(&t, NULL, recvBCMsg, NULL);
   if (rc != 0) {
     fprintf(stderr, "Failed To Create Thread To Receive Brodcast Messages: %s\n", strerror(rc));
     return 1;
   }
   char str[100];
   for(;;){
     printf("Enter something to send broadcast message\n");
     /* Width-limited so a long word cannot overflow str; stop on EOF. */
     if (scanf("%99s" ,str) != 1)
       break;
     sendBCMsg(str);
   }
   return 0;
 }



compile -  gcc bss.c -o bss.exe -pthread
execute - ./bss.exe

Output:

- Terminal 1 -


User@DESKTOP-L17EN0B ~/dc
$ gcc bss.c

User@DESKTOP-L17EN0B ~/dc
$ ./bss.exe
Enter Total Number Of Processes (< 100):
3
Enter Process Id of Current Process (should be 0 to 2)
0
Enter (Procees-0) Port To Recevice Brodacast Message
1000
Enter (Procees-1) Port To Recevice Brodacast Message
2000
Enter (Procees-2) Port To Recevice Brodacast Message
3000
This Procees Port No Is : 1000
Channel Delays to Process 0 to 2 ----> 0,2,4,
Enter something to send broadcast message

Socket Binded To Recv Messages

Received Message From:-  127.0.0.1(3000),  Msg :- 1

Delivered Message From:-  127.0.0.1(3000),  Msg :- 1
Local Clock
0,0,1,


Received Message From:-  127.0.0.1(2000),  Msg :- 1

Delivered Message From:-  127.0.0.1(2000),  Msg :- 1
Local Clock
0,1,1,

hi
Enter something to send broadcast message

Received Message From:-  127.0.0.1(3000),  Msg :- 2

Delivered Message From:-  127.0.0.1(3000),  Msg :- 2
Local Clock
1,1,2,


Received Message From:-  127.0.0.1(2000),  Msg :- 2

Delivered Message From:-  127.0.0.1(2000),  Msg :- 2
Local Clock
1,2,2,
Terminal 2:

User@DESKTOP-L17EN0B ~/dc
$ ./bss.exe
Enter Total Number Of Processes (< 100):
3
Enter Process Id of Current Process (should be 0 to 2)
1
Enter (Procees-0) Port To Recevice Brodacast Message
1000
Enter (Procees-1) Port To Recevice Brodacast Message
2000
Enter (Procees-2) Port To Recevice Brodacast Message
3000
This Procees Port No Is : 2000
Channel Delays to Process 0 to 2 ----> 4,0,2,
Enter something to send broadcast message

Socket Binded To Recv Messages

Received Message From:-  127.0.0.1(3000),  Msg :- 1

Delivered Message From:-  127.0.0.1(3000),  Msg :- 1
Local Clock
0,0,1,

message
Enter something to send broadcast message

Received Message From:-  127.0.0.1(1000),  Msg :- 1

Delivered Message From:-  127.0.0.1(1000),  Msg :- 1
Local Clock
1,1,1,


Received Message From:-  127.0.0.1(3000),  Msg :- 2

Delivered Message From:-  127.0.0.1(3000),  Msg :- 2
Local Clock
1,1,2,

Thanks
Enter something to send broadcast message
Terminal 3

User@DESKTOP-L17EN0B ~/dc
$ ./bss.exe
Enter Total Number Of Processes (< 100):
3
Enter Process Id of Current Process (should be 0 to 2)
2
Enter (Procees-0) Port To Recevice Brodacast Message
1000
Enter (Procees-1) Port To Recevice Brodacast Message
2000
Enter (Procees-2) Port To Recevice Brodacast Message
3000
This Procees Port No Is : 3000
Channel Delays to Process 0 to 2 ----> 2,4,0,
Enter something to send broadcast message

Socket Binded To Recv Messages
hello
Enter something to send broadcast message

Received Message From:-  127.0.0.1(2000),  Msg :- 1

Delivered Message From:-  127.0.0.1(2000),  Msg :- 1
Local Clock
0,1,1,


Received Message From:-  127.0.0.1(1000),  Msg :- 1

Delivered Message From:-  127.0.0.1(1000),  Msg :- 1
Local Clock
1,1,1,

ok
Enter something to send broadcast message

Received Message From:-  127.0.0.1(2000),  Msg :- 2

Delivered Message From:-  127.0.0.1(2000),  Msg :- 2
Local Clock
1,2,2,
Each process has a fixed delay for sending a message to each of the other processes. This is shown in the output line 'Channel Delays to Process 0 to 2 ----> 0,2,4,', which for p0 means: p0 -> p0 = no delay, p0 -> p1 = 2 seconds, p0 -> p2 = 4 seconds. When p0 sends a message to p1 and p2, p1 gets the message first and p2 gets it second. Once p1 has received that message and sends a message of its own, it sends to p2 first and then to p0, so p2 gets p1's message first and p0 gets it afterwards. If p2 receives the message from p1 before it has received the message from p0, then p2 puts p1's message in its buffer; when p2 later receives the message from p0, it delivers both the newly received message and the buffered one to the process.

1 comment:

  1. Do you have the code for SES protocol implementation? this seems to be a good code!!
    If you have the code can you please provide me the link!!

    ReplyDelete