Multi-threaded software has been with us for many years. Largely confined to real-time control systems and high-performance servers, multi-threaded programming has been an arcane, specialized discipline, shrouded in myth. The recent move to multi-core microprocessors and multi-processor systems in the consumer market is about to bring more attention to it. Intel (Core Duo, and the Pentium 4 with Hyper-Threading, which presents a single core as two logical processors) and AMD (Opteron) have turned to hardware parallelism to increase performance. IBM is about to make the biggest multi-core splash with the nine-core Cell Processor. Commercial software developers, especially gaming companies, are now grappling with building multi-threaded products.
The multi-threaded programming model maps very nicely to today's multi-core processors. Originally, multi-threaded software was intended to take advantage of the parallelism available in multiple processor computers. Now, we have to consider multiple execution cores within a processor, as well as possible multiple processors. Figure 1 shows how a multi-threaded process maps to IBM's new Cell Processor. Even software running on a uniprocessor system can benefit from threading, because two of the key advantages of threading are finer control over execution scheduling and less context-switching overhead.
One of the myths surrounding multi-threaded programming concerns its degree of difficulty. A lot of recent buzz mentions the "burden" placed on programmers developing multi-threaded software. I've heard a lot of this FUD coming from people whose interests are threatened by IBM's new Cell Processor (which will be the heart of Sony's new PlayStation 3, among others). The current party line among such people is that the Cell Processor has a place in high-performance graphics and scientific calculation, but is unsuitable for general-purpose computing tasks. What they fail to mention is that, for many everyday tasks, the limiting factor on current computers is hard drive input/output rather than the processor. In general, a faster processor is better than a slower one, provided that the system otherwise performs reliably. Given the Cell Processor's increased bus speed, it is hard to imagine it being unsuitable for general-purpose computing. It is true that some portions of the desktop environment and applications will need to be rewritten to take advantage of the newer processor architectures; this is evident in Apple's new Core Duo systems.
Multi-threaded programming has been problematic in the past, mainly because of operating system developers' uneven commitment to providing adequate thread support in their kernels and a lack of good development tools (mainly debuggers that can handle threads). Different operating systems offered their own unique threading models and APIs. To alleviate this confusion, the IEEE worked to develop a standardized method of implementing software threading. What came out of their work was the Pthreads (short for "POSIX Threads") standard. The threads API itself is specified in IEEE Std 1003.1c, while related real-time facilities such as semaphores and scheduling come from IEEE Std 1003.1b. Despite the length of time the Pthreads standard has been with us, operating system developers have been slow to offer support for it. The standard allows for layering Pthreads on top of proprietary threading implementations, and this is the approach some companies adopted. Even so, some early attempts at Pthreads conformance were unusable in my experience. In the summer of 1998, I developed an automated voice mail provisioning system on Solaris, found the Pthread libraries unusable and had to write a single-threaded application to get the job done. One year later, in 1999, I helped develop a high-performance email delivery system on a more recent version of Solaris. By then the Pthread libraries were functional, enabling me to develop my daemon process as a multi-threaded program (which was critical for that particular application).
Some operating systems, notably Linux (which aims for POSIX compliance), offer satisfactory Pthreads support. IBM has been slow to release information on software tools for their new Cell Processor, but to quote from page 617, section 21.5.6 of the current (as of the time of this writing) IBM Cell Broadband Engine Programming Handbook (Version 1.0, revised April 12, 2006): "Most CBE software systems will include a thread library, and many such libraries will follow the POSIX definition, documented as POSIX threads..." So it looks like IBM is wisely encouraging their operating system developers to implement Pthreads for the Cell Processor, and I think it is safe to assume at this point that the Linux port to CBE will continue to support Pthreads.
How difficult is it to program with Pthreads? My experience with Pthreads has been that it is a different way of thinking about your program, much like having to think differently about using a reverse Polish notation calculator compared to a regular one. Many programmers had to learn a different paradigm when they made the shift to object-oriented programming. I like to think of threads as objects. Indeed, if you think of each created thread as an instantiated object, you will find the transition easier from a syntactic standpoint. The difference between threads and objects is that any individual thread can be given the processor, or lose it, at any time and without warning. This requires you to program defensively, protecting your data structures from being mangled by misbehaving threads. The challenge of multi-threaded programming centers on synchronization and scheduling. Those of you who have developed software where multiple processes cooperate using semaphores and shared memory are already familiar with these issues and have a head start when learning Pthreads. So, my experience is that programming with Pthreads is not the "burden" some people claim, but is yet another programming paradigm that takes getting used to.
There are efforts underway to create tool kits that promise to insulate programmers from the details of multi-threaded programming. Intel is leading the way in this area (see "Related Links" at the end of this article). The Intel tool kit works only with Intel processors, so those who avail themselves of this convenience will find their source code conveniently tied to Intel-based computers (a very clever product-tying strategy).
Why should you use Pthreads? It actually doesn't make sense to use Pthreads for every program you develop. Certain types of software benefit from threading, while others don't. Programs requiring concurrency, where a large number of tasks need to be performed at the same time, benefit from threading. This is why the Cell Processor excels at graphics and number crunching. Software that handles large numbers of sessions with clients also benefits, which is why the Apache web server is threaded. Theoretically, the desktop environment (such as KDE or the Mac OS X desktop) would benefit from threading, making windows and applications more responsive to user interrupts. One area that benefits from multi-threaded software is real-time, high-performance communications devices. Figure 2 shows a theoretical high-performance Internet telephone handset that handles live video streams in addition to voice. The figure shows a possible use of threads to shuttle data between different functional processes within the handset. Figure 3, at the end of this article, shows example C source code for the message handling daemon within such a handset.
In conclusion, interest in multi-threaded software is increasing as developers look for ways to leverage modern multi-core processors. Pthreads is a well-developed standard enabling developers to create software that takes advantage of concurrent hardware environments. Pthreads is currently implemented on several operating systems, making it a safe bet in cases where software needs to be ported. Importantly, it appears Pthreads will be fully supported on IBM's new Cell Broadband Engine (AKA the "Cell Processor"). Several books explaining how to program with Pthreads are available. It looks like Pthreads is a standard whose time has come.
June 14, 2006
Related Links:
O'Reilly tutorial book explaining how to program with Pthreads.
Intel C++ template library for multi-threaded programming. Supports most platforms.
IBM Cell Broadband Engine Programming Handbook.
Figure 3 - Example source code for multi-threaded data delivery daemon.
The header file for this program is listed after the program.
/* mesgd.c - A multi-threaded daemon used to handle inter-host messaging.
Copyright 2004-2006, Adrien Lamothe.
This program is free software; you can redistribute it and/or modify
it under the terms of the Version 2 GNU General Public License as published by
the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
#include "mesgd.h" // user-defined stuff for this program, includes many standard header files
int main(void) {
printf("\n\nAbout to make this process a daemon!\n\n");
if (daemonize() == -1) // zap! A new daemon is born
exit(1);
sigfillset(&signal_set); // we want to block all signals ...
sigdelset(&signal_set, SIGHUP); // ... except SIGHUP
pthread_sigmask(SIG_BLOCK, &signal_set, NULL); // block the set built above; threads created later inherit this mask
sigset(SIGHUP, sig_hup); // let's catch SIGHUP
openlog("mesgd", LOG_NDELAY, LOG_DAEMON);
syslog(LOG_NOTICE, "\nmesgd (PID = %i) has just started.\n", (int) getpid());
initialize(); // create threads; threads will then create sockets, shared memory and buffers
for(;;) {
pause(); // main thread simply waits for signals, pause() is now thread-blocking
}
} // end main()
int daemonize(void) {
pid_t pid; // we need PID information to fork
if ( (pid = fork()) < 0 ) {
syslog(LOG_EMERG, "\nmesgd died while trying to fork! error: %m\n");
return(-1);
}
else if ( pid != 0 )
exit(0); // parent exits ...
// ... child continues:
setsid(); // become session leader
chdir("/root"); // change working directory
umask(0); // clear our file mode creation mask
return(0); // I am now a daemon, don't try to impede me!
} // end daemonize()
int end_it_all(void) {
// can also release thread-created data resources here, if pthread_cleanup_push() and _pop() are not working
// (but must use mutexes to do so.)
pthread_cancel(receiver_thread);
pthread_cancel(sender_thread);
syslog(LOG_NOTICE, "\nmesgd has just ended\n"); // the final syslog message
closelog(); // close syslog
pthread_mutex_unlock(&jump_lock); // for the sake of formality
exit(0); // bye-bye!
} // end end_it_all()
int initialize() {
// Read config file here and create threads, passing configuration parameters to them.
pthread_attr_init(&snd_thread_attr);
pthread_attr_setscope(&snd_thread_attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setschedpolicy(&snd_thread_attr, SCHED_OTHER);
struct sched_param snd_thread_sched;
snd_thread_sched.sched_priority = sched_get_priority_max(SCHED_OTHER); // set to highest priority
pthread_attr_setschedparam(&snd_thread_attr, &snd_thread_sched);
pthread_create(&sender_thread, &snd_thread_attr, (void *(*)(void *)) sender, &sender_thread_id); // pass the attributes prepared above
pthread_detach(sender_thread);
pthread_attr_init(&rcv_thread_attr);
pthread_attr_setscope(&rcv_thread_attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setschedpolicy(&rcv_thread_attr, SCHED_OTHER);
struct sched_param rcv_thread_sched;
rcv_thread_sched.sched_priority = sched_get_priority_max(SCHED_OTHER); // set to highest priority
pthread_attr_setschedparam(&rcv_thread_attr, &rcv_thread_sched);
pthread_create(&receiver_thread, &rcv_thread_attr, (void *(*)(void *)) receiver, &receiver_thread_id); // pass the attributes prepared above
pthread_detach(receiver_thread);
pthread_mutex_lock(&count_lock); // wait for threads to initialize their data ....
while (thread_count != 0) {
pthread_cond_wait(&init_done, &count_lock);
}
pthread_mutex_unlock(&count_lock); // ... then return to main thread
return(0);
} // end initialize()
void receiver(int *my_id) {
receiver_thread_data *my_data;
int last_cancel_state, last_cancel_type;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &last_cancel_state);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_cancel_type);
pthread_mutex_lock(&count_lock); // lock count_lock and begin data initialization
my_data = (receiver_thread_data *) malloc(sizeof(receiver_thread_data));
my_data->thread_id = *my_id;
my_data->shm_buff_size = rcv_shm_size;
my_data->msg_ptr = my_data->msg_buffer = malloc(my_data->shm_buff_size);
my_data->msg_rec_len = rcv_rec_len;
my_data->shm_key = 1234567;
my_data->shm_fd = shmget(my_data->shm_key, rcv_shm_size, IPC_CREAT | IPC_EXCL | SHM_W | (SHM_W >> 3) | SHM_R | (SHM_R >> 3));
syslog(LOG_NOTICE, "\nReceiver shared memory allocated\n");
my_data->shm_ptr = my_data->shm_buffer = shmat(my_data->shm_fd, 0, 0);
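pthread_cleanup_push((void (*)(void *)) end_receiver, (void *) my_data); // pass the structure itself, not the address of the local pointer
if ((my_data->socket = socket(AF_INET, SOCK_DGRAM, 0)) >= 0) // socket() returns -1 on failure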
syslog(LOG_NOTICE, "\nInput socket created\n");
else {
syslog(LOG_EMERG, "\nUnable to create input socket! error: %m\n");
pthread_mutex_lock(&jump_lock);
end_it_all();
}
//setsockopt(my_data->socket, SOL_SOCKET, SO_BROADCAST, char *optval, int optlen);
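memset(&my_data->socket_address, 0, sizeof(my_data->socket_address)); // malloc() does not zero the structure
my_data->socket_address.sin_family = AF_INET;
my_data->socket_address.sin_addr.s_addr = htonl(INADDR_ANY);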
my_data->socket_address.sin_port = htons(15001); // later, use a configurable port number
/* Bind the socket to the address prepared */
if (bind(my_data->socket, (struct sockaddr *) &my_data->socket_address,
sizeof(my_data->socket_address)) == -1) {
syslog(LOG_EMERG, "\nUnable to bind to input socket! error: %m\n");
end_it_all();
}
thread_count -= 1; // decrement
pthread_cond_signal(&init_done); // signal that initialization is finished for this thread
pthread_mutex_unlock(&count_lock); // release lock
sleep(9); // pause a bit, for the user's sake
printf("\n\nHello from receiver thread!\n");
for(;;) {
pthread_testcancel();
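socklen_t addr_len = sizeof(my_data->socket_address); // recvfrom() takes a pointer to the address length
recvfrom(my_data->socket, my_data->msg_ptr, my_data->msg_rec_len, 0,
(struct sockaddr*) &my_data->socket_address, &addr_len);
// shm_fd holds a shmget() identifier, not a file descriptor, so copy into the attached segment instead of calling write()
memcpy(my_data->shm_buffer + (my_data->msg_ptr - my_data->msg_buffer), my_data->msg_ptr, my_data->msg_rec_len);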
my_data->msg_ptr = (my_data->msg_ptr + my_data->msg_rec_len);
if (my_data->msg_ptr >= (my_data->msg_buffer + rcv_shm_size)) // if at end of buffer ...
my_data->msg_ptr = my_data->msg_buffer; // ... roll around to beginning of buffer
}
pthread_cleanup_pop(1); // as required by the standard, program won't compile without this.
} // end receiver()
void sender(int *my_id) {
int addr_len;
sender_thread_data *my_data;
int last_cancel_state, last_cancel_type;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &last_cancel_state);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_cancel_type);
pthread_mutex_lock(&count_lock); // lock count_lock and begin data initialization
my_data = (sender_thread_data *) malloc(sizeof(sender_thread_data));
my_data->thread_id = *my_id; // thread identifier
my_data->shm_buff_size = snd_shm_size;
my_data->msg_ptr = my_data->msg_buffer = malloc(my_data->shm_buff_size);
my_data->msg_rec_len = snd_rec_len;
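static pthread_key_t key_sender_thread_data;
pthread_key_create(&key_sender_thread_data, NULL); // no destructor: the cleanup handler below already frees my_data, and running end_sender twice would free it twice
pthread_setspecific(key_sender_thread_data, (void *)my_data);
pthread_cleanup_push((void (*)(void *)) end_sender, (void *) my_data);
if ((my_data->socket = socket(AF_INET, SOCK_DGRAM, 0)) >= 0) // socket() returns -1 on failure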
syslog(LOG_NOTICE, "\nOutput socket created\n");
else {
syslog(LOG_EMERG, "\nUnable to create output socket! error: %m\n");
pthread_mutex_lock(&jump_lock);
end_it_all();
}
/* Prepare socket address */
memset(&my_data->socket_address, 0, sizeof(my_data->socket_address)); /* fill with zeros */
my_data->socket_address.sin_family = AF_INET;
my_data->socket_address.sin_port = htons(15002); // later, use a configurable port number
/* Bind the socket to the address prepared */
if (bind(my_data->socket, (struct sockaddr *) &my_data->socket_address,
sizeof(my_data->socket_address)) == -1) {
syslog(LOG_EMERG, "\nUnable to bind to output socket! error: %m\n");
end_it_all();
}
addr_len = sizeof(struct sockaddr);
my_data->shm_key = 7654321;
my_data->shm_fd = shmget(my_data->shm_key, snd_shm_size, IPC_CREAT | IPC_EXCL | SHM_W | (SHM_W >> 3) | SHM_R | (SHM_R >> 3));
syslog(LOG_NOTICE, "\nsender shared memory allocated\n");
my_data->shm_ptr = my_data->shm_buffer = shmat(my_data->shm_fd, 0, 0);
thread_count -= 1; // decrement
pthread_cond_signal(&init_done); // signal that initialization is finished for this thread
pthread_mutex_unlock(&count_lock); // release lock
sleep(9); // pause a bit, for the user's sake
printf("\nHello from sender thread!\n\n");
for(;;) {
pthread_testcancel();
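memcpy(my_data->msg_ptr, my_data->shm_buffer + (my_data->msg_ptr - my_data->msg_buffer), my_data->msg_rec_len); // shm_fd is a shmget() identifier, not a file descriptor, so copy from the attached segment instead of calling read()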
sendto(my_data->socket, my_data->msg_ptr, my_data->msg_rec_len, 0,
(struct sockaddr*) &my_data->socket_address, sizeof(my_data->socket_address));
my_data->msg_ptr = (my_data->msg_ptr + my_data->msg_rec_len);
if (my_data->msg_ptr >= (my_data->msg_buffer + snd_shm_size)) // if at end of buffer ...
my_data->msg_ptr = my_data->msg_buffer; // ... roll around to beginning of buffer
} // end for
pthread_cleanup_pop(1); // as required by the standard
} // end sender()
void end_receiver(receiver_thread_data *my_data) {
int thread_id = my_data->thread_id; // need to do this because data structures will be de-allocated
shmdt((void *) my_data->shm_buffer); // detach shared memory, using the base address returned by shmat()
syslog(LOG_NOTICE, "\nreceiver thread %i shared memory segment detached\n", thread_id);
close(my_data->socket); // close socket
syslog(LOG_NOTICE, "\nreceiver thread %i socket closed\n", thread_id);
free(my_data->msg_buffer); // free holding buffer
syslog(LOG_NOTICE, "\nreceiver thread %i message buffer de-allocated\n", thread_id);
free(my_data); // free data structure
syslog(LOG_NOTICE, "\nreceiver thread %i data structure de-allocated\n", thread_id);
} // end end_receiver()
void end_sender(sender_thread_data *my_data) {
int thread_id = my_data->thread_id; // need to do this because data structures will be de-allocated
shmdt((void *) my_data->shm_buffer); // detach shared memory, using the base address returned by shmat()
syslog(LOG_NOTICE, "\nsender thread %i shared memory segment detached\n", thread_id);
close(my_data->socket); // close socket
syslog(LOG_NOTICE, "\nsender thread %i socket closed\n", thread_id);
free(my_data->msg_buffer); // free holding buffer
syslog(LOG_NOTICE, "\nsender thread %i message buffer de-allocated\n", thread_id);
free(my_data); // free data structure
syslog(LOG_NOTICE, "\nsender thread %i data structure de-allocated\n", thread_id);
} // end end_sender()
static void sig_hup(int signo) {
syslog(LOG_NOTICE, "\nSIGHUP (PID = %i) received\n", (int) getpid());
pthread_mutex_lock(&jump_lock); // set our jump lock
end_it_all(); // shut down gracefully
} // end sig_hup()
/* mesgd.h - Header file for mesgd, a multi-threaded daemon used to handle interprocess messaging between different hosts.
Copyright 2004-2006, Adrien Lamothe.
This program is free software; you can redistribute it and/or modify
it under the terms of the Version 2 GNU General Public License as published by
the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
#ifndef MESGD_H // identifiers beginning with two underscores are reserved for the implementation
#define MESGD_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/un.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <netinet/in.h>
#include <syslog.h>
#include <pthread.h>
#define MAXLINE 4096 // max line length
int rcv_shm_size = 128000; // size of shared memory segment used as input to receiver thread.
int snd_shm_size = 128000; // size of shared memory segment used as input to sender thread.
int rcv_rec_len = 128; // message record length, including any newline or special characters.
int snd_rec_len = 128; // message record length, including any newline or special characters.
// the following two structures are identical (for now), but are separately defined to allow for future changes
typedef struct {
int thread_id; // thread identifier
int socket; // socket descriptor
struct sockaddr_in socket_address; // socket address structure
char *msg_buffer; // base address of holding area for messages
char *msg_ptr; // pointer into message holding area
int msg_buff_size; // total size of message buffer
int msg_rec_len; // message record length, including any newline or special characters.
key_t shm_key; // key to create shared memory segment
int shm_fd; // shared memory segment identifier returned by shmget() (not a file descriptor)
int shm_buff_size; // total size of shared memory segment
char *shm_buffer; // base address of shared memory segment
char *shm_ptr; // pointer into shared memory segment
} receiver_thread_data;
typedef struct {
int thread_id; // thread identifier
int socket; // socket descriptor
struct sockaddr_in socket_address; // socket address structure
char *msg_buffer; // holding area for messages
char *msg_ptr; // pointer into message holding area
int msg_buff_size; // total size of message buffer
int msg_rec_len; // message record length, including any newline or special characters.
key_t shm_key; // key to create shared memory segment
int shm_fd; // shared memory segment identifier returned by shmget() (not a file descriptor)
int shm_buff_size; // total size of shared memory segment
char *shm_buffer; // base address of shared memory segment
char *shm_ptr; // pointer into shared memory segment
} sender_thread_data;
pthread_t receiver_thread, sender_thread; // create separate threads for receiving and sending messages
pthread_attr_t rcv_thread_attr, snd_thread_attr;
void receiver(int *);
void sender(int *);
void end_receiver(receiver_thread_data *);
void end_sender(sender_thread_data *);
int receiver_thread_id = 1;
int sender_thread_id = 1;
int thread_count = 2;
pthread_mutex_t count_lock=PTHREAD_MUTEX_INITIALIZER; // used to lock thread data initialization
pthread_cond_t init_done=PTHREAD_COND_INITIALIZER; // used to check thread data initialization
pthread_mutex_t jump_lock=PTHREAD_MUTEX_INITIALIZER; // used to lock function calls
sigset_t signal_set, new_signal_set;
static void sig_hup(int);
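int end_it_all(void);
int daemonize(void); // declared here because main() calls it before its definition
int initialize(void); // declared here because main() calls it before its definition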
#endif