This program is a fairly simple message logging program, in some respects similar to that described earlier. As with the earlier example, the complete source code is here for you to cut and paste into your programs, with accreditation please. The program is compiled with the flags -lthread -lnsl -lsocket.
There are very significant differences in the code from the earlier example; these are necessitated by the support for multi-threading and signal handling.
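When the program is running there are several distinct threads of execution: the master thread, which accepts connections, the timing thread, and one processing thread for each active session.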
In any but the most elementary multi-threaded program, signal handling has to be considered very carefully. Each thread has its own signal mask, but the handler installed for a signal with sigaction() is shared by every thread in the process, so it is the masks that decide which thread takes a signal. In this program only the timing thread handles the SIGALRM and SIGHUP signals; the master thread handles only SIGUSR1, which tells it that the program is closing down, and the processing threads only handle SIGUSR1. The timing thread sends SIGUSR1 to processing threads, which then examine a structure here called the "scoreboard" to find out why the timing thread has sent the processing thread a signal.
SIGUSR1 is reserved for inter-thread communication: it is always directed at one particular thread using thr_kill().
SIGHUP sent to the process will close down all the sessions.
The program starts with the customary header files.
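#include <netdb.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h> /* exit(), atoi(), abort() */
#include <unistd.h> /* read(), write(), close(), alarm(), pause() */
#include <time.h>
#include <signal.h>
#include <thread.h>
#include <string.h>
#include <errno.h>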
The socket union declaration
union sock
{
struct sockaddr s;
struct sockaddr_in i;
};
This is followed by the "scoreboard" used by the timing thread to control the processing threads. The size of the scoreboard (15 slots) determines the maximum number of simultaneous sessions. The individual components of the scoreboard are
tevent: The time of the next event relevant to the thread. It is expressed as an absolute time. As will be seen, a processing thread requests a time out using the function reqto(). The timing thread examines the scoreboard once a second to see whether the current time is later than that of any requested event; if so, SIGUSR1 is sent to the thread.
thread: The identification number of the thread that has requested the time out.
sig: The type of event that has caused the thread to be signalled. The thread in question should examine this value and proceed appropriately. In this program the actual signal codes from signal.h are used and the threads use the function sigtype() to determine the reason for the signal.
#define SMAX 15
struct scorebd
{
time_t tevent;
thread_t thread;
int sig;
} scorebd[SMAX];
mutex_t sb_lock;
thread_t thr_master;
The mutex is used to ensure the integrity of the scoreboard structure. thr_master holds the identification of the master thread so it can be signalled when the program is closing down.
This part of the code is followed by the function prototypes.
void *handling(void *);
void handto();
void *timing(void *);
void timer();
void master();
void reqto(int);
int sigtype();
void thr_reg();
void thr_unreg();
The functions are as follows
| Name | Function |
|---|---|
| handling | The processing thread code. The prototype is as required by the thr_create library function. |
| handto | The signal handler for the processing thread. |
| timing | The timing thread code. |
| timer | The signal handler for the timing thread. |
| master | The signal handler for the master thread. |
| reqto | Used by handling threads to request a time out. |
| sigtype | Used by handling threads to determine why they have been signalled. |
| thr_reg | Used by a handling thread to create an entry for itself in the scoreboard. |
| thr_unreg | Used by a handling thread to remove its entry from the scoreboard. |
int main(int argc,char *argv[])
{
int port;
union sock sock,work;
int wsd,sd;
int addlen;
struct sigaction siginfo;
int i,rv,nrv;
if(argc != 2) exit(1);
else port = atoi(argv[1]);
Before the program starts multi-threading it is necessary to set the process signal mask using the sigprocmask() system call, having first set up the mask using the library functions sigemptyset() and sigaddset(). The following code blocks SIGALRM and SIGHUP. Every thread created afterwards inherits this mask, so these two signals will only be delivered to a thread that explicitly unblocks them.
sigemptyset(&(siginfo.sa_mask));
sigaddset(&(siginfo.sa_mask),SIGHUP);
sigaddset(&(siginfo.sa_mask),SIGALRM);
sigprocmask(SIG_SETMASK,&(siginfo.sa_mask),NULL);
Having done this the program is now ready to start multi-threading. The "timing" thread is created using thr_create() and the current program is now the "master" thread. It is now necessary to set up signal handling for the master thread, which leaves SIGALRM and SIGHUP blocked, unblocks SIGUSR1 and executes the signal handler function master() on receipt of that signal.
/* thr_create() returns 0 on success and an error number on failure */
if((rv = thr_create(NULL,0,timing,NULL,0,NULL)) != 0)
{
fprintf(stderr,"Creating timing thread: %s\n",strerror(rv));
exit(2);
}
thr_master = thr_self();
sigemptyset(&(siginfo.sa_mask));
sigaddset(&(siginfo.sa_mask),SIGUSR1);
thr_sigsetmask(SIG_UNBLOCK,&(siginfo.sa_mask),NULL);
siginfo.sa_handler = master;
siginfo.sa_flags = 0;
sigaction(SIGUSR1,&siginfo,NULL);
The routine thr_sigsetmask() is used to set the thread's signal mask and the system call sigaction specifies the signal handler.
The remainder of the code in the function main() is concerned with setting up the sockets and is more or less identical to that in the previous examples.
if((sd = socket(AF_INET,SOCK_STREAM,0)) == -1)
{
perror("No socket");
exit(1);
}
sock.i.sin_family = AF_INET;
sock.i.sin_port = htons(port); /* port numbers are stored in network byte order */
sock.i.sin_addr.s_addr = INADDR_ANY;
if(bind(sd,&(sock.s),sizeof(struct sockaddr)) == -1)
{
perror("Bad Bind");
exit(1);
}
if(listen(sd,2) == -1)
{
perror("Bad listen");
exit(1);
}
Next comes the connection accepting loop. Here, once a connection has been accepted, thr_create() is used to invoke the function handling() as a separate thread. The address of the location holding the number of the socket used for communication is passed as the fourth parameter.
do
{
addlen = sizeof(work); /* accept() needs the size of the address buffer on entry */
wsd = accept(sd,&(work.s),&addlen);
thr_create(NULL,0,handling,&wsd,0,NULL);
} while(1);
}
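Passing the address of wsd means that a second accept() could overwrite the socket number before the first processing thread has copied it. One common way round this, sketched here under assumptions, is to give each thread its own heap-allocated copy which the thread frees. The sketch uses POSIX pthread_create() rather than thr_create() so that it builds elsewhere, and the names worker, spawn_with_private_copies and MAX_WORKERS are hypothetical.

```c
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_WORKERS 16   /* arbitrary limit for this sketch */

/* Each worker owns the heap slot it is given: copy the value, then free it. */
static void *worker(void *arg)
{
    int fd = *(int *)arg;
    free(arg);
    /* A real worker would now talk on fd; here the value is just returned. */
    return (void *)(intptr_t)fd;
}

/* Start one worker per descriptor in fds[0..n-1] and store what each
   worker saw in seen[]. Returns 0 on success, -1 on failure. */
int spawn_with_private_copies(const int *fds, int n, int *seen)
{
    pthread_t tid[MAX_WORKERS];
    int i, started = 0, rc = 0;

    if (n < 0 || n > MAX_WORKERS)
        return -1;

    for (i = 0; i < n; i++) {
        int *slot = malloc(sizeof *slot);
        if (slot == NULL) { rc = -1; break; }
        *slot = fds[i];
        if (pthread_create(&tid[i], NULL, worker, slot) != 0) {
            free(slot);
            rc = -1;
            break;
        }
        started++;
    }

    for (i = 0; i < started; i++) {
        void *result;
        pthread_join(tid[i], &result);
        seen[i] = (int)(intptr_t)result;
    }
    return rc;
}
```

Because every worker reads from a location that nothing else writes to, the order in which the threads are scheduled no longer matters.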
The function timing() is the code of the timing thread. Apart from the manipulation of signal handling facilities, it does very little.
void *timing(void *arg)
{
struct sigaction siginfo;
sigemptyset(&(siginfo.sa_mask));
sigaddset(&(siginfo.sa_mask),SIGALRM);
sigaddset(&(siginfo.sa_mask),SIGHUP);
siginfo.sa_handler = timer;
siginfo.sa_flags = 0;
thr_sigsetmask(SIG_UNBLOCK,&(siginfo.sa_mask),NULL);
sigaction(SIGALRM,&siginfo,NULL);
sigaction(SIGHUP,&siginfo,NULL);
The consequence of the code above is that the "timing" thread will respond to the SIGALRM and SIGHUP signals by invoking the handler function timer(). The rest of the code of the function timing() is, more or less, a loop that does nothing.
alarm(1);
do
{
pause();
thr_yield();
} while(1);
}
The initial call to alarm() sets up the first SIGALRM signal to the process in 1 second's time; the thread then pauses (using pause()) until it receives a signal. The call to thr_yield() is probably unnecessary.
The function timer() is the handler for the "timing" thread. It will be invoked whenever a SIGALRM or SIGHUP signal is taken; the parameter sig is the actual signal number as per signal.h. Its main purpose is to examine all the scoreboard entries, compare the requested event times with the current time and signal the threads if so required.
The code starts by determining the current time (in the variable now) and establishing a mutual exclusion lock on the scoreboard.
void timer(int sig)
{
int i,n;
time_t now;
time(&now);
mutex_lock(&sb_lock);
It then examines the scoreboard in a fairly obvious fashion. There is no thread zero (it says so in the manual somewhere) so unused slots can easily be skipped. If the incoming signal was SIGHUP, every registered thread is signalled and, as a special case, so is the master thread; if it was SIGALRM, only the threads whose requested time has passed are signalled. Threads are signalled using the function thr_kill().
for(i=0;i<SMAX;i++)
{
if(scorebd[i].thread)
{
if(sig == SIGHUP)
{
scorebd[i].sig = SIGHUP;
thr_kill(scorebd[i].thread,SIGUSR1);
}
else if(sig == SIGALRM)
{
if(scorebd[i].tevent < now)
{
scorebd[i].sig = SIGALRM;
thr_kill(scorebd[i].thread,SIGUSR1);
}
}
}
}
if(sig == SIGHUP) thr_kill(thr_master,SIGUSR1);
Once this has finished the mutual exclusion lock is released and the next SIGALRM is scheduled.
mutex_unlock(&sb_lock);
alarm(1);
return;
}
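The time-out test at the heart of timer() can be looked at on its own. The following is a simplified sketch, not the program's code: the struct slot type, its in_use flag (standing in for the test on the thread id) and the function mark_expired() are all hypothetical, and locking and the call to thr_kill() are left out so that the comparison can be exercised with fixed times.

```c
#include <signal.h>
#include <time.h>

/* Hypothetical, simplified slot: "in_use" replaces the thread id test. */
struct slot {
    time_t tevent;   /* absolute time of the requested time-out */
    int    in_use;   /* non-zero when a thread owns the slot    */
    int    sig;      /* reason recorded for the owning thread   */
};

/* Record SIGALRM in every used slot whose time has passed.
   Returns the number of slots marked. */
int mark_expired(struct slot *sb, int n, time_t now)
{
    int i, marked = 0;

    for (i = 0; i < n; i++) {
        if (sb[i].in_use && sb[i].tevent < now) {
            sb[i].sig = SIGALRM;
            marked++;   /* the real handler would call thr_kill() here */
        }
    }
    return marked;
}
```

With the strict comparison used here, as in the original, a slot whose time is exactly equal to now is not signalled until the following scan.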
The function handling() does the actual handling of the communication with the remote host. This is the function that is activated whenever a processing thread is launched from the "accept" loop in the "master" thread. It starts by establishing its signal handling characteristics in the same manner as the function timing(). The actual signal handling is done using the function handto(). This thread will only handle the signal SIGUSR1, which is sent to it by the "timing" thread using thr_kill().
void *handling(void *sd)
{
int nrv,wsd;
int i = 0; /* number of bytes currently held in buff */
char buff[BUFSIZ];
FILE *logfp = NULL;
struct sigaction siginfo;
thread_t me;
wsd = *(int *)sd;
me = thr_self();
sigemptyset(&(siginfo.sa_mask));
sigaddset(&(siginfo.sa_mask),SIGUSR1);
thr_sigsetmask(SIG_UNBLOCK,&(siginfo.sa_mask),NULL);
siginfo.sa_handler = handto;
siginfo.sa_flags = 0;
sigaction(SIGUSR1,&siginfo,NULL);
thr_reg();
As well as the signal handling code, the extract above picks up the socket number passed by the master thread. The thread's identification is determined using the function thr_self() and stored in the variable me; it is passed to the function sigtype() later on.
The function thr_reg() is used to create a scoreboard entry for this thread.
The basic processing loop starts thus.
do
{
reqto(20);
nrv = read(wsd,buff+i,BUFSIZ-i-1); /* leave room for the terminating '\0' */
The loop is similar to the previous example. The function reqto() is used to request a timeout in 20 seconds' time. nrv is the number of bytes actually read. For normal operation the number of bytes read will be greater than zero and the following code deals with this case.
if(nrv > 0)
{
char *crlfp;
int fraglen;
i += nrv;
buff[i] = '\0';
if((crlfp = strstr(buff,"\r\n")) != NULL)
{
*crlfp = '\0';
if(logfp == NULL)
{
logfp = fopen(buff,"w");
if(logfp == NULL)
{
close(wsd);
thr_unreg(); /* release the scoreboard slot before leaving */
return NULL;
}
}
else fprintf(logfp,"%s\n",buff);
fraglen = (buff+i)-(crlfp+2);
if(fraglen)
{
memmove(buff,crlfp+2,fraglen+1); /* the regions overlap, so strcpy() is not safe */
i = fraglen;
}
else i = 0;
}
}
This deals with the requirement that the first line of text is the name of the file to be used for logging and also deals with the possibility of the text arriving in "dribs and drabs". If the number of bytes read is zero then the remote has closed the link, and the program simply closes this end thus.
else if (nrv == 0)
{
close(wsd);
if(logfp != NULL) fclose(logfp); /* no file is open if the link closed early */
thr_unreg();
return NULL;
}
Returning from the thread terminates the thread after thr_unreg() has first removed the scoreboard entry.
If the number of bytes read is returned as less than zero then there has been some sort of error, or a signal has been handled during read().
else if(nrv < 0)
{
switch(sigtype(me))
{
case SIGALRM :
write(wsd,"Timed out - bye\r\n",17);
break;
case SIGHUP :
write(wsd,"Closing down - bye\r\n",20);
break;
}
close(wsd);
if(logfp != NULL) fclose(logfp); /* no file is open if nothing was received */
thr_unreg();
return NULL;
}
This assumes that the cause of the negative value is a signal which can only be SIGUSR1 from the "timing" thread. sigtype() is used to determine the type of signal that triggered this activity and appropriate messages are written to the communications channel. [Note, there are other possible causes of a negative return value from read(), it should be possible to distinguish these by examining errno but this didn't seem to work when this program was being developed.]
The final two lines of handling()
} while(1);
}
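The line assembly done inside handling() can also be written as a small stand-alone function, which makes the fragment handling easier to test. This is a sketch under assumptions rather than the author's code: the name take_line() and its interface are hypothetical, and it assumes the received data contains no embedded '\0' bytes.

```c
#include <string.h>

/* buf holds *len bytes of received text followed by '\0'.
   If buf contains a complete CRLF-terminated line, copy it (without the
   CRLF) into line, move whatever follows to the front of buf, update
   *len and return 1. Return 0 if only a fragment has arrived so far,
   or -1 if line is too small. */
int take_line(char *buf, size_t *len, char *line, size_t linesz)
{
    char *crlf = strstr(buf, "\r\n");
    size_t linelen, rest;

    if (crlf == NULL)
        return 0;                       /* only a fragment so far */

    linelen = (size_t)(crlf - buf);
    if (linelen >= linesz)
        return -1;                      /* caller's buffer is too small */

    memcpy(line, buf, linelen);
    line[linelen] = '\0';

    rest = *len - (linelen + 2);        /* bytes after the CRLF */
    memmove(buf, crlf + 2, rest + 1);   /* +1 keeps the terminating '\0' */
    *len = rest;
    return 1;
}
```

Calling take_line() in a loop until it returns 0 would also cope with several complete lines arriving in a single read(), a case the original loop only handles one line at a time.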
The function reqto() is a straightforward function that posts a request for a time-out signal. This is done by writing the time of the required signal to the current thread's slot in the scoreboard.
void reqto(int dt)
{
time_t now;
thread_t me;
int i;
time(&now);
me = thr_self();
mutex_lock(&sb_lock);
for(i=0;i<SMAX;i++)
{
if(scorebd[i].thread == me)
{
scorebd[i].tevent = now + dt;
break;
}
}
if(i == SMAX) abort();
mutex_unlock(&sb_lock);
}
Note the use of mutual exclusion locks and the abort() if there is no slot for the current thread.
The function sigtype() determines the reason for SIGUSR1 being posted to the current processing thread.
int sigtype()
{
thread_t me;
int i,rv = 0;
me = thr_self();
mutex_lock(&sb_lock);
for(i=0;i<SMAX;i++)
{
if(scorebd[i].thread == me)
{
rv = scorebd[i].sig;
scorebd[i].sig = 0;
break;
}
}
mutex_unlock(&sb_lock);
return rv;
}
Notice that once the slot for the current thread has been located, the signal is set to zero; this probably isn't really necessary.
The function thr_reg() "registers" a processing thread in the scoreboard and is similar to the other scoreboard manipulating functions.
void thr_reg()
{
thread_t me;
int i;
me = thr_self();
mutex_lock(&sb_lock);
for(i=0;i<SMAX;i++)
{
if(scorebd[i].thread == 0)
{
scorebd[i].thread = me;
break;
}
}
if(i==SMAX) abort();
mutex_unlock(&sb_lock);
return;
}
The function thr_unreg() "de-registers" a processing thread and is, not surprisingly, very similar to thr_reg().
void thr_unreg()
{
thread_t me;
int i;
me = thr_self();
mutex_lock(&sb_lock);
for(i=0;i<SMAX;i++)
{
if(scorebd[i].thread == me)
{
scorebd[i].thread = 0;
break;
}
}
if(i == SMAX) abort();
mutex_unlock(&sb_lock);
return;
}
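The three scoreboard functions used by a processing thread share one pattern: take the lock, find a slot, update it, release the lock. The sketch below shows that pattern with POSIX mutexes in place of the Solaris mutex_lock() calls, and returns -1 instead of calling abort() when no slot is found. All the names (entry, board, sb_register and so on) are hypothetical, and an explicit in_use flag is used because a pthread_t cannot portably be compared with zero.

```c
#include <pthread.h>
#include <time.h>

#define SLOTS 15

struct entry {
    int       in_use;   /* non-zero when a thread owns the slot */
    pthread_t owner;
    time_t    tevent;   /* absolute time of the requested time-out */
};

static struct entry board[SLOTS];
static pthread_mutex_t board_lock = PTHREAD_MUTEX_INITIALIZER;

/* Find the calling thread's slot; the caller must hold board_lock. */
static int my_slot(void)
{
    int i;
    for (i = 0; i < SLOTS; i++)
        if (board[i].in_use && pthread_equal(board[i].owner, pthread_self()))
            return i;
    return -1;
}

/* Claim the first free slot. Returns 0, or -1 if the board is full. */
int sb_register(void)
{
    int i, rc = -1;
    pthread_mutex_lock(&board_lock);
    for (i = 0; i < SLOTS; i++) {
        if (!board[i].in_use) {
            board[i].in_use = 1;
            board[i].owner = pthread_self();
            board[i].tevent = 0;
            rc = 0;
            break;
        }
    }
    pthread_mutex_unlock(&board_lock);
    return rc;
}

/* Ask for a time-out dt seconds from now. Returns 0, or -1 if the
   calling thread has no slot. */
int sb_request(int dt)
{
    int i, rc = -1;
    pthread_mutex_lock(&board_lock);
    if ((i = my_slot()) != -1) {
        board[i].tevent = time(NULL) + dt;
        rc = 0;
    }
    pthread_mutex_unlock(&board_lock);
    return rc;
}

/* Give the slot back. Returns 0, or -1 if the thread has no slot. */
int sb_unregister(void)
{
    int i, rc = -1;
    pthread_mutex_lock(&board_lock);
    if ((i = my_slot()) != -1) {
        board[i].in_use = 0;
        rc = 0;
    }
    pthread_mutex_unlock(&board_lock);
    return rc;
}
```

Returning an error code lets the caller close its connection cleanly when the scoreboard is full, whereas abort() brings down every session at once; which behaviour is preferable depends on the application.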
The function handto() is the processing thread signal handler. It actually does nothing; all that matters is that the processing thread has received a signal.
void handto(int sig)
{
return;
}
The function master() is the master thread signal handling function. It will normally only be invoked if the master thread has received a SIGUSR1, which will mean that the system is closing down. It waits for the termination of all active threads, which are identified from the scoreboard, using the function thr_join(); it then exits the process.
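void master(int sig)
{
int i,n = 0;
thread_t active[SMAX];
/* Copy the thread ids under the lock, then release it before joining: */
/* the processing threads need sb_lock in sigtype() and thr_unreg() */
/* before they can finish. */
mutex_lock(&sb_lock);
for(i=0;i<SMAX;i++)
{
if(scorebd[i].thread) active[n++] = scorebd[i].thread;
}
mutex_unlock(&sb_lock);
for(i=0;i<n;i++) thr_join(active[i],NULL,NULL);
exit(0);
}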
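Note, there is a minor bug here: it should do something about the timing thread. Note also that sigaction() sets the handler for the whole process rather than for one thread, so the call in handling() replaces master() as the SIGUSR1 handler once the first processing thread has started.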