Linux Multi-threading applications at times require passing information from one thread to other. One way is to use some kind of thread synchronization mechanism. This post discusses FIFO, one of the primitive IPC mechanisms which can be used effectively for Inter Thread Communication also. FIFO or named pipes is one of the simplest yet fast communication method.

To understand basics of FIFOs, following link is useful:-

http://tldp.org/LDP/lpg/node15.html

Some points on use model

  • Both ends of FIFO need to be opened in the same process, then use separate threads for read and write
  • Minimal information like only an integer needs to be passed
  • Instead of an integer a pointer to the message or some other coding mechanism
  • Implementing threads should understand how to transform the integer or pointer to the message
  • Process level memory can be used for messages, so receiver does not have to reconstruct the message

How it works?

FIFOs have an inherent property that both the ends should be opened together. open() call blocks for read() till open() for write() gets called by another process. This behavior can be overridden by calling open() in non-blocking mode.

Asynchronous receiving

Any of the mechanisms like select(), poll() or epoll() can be used. I have used select() in code example below, as it is the simplest of all. I wish to write another post explaining the difference between these three.

To understand select(), following link is useful:-

http://linux.die.net/man/2/select

Implementation example

Example code in C language is given below for a kick-start.

Include files:

#include "unistd.h"
#include "stdio.h"
#include "sys/types.h"
#include "sys/stat.h"
#include "errno.h"
#include "fcntl.h"
#include "pthread.h"

Interval between write() so that output is visible to human eye

#define WRITE_INTERVAL 500*1000 // 500 milliseconds

Callback for reader function

void* reader(void* fd);

Callback for writer function

void writer(void* fd);

Main function (Opens pipe and spawns the threads)

int main()
{
  pthread_t readerThread;
  // Create named pipe
  char pipeName[] = "/tmp/trypipe";
  int ret_val = mkfifo(pipeName, 0666);
  if ((ret_val == -1) && (errno != EEXIST)) {
    perror("Error creating the named pipe");
  }
  // Open both ends within this process in on-blocking mode
  // Must do like this otherwise open call will wait
  // till other end of pipe is opened by another process
  int readFd = open(pipeName, O_RDONLY|O_NONBLOCK);
  int writeFd = open(pipeName, O_WRONLY|O_NONBLOCK);
  // Now implement a reader and writer for these fds
  // Launch Reader Thread
  pthread_create( &readerThread, NULL, &reader, (void*) (&readFd));
  // Use main thread for Writer (can invoke a separate thread instead)
  writer((void*) (&writeFd));
}

Callback function for writer

void writer(void* fd)
{
  // Form descriptor
  int writeFd = (*(int*)fd);
  static int var = 0;
  while(1) {
    // Form data to be written on pipe
    printf("\nWriter: %d", var);
    // Do a simple write
    write(writeFd, (int*)(&var), sizeof(var));
    // Increment the variable, so that we can see the incremental communication
    var++;
    // Give interval between next write
    usleep(WRITE_INTERVAL);
  }
}

Callback function for reader

void* reader(void* fd)
{
  // Form descriptor
  int readFd = (*(int*)fd);
  fd_set readset;
  int data = 0, err = 0, size = 0;
  // Initialize time out struct for select()
  struct timeval tv;
  tv.tv_sec = 0;
  tv.tv_usec = 10000;
  // Implement the receiver loop
  while(1) {
    // Initialize the set
    FD_ZERO(&readset);
    FD_SET(readFd, &readset);
    // Now, check for readability
    err = select(readFd+1, &readset, NULL, NULL, &tv);
    if (err > 0 && FD_ISSET(readFd, &readset)) {
      // Clear flags
      FD_CLR(readFd, &readset);
      // Do a simple read on data
      read(readFd, &data, sizeof(data));
      // Dump read data
      printf("\nReader: %d", data);
    }
  }
}

Copy and paste the complete code above. Build the exe

gcc a.c -o fifo -lpthread

Sample Output:

~$ ./fifo
Writer: 0
Reader: 0
Writer: 1
Reader: 1
Writer: 2
Reader: 2
Writer: 3
Reader: 3
Writer: 4
Reader: 4
Tagged with →  
Share →

7 Responses to Linux Multi-threading : FIFOs or Named Pipes

  1. Good Work ! Keep it up.

  2. Ed Kaulakis says:

    “Instead of an integer a pointer to the message”

    I would add:
    This pointer must convey ownership of the message. When finished with the message, the thread must either discard the message or pass ownership to some other entity.

  3. virgo says:

    I changed writer to a different thread.

    #include “unistd.h”
    #include “stdio.h”
    #include “sys/types.h”
    #include “sys/stat.h”
    #include “errno.h”
    #include “fcntl.h”
    #include “pthread.h”

    #define WRITE_INTERVAL 500*1000 // 500 milliseconds

    void* writer(void* fd);
    void* reader(void* fd);

    int main(int argc, char* argv[])
    {
    pthread_t readerThread;
    pthread_t writerThread;
    // Create named pipe
    char pipeName[] = “/tmp/trypipe”;
    int ret_val = mkfifo(pipeName, 0666);
    if ((ret_val == -1) && (errno != EEXIST)) {
    perror(“Error creating the named pipe”);
    }
    // Open both ends within this process in on-blocking mode
    // Must do like this otherwise open call will wait
    // till other end of pipe is opened by another process
    int readFd = open(pipeName, O_RDONLY|O_NONBLOCK);
    int writeFd = open(pipeName, O_WRONLY|O_NONBLOCK);
    // Now implement a reader and writer for these fds
    // Launch Reader Thread
    pthread_create( &readerThread, NULL, &reader, (void*) (&readFd));
    pthread_create( &writerThread, NULL, &writer, (void*) (&writeFd));
    pthread_join(readerThread, NULL);
    pthread_join(writerThread, NULL);
    // Use main thread for Writer (can invoke a separate thread instead)
    //writer((void*) (&writeFd));
    return 0;
    }

    void* writer(void* fd)
    {
    // Form descriptor
    int writeFd = (*(int*)fd);
    static int var = 0;
    while(1) {
    // Form data to be written on pipe
    printf(“\nWriter: %s”, “hello”);
    // Do a simple write
    //write(writeFd, “hello”, sizeof(“hello”));
    // Increment the variable, so that we can see the incremental communication
    //var++;
    // Give interval between next write
    usleep(WRITE_INTERVAL);
    }
    }

    void* reader(void* fd)
    {
    // Form descriptor
    int readFd = (*(int*)fd);
    fd_set readset;
    char data[1024], err = 0, size = 0;
    // Initialize time out struct for select()
    struct timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = 10000;
    // Implement the receiver loop
    while(1) {

    //printf(“\nREADING – STEP I…”);
    // Initialize the set
    FD_ZERO(&readset);
    FD_SET(readFd, &readset);
    // Now, check for readability
    err = select(readFd+1, &readset, NULL, NULL, &tv);
    if (err > 0 && FD_ISSET(readFd, &readset)) {
    //printf(“\nREADING – STEP II – SUCCESS…”);
    // Do a simple read on data
    read(readFd, &data, sizeof(data));
    // Dump read data
    printf(“\nReader: %s”, data);
    } else {
    //printf(“\nREADING – STEP II – FAIL…”);
    }
    }

    // Clear flags
    FD_CLR(readFd, &readset);
    }

  4. Kevin says:

    Not sure why “readFD+1” is being used rather than just “readFD” in the select statement?

  5. Mohsin says:

    Great code Ashish, how can we terminate form the continuous loop? plus can we do this for two functions in a program meaning one function writes the data while the other reads it, no need for main function?

Leave a Reply

Your email address will not be published. Required fields are marked *