by Leonardo Giordani <leo.giordani(at)libero.it>

About the author: Student at the Faculty of Telecommunication Engineering at the Politecnico of Milan; works as a network administrator and is interested in programming (mostly in Assembly and C/C++). Since 1999 he has worked almost exclusively with Linux/Unix.

Translated to English by: Leonardo Giordani <leo.giordani(at)libero.it>
Concurrent programming - Message queues (1)

Abstract: This series of articles aims to introduce the reader to the concept of multitasking and to its implementation in the Linux operating system. Starting from the theoretical concepts at the base of multitasking, we will end up writing a complete application that demonstrates communication between processes using a simple but efficient communication protocol.

Prerequisites for the understanding of the article are:
It is also a good idea to read the other articles in this series, which appeared in the last two issues of LinuxFocus (November 2002 and January 2003).
Synchronizing processes means timing their work, not in an absolute reference system (giving a precise time in which the process should begin its operations) but in a relative one, where we can schedule which process should work first and which second.
Using semaphores for this turns out to be both complex and limited: complex because every process has to manage a semaphore for every other process that synchronizes with it, and limited because semaphores do not allow us to exchange parameters between the processes. Consider, for example, the creation of a new process: every working process should be notified of this event, but semaphores do not allow a process to send such information.
Moreover, controlling concurrent access to shared resources through semaphores can lead to the continuous blocking of a process, when one of the other processes involved releases the resource and locks it again before the others can use it: as we saw, in the world of concurrent programming it is not possible to know in advance which process will be executed and when.
These brief notes let us immediately understand that semaphores are an inadequate tool for managing complex synchronization problems. An elegant solution to this matter comes with the use of message queues: in this article we will study the theory of this interprocess communication facility and write a little program using SysV primitives.
The use of queues is thus a simple implementation of a mail system between processes: every process has an address at which other processes can reach it. The process can then read the messages delivered to its box in a preferential order and act according to what has been notified.
The synchronization of two processes can thus be performed simply by exchanging messages between the two: resources will still have semaphores to let the processes know their status, but timing between processes will be performed directly. We can see immediately that message queues greatly simplify what at the beginning was an extremely complex problem.
Before we can implement message queues in C, it is necessary to speak about another problem related to synchronization: the need for a communication protocol.
This is a simple example of a protocol based on message exchange: two processes A and B execute concurrently and process different data; once they finish their processing they have to merge the results. A simple protocol to rule their interaction could be the following:
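PROCESS B:
- work with your data
- when you finish, send a message to A
- when A answers, send it your results

PROCESS A:
- work with your data
- wait for a message from B
- answer the message
- receive B's results and merge them with yours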
This protocol is easily extended to the case of n processes: every process but A works with its own data and then sends a message to A. When A answers, every process sends it its results: the structure of the individual processes (except A) does not change.
The basic structure the system uses to describe a message is called msgbuf; it is declared in linux/msg.h
/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
    long mtype;     /* type of message */
    char mtext[1];  /* message text */
};
struct message {
    long mtype;         /* message type */
    long sender;        /* sender id */
    long receiver;      /* receiver id */
    struct info data;   /* message content */
    ...
};
To create a new queue a process calls the msgget() function

int msgget(key_t key, int msgflg)

which receives as arguments an IPC key and some flags, which for now can be set to

IPC_CREAT | 0660

(create the queue if it does not exist and grant read and write access to the owner and the group), and which returns the queue identifier.
As in the previous articles, we will assume that no errors happen so that the code stays simple; a future article will speak about secure IPC code.
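To send a message to a queue whose identifier we know, we use the msgsnd() primitive

int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg)

where msqid is the queue identifier, msgp points to the message, msgsz is the length of the message excluding the leading mtype field, and msgflg holds the flags (0 for a blocking send). For the message structure above the length is computed as

length = sizeof(struct message) - sizeof(long);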
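To read the messages contained in a queue we use the msgrcv() system call

int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long mtype, int msgflg)

where msgp points to the buffer that will hold the message, msgsz is the maximum payload length (excluding the mtype field), and mtype selects which message to read: 0 takes the first message in the queue, while a positive value takes the first message of that type.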
A queue is removed with the msgctl() primitive and the IPC_RMID command:

msgctl(qid, IPC_RMID, 0)
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Redefines the struct msgbuf */
typedef struct mymsgbuf {
    long mtype;
    int int_num;
    float float_num;
    char ch;
} mess_t;

int main()
{
    int qid;
    key_t msgkey;
    mess_t sent;
    mess_t received;
    int length;

    /* Initializes the seed of the pseudo-random number generator */
    srand(time(0));

    /* Length of the message, excluding the mtype field */
    length = sizeof(mess_t) - sizeof(long);

    msgkey = ftok(".", 'm');

    /* Creates the queue */
    qid = msgget(msgkey, IPC_CREAT | 0660);
    printf("QID = %d\n", qid);

    /* Builds a message */
    sent.mtype = 1;
    sent.int_num = rand();
    sent.float_num = (float)(rand()) / 3;
    sent.ch = 'f';

    /* Sends the message */
    msgsnd(qid, &sent, length, 0);
    printf("MESSAGE SENT...\n");

    /* Receives the message */
    msgrcv(qid, &received, length, sent.mtype, 0);
    printf("MESSAGE RECEIVED...\n");

    /* Checks that received and sent messages are equal */
    printf("Integer number = %d (sent %d) -- ", received.int_num, sent.int_num);
    if (received.int_num == sent.int_num) printf(" OK\n");
    else printf("ERROR\n");

    printf("Float number = %f (sent %f) -- ", received.float_num, sent.float_num);
    if (received.float_num == sent.float_num) printf(" OK\n");
    else printf("ERROR\n");

    printf("Char = %c (sent %c) -- ", received.ch, sent.ch);
    if (received.ch == sent.ch) printf(" OK\n");
    else printf("ERROR\n");

    /* Destroys the queue */
    msgctl(qid, IPC_RMID, 0);

    return 0;
}
The following code creates a queue used by the son (child) process to
send its data to the father (parent): the son generates random numbers
and sends them to the father, and both print them on the standard
output.
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Redefines the message structure */
typedef struct mymsgbuf {
    long mtype;
    int num;
} mess_t;

int main()
{
    int qid;
    key_t msgkey;
    pid_t pid;
    mess_t buf;
    int length;
    int cont;

    /* Length of the message, excluding the mtype field */
    length = sizeof(mess_t) - sizeof(long);

    msgkey = ftok(".", 'm');
    qid = msgget(msgkey, IPC_CREAT | 0660);

    if (!(pid = fork())) {
        /* Son: sends ten random numbers at random intervals */
        printf("SON - QID = %d\n", qid);
        srand(time(0));
        for (cont = 0; cont < 10; cont++) {
            sleep(rand() % 4);
            buf.mtype = 1;
            buf.num = rand() % 100;
            msgsnd(qid, &buf, length, 0);
            printf("SON - MESSAGE NUMBER %d: %d\n", cont + 1, buf.num);
        }
        return 0;
    }

    /* Father: receives the ten messages and then removes the queue */
    printf("FATHER - QID = %d\n", qid);
    for (cont = 0; cont < 10; cont++) {
        sleep(rand() % 4);
        msgrcv(qid, &buf, length, 1, 0);
        printf("FATHER - MESSAGE NUMBER %d: %d\n", cont + 1, buf.num);
    }

    msgctl(qid, IPC_RMID, 0);

    return 0;
}
Webpages maintained by the LinuxFocus Editor team
© Leonardo Giordani, FDL LinuxFocus.org
2003-03-05, generated by lfparser version 2.35