Pipes

A pipe is a descriptor pair for interprocess communication.1 It allows data to flow in one direction, from its write end to its read end. In a way, a pipe is the prototypical cod channel; pipes have been the model for how channels in cod work.

Pipes are mainly used to communicate with subprocesses; they can be accessed by the creating process and by any child processes that inherit its file descriptors. The patterns shown here illustrate what such communication is good for in practice.

Pipes can be closed on either end. Once the last write end is closed and the buffered data has been read, further reads will raise Cod::ConnectionLost.2
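
A minimal sketch of that behaviour, assuming the only writer is a forked child that sends a single value and then exits:

  pipe = Cod.pipe

  fork do
    pipe.put :last_message
  end            # the child's write end closes when it exits
  Process.wait   # wait for the child to go away

  pipe.get       # => :last_message
  pipe.get       # raises Cod::ConnectionLost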

Basics

Pipes in cod are constructed using


  Cod.pipe

This returns a pipe that is to be used either for writing or for reading, but never both. If you really need both ends of a pipe in one and the same process, use either of these two methods to obtain two copies:


  # either call #dup
  copy = pipe.dup

  # or call #split (this closes pipe)
  read, write = pipe.split

The penalty for not doing this is exceptions, so there, I warned you.
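
With #split, for instance, you can use both halves within one process (a small sketch; :hello is just a placeholder value):

  read, write = Cod.pipe.split

  write.put :hello
  read.get  # => :hello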

When using forked processes

When you create a process fork (Kernel.fork), on the other hand, things become really simple. The only thing you need to be sure of is that you don’t access the pipes you create before you fork. Doing so closes one of the two ends and renders the pipe useless:


  chan = Cod.pipe
  chan.put :smthng # raises <Errno::EPIPE: Broken pipe>
  fork do
    chan.get
  end

Instead, create your pipes first and only then fork the processes you need to communicate with. They will see your pipes because they inherit memory and open IO streams from you; reading from or writing to the pipes will still close the other end, but that now happens in separate processes. This is the advantage of share-nothing concurrency. Do this:


  chan = Cod.pipe
  fork do
    chan.put :test
  end
  chan.get # => :test

Communicate with Subprocesses, 101

Let’s assume we want to run a subprocess and have it produce a value for us. This is really a solved problem3, but let’s assume it isn’t. cod makes this easy:


  def heavy_computation # nomen est omen
    41 + 1
  end
  
  channel = Cod.pipe
  fork do
    channel.put heavy_computation()
  end

  channel.get # => 42

No need to stop the world just to get a simple answer; you fork a subprocess and read the answer from a cod channel.

A few things to note: cod is a friendly fellow; it communicates in Ruby values by default. This frequently comes in very handy indeed. If you happen to run into one of the infrequent cases where this bothers you, please skip ahead to the chapter on serialisation.
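
For instance, plain Ruby structures travel through a pipe unchanged (a small sketch; the hash contents are arbitrary):

  channel = Cod.pipe
  fork do
    channel.put(kind: :fish, sizes: [1, 2, 3])
  end

  channel.get  # => {:kind=>:fish, :sizes=>[1, 2, 3]}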

Also, #get will block until data is available. This is really a feature, not a bug. If you need to wait for new data to come in while doing other work, please look at Cod.select.
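
Here is a rough sketch of waiting with a timeout; it assumes Cod.select takes a timeout in seconds followed by the channel(s) to watch:

  channel = Cod.pipe
  fork { sleep 0.1; channel.put :ready }

  # Assumption: Cod.select(timeout, channel) returns once data is ready
  # (or the timeout expires); the subsequent #get then has data to read.
  Cod.select(0.5, channel)
  channel.get  # => :ready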

Have a look at this nifty trick:


  channel = Cod.pipe
  fork do
    begin
      fail "Meanwhile, back at the ranch:"
    rescue => ex
      channel.put ex
    end
  end
  
  channel.get # => #<RuntimeError: Meanwhile, back at th...

You could even re-raise the returned error in the parent process, simulating a single process executing.
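
A sketch of that idea in plain Ruby (nothing cod-specific beyond the #get):

  result = channel.get
  # Re-raise the child's exception in the parent, as if it had happened here
  raise result if result.is_a?(Exception)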

N:M patterns

Want to distribute work to a few workers and gather their results? cod works for that as well. Here’s an example of N message producers, one consumer:


  pipe = Cod.pipe
  
  pids = 10.times.map {
    fork { pipe.put Process.pid }
  }
  
  communicated_pids = 10.times.map { pipe.get }
  pids.sort == communicated_pids.sort  # => true
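
If you do not want to leave zombies behind, reap the children once you have their answers (plain Ruby, not cod-specific):

  pids.each { |pid| Process.wait(pid) }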

And here’s an example of one producer, M consumers:

  
  # A simple worker class
  Worker = Struct.new(:n) do
    def work_on(from, to)
      loop do
        work_item = from.get
        to.put [n, work_item]
      end
    end
  end
  # Convenience constructor, so a worker reads as Worker(i) below
  def Worker(n)
    Worker.new(n)
  end
  
  work = Cod.pipe   # for issuing work
  result = Cod.pipe # for receiving results 
  
  # Set up ten worker processes
  pids = 10.times.map do |i| 
    fork { Worker(i).work_on(work, result) }
  end
  
  # Distribute some work
  10.times { |i| work.put(i + 1) }
  
  # Read back the results
  10.times { p result.get }
  
  pids.each { |pid| Process.kill(:TERM, pid) }

Output will look like this:

[1, 1]
[3, 2]
[9, 3]
[6, 4]
[4, 5]
[0, 6]
[2, 7]
[2, 8]
[5, 9]
[7, 10]

… in the best case. Be warned: process scheduling is not as fine-grained as thread scheduling. The number of cores in your system and the operating system you’re running this under will dramatically affect these results. Sometimes it is hard to get more than one worker process to read from a single pipe.

1 $ man 2 pipe

2 Cod::ConnectionLost in the YARD documentation.

3 Something that another library of mine could help you with: procrastinate by doing your work in child processes.

4 This has pitfalls. Have a look at the section titled “Signal handling might mess up library X” in the chapter on unix tricks.