Linux Multi-threading applications at times require passing information from one thread to other. One way is to use some kind of thread synchronization mechanism. This post discusses FIFO, one of the primitive IPC mechanisms which can be used effectively for Inter Thread Communication also. FIFO or named pipes is one of the simplest yet fast communication method.
To understand basics of FIFOs, following link is useful:-
http://tldp.org/LDP/lpg/node15.html
Some points on use model
- Both ends of FIFO need to be opened in the same process, then use separate threads for read and write
- Minimal information like only an integer needs to be passed
- Instead of an integer a pointer to the message or some other coding mechanism
- Implementing threads should understand how to transform the integer or pointer to the message
- Process level memory can be used for messages, so receiver does not have to reconstruct the message
How it works?
FIFOs have an inherent property that both the ends should be opened together. open() call blocks for read() till open() for write() gets called by another process. This behavior can be overridden by calling open() in non-blocking mode.
Asynchronous receiving
Any of the mechanisms like select(), poll() or epoll() can be used. I have used select() in code example below, as it is the simplest of all. I wish to write another post explaining the difference between these three.
To understand select(), following link is useful:-
http://linux.die.net/man/2/select
Implementation example
Example code in C language is given below for a kick-start.
Include files:
#include "stdio.h"
#include "sys/types.h"
#include "sys/stat.h"
#include "errno.h"
#include "fcntl.h"
#include "pthread.h"
Interval between write() so that output is visible to human eye
Callback for reader function
Callback for writer function
Main function (Opens pipe and spawns the threads)
{
pthread_t readerThread;
// Create named pipe
char pipeName[] = "/tmp/trypipe";
int ret_val = mkfifo(pipeName, 0666);
if ((ret_val == -1) && (errno != EEXIST)) {
perror("Error creating the named pipe");
}
// Open both ends within this process in on-blocking mode
// Must do like this otherwise open call will wait
// till other end of pipe is opened by another process
int readFd = open(pipeName, O_RDONLY|O_NONBLOCK);
int writeFd = open(pipeName, O_WRONLY|O_NONBLOCK);
// Now implement a reader and writer for these fds
// Launch Reader Thread
pthread_create( &readerThread, NULL, &reader, (void*) (&readFd));
// Use main thread for Writer (can invoke a separate thread instead)
writer((void*) (&writeFd));
}
Callback function for writer
{
// Form descriptor
int writeFd = (*(int*)fd);
static int var = 0;
while(1) {
// Form data to be written on pipe
printf("\nWriter: %d", var);
// Do a simple write
write(writeFd, (int*)(&var), sizeof(var));
// Increment the variable, so that we can see the incremental communication
var++;
// Give interval between next write
usleep(WRITE_INTERVAL);
}
}
Callback function for reader
{
// Form descriptor
int readFd = (*(int*)fd);
fd_set readset;
int data = 0, err = 0, size = 0;
// Initialize time out struct for select()
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10000;
// Implement the receiver loop
while(1) {
// Initialize the set
FD_ZERO(&readset);
FD_SET(readFd, &readset);
// Now, check for readability
err = select(readFd+1, &readset, NULL, NULL, &tv);
if (err > 0 && FD_ISSET(readFd, &readset)) {
// Clear flags
FD_CLR(readFd, &readset);
// Do a simple read on data
read(readFd, &data, sizeof(data));
// Dump read data
printf("\nReader: %d", data);
}
}
}
Copy and paste the complete code above. Build the exe
Sample Output:
Writer: 0
Reader: 0
Writer: 1
Reader: 1
Writer: 2
Reader: 2
Writer: 3
Reader: 3
Writer: 4
Reader: 4
Good Work ! Keep it up.
“Instead of an integer a pointer to the message”
I would add:
This pointer must convey ownership of the message. When finished with the message, the thread must either discard the message or pass ownership to some other entity.
I changed writer to a different thread.
#include “unistd.h”
#include “stdio.h”
#include “sys/types.h”
#include “sys/stat.h”
#include “errno.h”
#include “fcntl.h”
#include “pthread.h”
#define WRITE_INTERVAL 500*1000 // 500 milliseconds
void* writer(void* fd);
void* reader(void* fd);
int main(int argc, char* argv[])
{
pthread_t readerThread;
pthread_t writerThread;
// Create named pipe
char pipeName[] = “/tmp/trypipe”;
int ret_val = mkfifo(pipeName, 0666);
if ((ret_val == -1) && (errno != EEXIST)) {
perror(“Error creating the named pipe”);
}
// Open both ends within this process in on-blocking mode
// Must do like this otherwise open call will wait
// till other end of pipe is opened by another process
int readFd = open(pipeName, O_RDONLY|O_NONBLOCK);
int writeFd = open(pipeName, O_WRONLY|O_NONBLOCK);
// Now implement a reader and writer for these fds
// Launch Reader Thread
pthread_create( &readerThread, NULL, &reader, (void*) (&readFd));
pthread_create( &writerThread, NULL, &writer, (void*) (&writeFd));
pthread_join(readerThread, NULL);
pthread_join(writerThread, NULL);
// Use main thread for Writer (can invoke a separate thread instead)
//writer((void*) (&writeFd));
return 0;
}
void* writer(void* fd)
{
// Form descriptor
int writeFd = (*(int*)fd);
static int var = 0;
while(1) {
// Form data to be written on pipe
printf(“\nWriter: %s”, “hello”);
// Do a simple write
//write(writeFd, “hello”, sizeof(“hello”));
// Increment the variable, so that we can see the incremental communication
//var++;
// Give interval between next write
usleep(WRITE_INTERVAL);
}
}
void* reader(void* fd)
{
// Form descriptor
int readFd = (*(int*)fd);
fd_set readset;
char data[1024], err = 0, size = 0;
// Initialize time out struct for select()
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10000;
// Implement the receiver loop
while(1) {
//printf(“\nREADING – STEP I…”);
// Initialize the set
FD_ZERO(&readset);
FD_SET(readFd, &readset);
// Now, check for readability
err = select(readFd+1, &readset, NULL, NULL, &tv);
if (err > 0 && FD_ISSET(readFd, &readset)) {
//printf(“\nREADING – STEP II – SUCCESS…”);
// Do a simple read on data
read(readFd, &data, sizeof(data));
// Dump read data
printf(“\nReader: %s”, data);
} else {
//printf(“\nREADING – STEP II – FAIL…”);
}
}
// Clear flags
FD_CLR(readFd, &readset);
}
Your code doesnt work!!
Not sure why “readFD+1” is being used rather than just “readFD” in the select statement?
Great code Ashish, how can we terminate form the continuous loop? plus can we do this for two functions in a program meaning one function writes the data while the other reads it, no need for main function?
This code is written as a reference/guideline only. Functions can be used.