A pipe is a descriptor pair for interprocess communication.[1] It allows data to flow in one direction, from its write end to its read end. In a way, a pipe is the prototypical cod channel; pipes have been the standing model for how channels work.
Pipes are mainly used to communicate with subprocesses; they can be accessed by everyone in the process group of the creating process. The patterns shown here will illustrate what real uses such communication can have.
Pipes can be closed on either end. If the last write end is closed, further reads will raise a Cod::ConnectionLost.[2]
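What happens underneath can be sketched with Ruby's own IO.pipe. This is plain Ruby, not cod's implementation: the assumption is that cod reports the end-of-file condition seen here as Cod::ConnectionLost. The method name read_after_close is made up for illustration.

```ruby
# Returns the buffered data plus what the read end reports once the only
# write end has been closed.
def read_after_close
  reader, writer = IO.pipe
  writer.write "last words"
  writer.close                      # the last write end goes away
  data = reader.readpartial(1024)   # data already in the pipe is still delivered
  begin
    reader.readpartial(1024)        # nothing left and no writer: end of file
    [data, :still_open]
  rescue EOFError
    [data, :eof]
  ensure
    reader.close
  end
end
```

Note that data written before the close is not lost; the error only shows up once the pipe has been drained.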
Pipes in cod are constructed using Cod.pipe. This returns a pipe that is to be used either for writing or for reading, but never both. If you really need both ends of a pipe in one and the same process, use either of these two ways to obtain two copies:
# either call #dup
copy = pipe.dup
# or call #split (this closes pipe)
read, write = pipe.split
The penalty for not doing this is exceptions, so there, I warned you.
When you create a process fork (Kernel.fork), on the other hand, things become really simple. The only thing you need to be sure of is that you don’t access the pipes you create before you create the fork. Doing so would close the other end of the pipe and render it useless:
chan = Cod.pipe
chan.put :smthng # raises <Errno::EPIPE: Broken pipe>
fork do
  chan.get
end
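The Errno::EPIPE in that example is what the operating system reports for any write to a pipe that has no read end left. A minimal sketch with Ruby's IO.pipe, standing in for what cod does when the first access closes the other end (write_without_reader is a hypothetical name):

```ruby
# Writes to a pipe whose read end is already closed and reports the outcome.
def write_without_reader
  reader, writer = IO.pipe
  reader.close                 # nobody holds a read end any more
  writer.syswrite "smthng"     # unbuffered write straight to the descriptor
  :written
rescue Errno::EPIPE
  :broken_pipe                 # the same error the example above runs into
ensure
  writer.close if writer && !writer.closed?
end
```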
Instead, create your pipes and only then fork all the processes you need to communicate with. They will see your pipes because they inherit memory and open IO streams from you; writing or reading from the pipes will still close the other end, but that will happen in separate processes. This is the advantage of share-nothing concurrency. Do this:
chan = Cod.pipe
fork do
  chan.put :test
end
chan.get # => :test
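For comparison, here is roughly the same exchange with a bare IO.pipe, where each process has to close the end it does not use by hand. This is a sketch of the mechanism, not of cod's code; line_from_child is a made-up name, and exit! is used so that the child skips any at_exit handlers it inherited from the parent.

```ruby
# Forks a child that sends one line back to the parent over a plain pipe.
def line_from_child
  reader, writer = IO.pipe      # create the pipe first ...
  pid = fork do                 # ... then fork; the child inherits both ends
    reader.close                # the child only writes
    writer.puts "test"
    writer.close
    exit!(0)                    # leave without running inherited at_exit hooks
  end
  writer.close                  # the parent only reads
  line = reader.gets.chomp      # blocks until the child has written
  reader.close
  Process.wait(pid)             # reap the child
  line
end
```

Each close happens in a separate process, so neither side takes anything away from the other.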
Let’s assume we want to run a subprocess and have it produce a value for us. This is really a solved problem,[3] but let’s assume it isn’t. cod makes this easy:
def heavy_computation # nomen est omen
  41 + 1
end

channel = Cod.pipe
fork do
  channel.put heavy_computation()
end
channel.get # => 42
No need to stop the world just to get a simple answer; you fork a subprocess and read the answer from a cod channel.
A few things to note. cod is a friendly fellow: it communicates in Ruby values by default. This frequently comes in very handy indeed. If you happen to run into one of the infrequent cases where this bothers you, please skip ahead to the chapter on serialisation.
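To get a feeling for what "Ruby values" over a byte stream means, here is a small sketch that pushes a value through a plain pipe using Marshal from the standard library. Whether cod's default serialisation works exactly like this is an assumption made for illustration, and round_trip is a hypothetical helper.

```ruby
# Sends a Ruby value through a pipe as bytes and rebuilds it on the other side.
def round_trip(value)
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode
  Marshal.dump(value, writer)   # turn the value into bytes on the write end
  writer.close
  Marshal.load(reader)          # rebuild an equal value from the read end
ensure
  reader.close if reader && !reader.closed?
  writer.close if writer && !writer.closed?
end

round_trip([:smthng, { "answer" => 42 }])
```

The value that comes out is an equal copy, not the same object. Things that Marshal cannot dump, such as procs or open IO objects, cannot travel this way.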
Also, cod will often block if you issue a #get. This is really a feature, not a bug. If you need to wait for new data to come in while doing other work, please look at Cod.select.
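The idea of waiting with a time limit instead of blocking forever can be sketched with Ruby's own IO.select on a plain pipe. Cod.select's signature is not shown here, so this says nothing about its exact interface; get_with_timeout is a made-up helper.

```ruby
# Waits up to `timeout` seconds for data on `reader`.
# Returns the data that arrived, or nil if nothing showed up in time.
def get_with_timeout(reader, timeout)
  ready, = IO.select([reader], nil, nil, timeout)
  return nil unless ready        # timed out; go do other work
  reader.readpartial(1024)       # raises EOFError if all writers are gone
end
```

A loop that calls this with a short timeout can interleave its own work with checking for new messages.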
Have a look at this nifty trick:
channel = Cod.pipe
fork do
  begin
    fail "Meanwhile, back at the ranch:"
  rescue => ex
    channel.put ex
  end
end
channel.get # => #<RuntimeError: Meanwhile, back at th...
One could even raise the error returned in the parent process, simulating a single process executing.
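Here is a sketch of that idea, written against a plain IO.pipe and Marshal so that it runs without cod; with a cod channel, the put and get calls would take the place of the Marshal lines. The name run_in_child and the [:ok, value] / [:error, exception] convention are assumptions made for this example.

```ruby
# Runs the block in a child process. Returns its value, or re-raises in the
# parent whatever exception the block raised in the child.
def run_in_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    outcome = begin
      [:ok, yield]
    rescue => ex
      [:error, ex]               # exceptions are ordinary Ruby values
    end
    Marshal.dump(outcome, writer)
    writer.close
    exit!(0)                     # skip at_exit hooks inherited from the parent
  end
  writer.close
  status, payload = Marshal.load(reader)
  reader.close
  Process.wait(pid)
  raise payload if status == :error
  payload
end
```

From the caller's point of view, run_in_child { fail "boom" } then behaves much like calling the block directly.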
Want to distribute work to a few workers and gather their results? cod works for that as well. Here’s an example of N message producers, one consumer:
pipe = Cod.pipe
pids = 10.times.map do
  fork { pipe.put Process.pid }
end
communicated_pids = 10.times.map { pipe.get }
pids.sort == communicated_pids.sort # => true
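Many writers can share one pipe because POSIX keeps small writes (up to PIPE_BUF bytes) in one piece, so short messages from different processes do not get mixed up. A sketch of the same pattern on a bare IO.pipe, with one line per message; collect_child_pids is a hypothetical name and this is not how cod frames its messages.

```ruby
# Forks `count` children that each report their pid over one shared pipe.
# Returns [pids as seen by fork, pids as read from the pipe].
def collect_child_pids(count)
  reader, writer = IO.pipe
  pids = count.times.map do
    fork do
      reader.close
      writer.write("#{Process.pid}\n")  # one short write per message
      writer.close
      exit!(0)                          # skip inherited at_exit hooks
    end
  end
  writer.close                          # parent keeps no write end open
  reported = reader.each_line.map(&:to_i)  # ends once every child has closed
  reader.close
  pids.each { |pid| Process.wait(pid) }
  [pids, reported]
end
```

The order in which the pids arrive depends on scheduling, which is why the comparison above sorts both lists.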
And here’s an example of one producer, M consumers:
# A simple worker class
Worker = Struct.new(:n) do
  # Forever: take a work item from `from`, tag it with our number, pass it on.
  def work_on(from, to)
    loop do
      work_item = from.get
      to.put [n, work_item]
    end
  end
end

# Convenience constructor, so that Worker(3) reads like Worker.new(3)
def Worker(n)
  Worker.new(n)
end

work = Cod.pipe   # for issuing work
result = Cod.pipe # for receiving results

# Set up ten worker processes
pids = 10.times.map do |i|
  fork { Worker(i).work_on(work, result) }
end

# Distribute some work
10.times { |i| work.put(i + 1) }

# Read back the results
10.times { p result.get }

# The workers loop forever, so terminate them explicitly
pids.each { |pid| Process.kill(:TERM, pid) }
Output will look like this:
[1, 1]
[3, 2]
[9, 3]
[6, 4]
[4, 5]
[0, 6]
[2, 7]
[2, 8]
[5, 9]
[7, 10]
… in the best case. Be warned: process scheduling is not as fine-grained as thread scheduling. The number of cores of the system you’re on and the operating system you’re running this under will dramatically affect these results. Sometimes it is hard to get more than one processor to read from a single pipe.
[1] $ man 2 pipe
[2] Cod::ConnectionLost in the YARD documentation.
[3] Something that another library of mine could help you with: procrastinate by doing your work in child processes.
[4] This has pitfalls. Have a look at the section titled “Signal handling might mess up library X” in the chapter on unix tricks.