Message Queue
pipe와 같이 데이터를 주고받을 수 있는 IPC이다.pipe의 경우 byte stream으로 주고받으므로 데이터의 경제가 없지만, message queue는 message라는 단위로 데이터를 주고받으므로 경계가 명확하다.message들은 linked list형태로 kernel에 저장되며, IPC이므로 identifer가 지정된다.
message queue는 file의 struct stat과 같은 구조체로 mspid_ds가 있다.
struct msqid_ds { /* <sys/msg.h> */
struct ipc_perm msg_perm; /* see ch08_ipc1-p.29 */
struct msg *msg_first; /* ptr to first message on queue */
struct msg *msg_last; /* ptr to last message on queue */
msglen_t msg_cbytes; /* current #bytes on queue */
msgqnum_t msg_qnum; /* # of messages on queue */
msglen_t msg_qbytes; /* max # of bytes on queue */
pid_t msg_lspid; /* pid of last msgsnd() */
pid_t msg_lrpid; /* pid of last msgrcv() */
time_t msg_stime; /* last-msgsnd() time */
time_t msg_rtime; /* last-msgrcv() time */
time_t msg_ctime; /* last-change time */
};
msg_perm은 permission에 대한 정보가 있는 변수이다.
msg_first와 msg_last는 message가 저장되는 linked list의 시작과 끝 pointer를 담는 변수이다.
msg_cbytes는 message queue의 전체 size
msg_qnum은 message의 개수
msg_qbytes는 queue의 max byte이다.
msg_lspid와 msg_lrpid는 마지막으로 send, receive를 한 프로세스의 pid이다.
msg_stime은 가장 최근에 send를 한 시간
msg_rtime은 가장 최근에 receive를 한 시간
msg_ctime은 가장 최근에 변경된 시간
message는 linked list로 되어있으므로 그림과 같이 msg_first가 첫번째 데이터를 가리키고, msg_last가 마지막 데이터를 가리킨다.
The msgget(2) system call
#include <sys/msg.h>
int msgget(key_t key, int flag);
// Returns: identifier if OK, -1 on error
key와 이용해서 identify를 return한다. flag에는 read / write에 대한 permission이 들어간다.
key를 생성하는 방법에는 3가지가 있다고 했었다. 다음은 그 세 가지 중 IPC_PRIVATE를 통해 생성한다.
#define PERMS (S_IRUSR | S_IWUSR)
int msqid;
if ((msqid = msgget(IPC_PRIVATE, PERMS)) == -1)
perror("Failed to create new private message queue");
errno | cause |
EACCES | message queue exists for key, but permission denied key가 있지만, permission이 없는 경우 |
EEXIST | message queue exists for key, but((flag&IPC_CREAT)&&(flag&IPC_EXCL)) == -1 flag가 IPC_CREAT & IPC_EXCL인데 key가 이미 있는 경우 |
ENOENT | message queue does not exist for key, but(msgflg & IPC_CREAT) == 0 IPC_CREAT flag가 아닌데 key가 없는 경우 |
ENOSPC | systemwide limit on message queues would be exceeded message queue를 더 이상 만들 공간이 없는 경우 |
IPC를 생성하고 나서 remove를 해주지 않으면 메모리에 쌓이므로 지워주어야 한다.
The msgsnd(2) system call
#include <sys/msg.h>
int msgsnd(int msqid, const void *ptr, size_t nbytes, int flag);
// Returns: 0 if OK, -1 on error
msqid에 해당하는 id를 가진 message queue에 message를 보내는 system call이다.
ptr에 해당하는 주소에 보낼 message가 들어있다. message는 queue의 마지막에 linked list로 연결된다.
ptr에 해당하는 내용은 다음과 같은 구조체로 이루어져 있다.
struct mymesg {
long mtype; /* positive message type */
char mtext[1]; /* message data, of length nbytes */
};
mtype은 message type, 그 뒤의 mtext에 필요한 만큼의 message data가 들어간다.
mtext의 size는 msgsnd의 nbytes와 일치해야한다.
flag의 경우 IPC_NOWAIT이 지정되면, message queue가 full일 때 block되지 않고 fail로 return한다.
The msgrcv(2) system call
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *ptr, size_t nbytes,
long type, int flag);
// Returns: size of data portion of message if OK, -1 on error
msqid에 해당하는 message queue로부터 message를 receive한다. nbytes 크기 만큼만 가져올 수 있다.
type에 따라서 다음과 같이 작동방식이 구분된다.
type | action |
== 0 | remove first message from queue queue의 맨 앞의 message를 가져옴 |
> 0 | remove first message of type from the queue type번호와 일치하는 message를 가져옴 (message 구조체인 mymesg struct의 mtype변수와 일치하는 것) |
< 0 | remove first message of lowest type that is less than or equal to the absolute value of type type 수의 절댓값보다 작거나 같은 type 중에서 가장 작은 type을 가진 message를 가져옴 |
flag를 IPC_NOWAIT으로 설정하면 message queue가 full일 때 block되지 않고 fail return한다.
flag를 MSG_NOERROR로 설정하면 ntypes보다 mtext의 size가 큰 message가 들어왔을 때 error를 발생시키지않고 truncate하여 받는다.
만약 같은상황에서 MSG_NOERROR로 설정하지 않은 경우 E2BIG error를 return한다.
The msgctl(2) system call
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf );
// Returns: 0 if OK, -1 on error
cmd는 다음 중 하나이다.
cmd | description |
IPC_STAT | copy members of the msqid_ds data structure into buf msqid_ds의 데이터 구조를 buf로 복사해온다. |
IPC_SET | set members of the msqid_ds data structure from buf msqid_ds를 저장할 때 사용 |
IPC_RMID | remove the message queue msqid and destroy the corresponding msqid_ds message queue msqid_ds를 삭제한다. |
example - a queue with priorities
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <errno.h>
#define QKEY (key_t)0105 /* 큐의 키를 식별한다 */
#define QPERM 0660 /* 큐의 허가 */
#define MAXOBN 50 /* 개체 이름의 최대길이 */
#define MAXPRIOR 10 /* 최대 우선 순위 수준 */
struct q_entry {
long mtype;
char mtext [MAXOBN+1];
};
int warn (char *s){
fprintf (stderr, "warning: %s\n", s);
}
int init_queue(void){/* init_queue -- 큐 식별자를 획득한다. */
int queue_id;
if ( (queue_id=msgget (QKEY,IPC_CREAT|QPERM))==-1) perror("msgget failed");
return (queue_id);
}
init_queue는 msgget에 key, IPC_CREAT, QPERM을 통해 얻은 id값을 return 한다.
int enter (char *objname, int priority){
int len, s_qid;
struct q_entry s_entry; /* 메시지를 저장할 구조 */
if ( (len = strlen(objname)) > MAXOBN){ /* 이름의 길이, 우선순위 수준을 확인한다. */
warn ("name too long");
return (-1);
}
if (priority > MAXPRIOR || priority < 0){
warn ("invalid priority level");
return (-1);
}
if ( (s_qid=init_queue())==-1) return (-1); /* 필요에 따라 메시지 큐를 초기화한다. */
/* s_entry를 초기화한다. */
s_entry.mtype = (long) priority;
strncpy (s_entry.mtext, objname, MAXOBN);
/* 메시지를 보내고, 필요할 경우 기다린다. */
if (msgsnd (s_qid, &s_entry, len, 0) == -1){
perror ("msgsnd failed");
return (-1);
} else return (0);
}
enter는 message를 send하는 function이다.
objname의 길이를 확인하고, priority가 0~MAXPRIOR 사이인지 확인한다.
init_queue를 통해 queue를 초기화하여 qid를 가져온다.
mtype을 지정하고, strncpy로 objname을 mtext에 복사한다.
준비된 메시지를 msqsnd로 보낸다.
int proc_obj (struct q_entry *msg){
printf ("\npriority: %ld name: %s\n", msg―>mtype, msg―>mtext);
}
int serve (void){
int mlen, r_qid;
struct q_entry r_entry;
/* 필요에 따라 메시지 큐를 초기화한다. */
if ((r_qid = init_queue()) == -1) return (-1);
/* 다음 메시지를 가져와 처리한다. 필요하면 기다린다. */
for (;;){
if ((mlen=msgrcv(r_qid, &r_entry, MAXOBN,(-1*MAXPRIOR), MSG_NOERROR))== -1){
perror ("msgrcv failed");
return (-1);
}
else {
/* 우리가 문자열을 가지고 있는지 확인한다. */
r_entry.mtext[mlen]='\0';
/* 객체 이름을 처리한다. */
proc_obj (&r_entry);
}
}
}
serve function을 통해 message를 receive한다.
init_queue로 message queue를 초기화하여 qid를 받고, msgrcv로 message를 receive한다.
받은 message를 출력한다.
다음은 main function이다.
/* etest -- 큐에 객체 이름을 넣는다. */
#include <stdio.h>
#include <stdlib.h>
#include "q.h"
main (int argc, char **argv){
int priority;
if (argc != 3){
fprintf (stderr, "usage: %s objname priority\n", argv[0]);
exit (1);
}
if ((priority = atoi(arvg[2])) <= 0 || priority > MAXPRIOR){
warn ("invalid priority");
exit (2);
}
if (enter (argv[1], priority) < 0){
warn ("enter failure");
exit (3);
}
exit (0);
}
argument를 받고 priority를 지정하고, enter function으로 message를 send한다.
위 코드는 send process이고 receive process는 다음과 같다.
/* stest -- 큐를 위한 단순한 서버 */
#include <stdio.h>
#include "q.h"
main(){
pid_pid;
switch (pid = fork()){
case 0: /* 자식 */
serve();
break; /* 실제로는, 서버는 결코 퇴장(exit)하지 않음 */
case -1:
warn ("fork to start server failed");
break;
default:
printf ("server process pid is %d\n", pid);
}
exit( pid != -1 ? 0 : 1);
}
fork를 통해 child process를 생성하고, child가 serve(receive)를 담당한다.
결과는 다음과 같다.
$ etest objname1 3
$ etest objname2 4
$ etest objname3 1
$ etest objname4 9
$ stest
server process pid is 2545
$
priority: 1 name: objname3
priority: 3 name: objname1
priority: 4 name: objname2
priority: 9 name: objname4
출력결과를 보면 우선순위대로 1,3,4,9 순으로 출력된 것을 확인할 수 있다.
example
#include <sys/types.h> /* showmsg -- 메시지 큐의 자세한 정보를 보인다. */
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <time.h>
void mqstat_print (key_t, int, struct msqid_ds *);
main( int argc, char **argv){
key_t mkey;
int msq_id;
struct msqid_ds msq_status;
if (argc != 2){
fprintf (stderr, "usage: showmsg keyval\n"); exit (1);
}
mkey = (key_t) atoi (argv[1]); /* 메시지 큐 식별자를 얻는다. */
if (( msq_id = msgget(mkey, 0)) == -1){
perror( "msgget failed"); exit (2);
}
if (msgctl(msq_id, IPC_STAT, &msq_status) == -1){ /* 상태 정보를 얻는다. */
perror ("msgctl failed"); exit (3);
}
mqstat_print (mkey, msq_id, *msq_status); /* 상태 정보를 프린트한다. */
exit (0);
}
mqstat_print (key_t mkey, int mqid, struct msqid_ds *mstat){
printf ("\nKey %d, msg_qid %d\n\n", mkey, mqid);
printf ("%d message(s) on queue\n\n", mstat->msg_qnum);
printf ("Last send by proc %d at %s\n", mstat->msg_lspid,
ctime (&(mstat->msg_stime)));
printf ("Last recv by proc %d at %s\n", mstat->msg_lrpid,
ctime (& (mstat->msg_rtime)));
}
mkey 변수에 key숫자를 넣고, get으로 msq_id를 얻는다.
msgctl을 통해 IPC_STAT 즉 msq_status로 msqid_ds를 받고 mqstat_print function을 통해 정보를 출력한다.