ViFI ViFI - 15 days ago 5
Python Question

'Broken Pipe' error when reusing the same pipe inside a loop

I am new to interprocess communication and am trying to understand how os.pipe and os.fork are used together in Python.

In the code below, if I uncomment the commented-out lines, I get a "Broken pipe" error; otherwise it works fine.

The idea is to install a SIGCHLD handler that runs when the child process exits, and to increment the respective counters whenever the child-only function (run_child) and the parent-only function (sigchld_handler) execute. Since a forked process gets its own copy of memory, changes made in the child are not reflected in the parent, so the child sends a message to the parent through a pipe and the parent updates its counter from it.

import os
import signal
import time

class A(object):
    def __init__(self):
        self.parent = 0
        self.child = 0
        self._child_pid = None

        self.rd, self.wr = os.pipe()
        print self.rd, self.wr
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        self.parent += 1
        print "Main run count : (parent) ", self.parent
        #rf = os.fdopen(self.rd, 'r')
        #self.child = int(rf.read())
        #rf.close()
        self._child_pid = None

    def run_child(self):
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : ", os.getpid()
        wr = os.fdopen(self.wr, 'w')
        text = "%s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            self._child_pid = os.fork()

            if not self._child_pid:
                self.run_child()

a = A()
i = 0
while True:
    a.run()
    time.sleep(4)
    i += 1
    if i > 5:
        break


Interestingly, the error appears only after the first few iterations. Can somebody please explain why it occurs and what I should do to fix it?

EDIT 1:
There are a couple of similar examples: ex1 , ex2 , ex3 . I used them to learn from, but in my case I am extending them to run in a loop so that the code acts more like a producer/consumer queue. I understand this might not be a good approach, since the multiprocessing and Queue modules are available in Python, but I want to understand the mistake I am making here.

EDIT 2 (solution):

Based on @S.kozlov's answer, I modified the code to create a new pipe for every communication. Here is the modified code.

import os
import pdb
import signal
import time

class A(object):
    def __init__(self):
        self.parent = 0
        self.child = 0
        self._child_pid = None
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        self.parent += 1
        # Close the parent's copy of the write end so read() can reach EOF.
        os.close(self.wr)
        print "Main run count : (parent) ", self.parent
        rd = os.fdopen(self.rd, 'r')
        self.child = int(rd.read())
        # Release the read end; run() creates a new pipe for the next child.
        rd.close()
        self._child_pid = None

    def run_child(self):
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : ", os.getpid()
        # The child only writes, so it closes its copy of the read end.
        os.close(self.rd)
        wr = os.fdopen(self.wr, 'w')
        text = "%s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            # A fresh pipe for every child.
            self.rd, self.wr = os.pipe()
            self._child_pid = os.fork()

            if not self._child_pid:
                self.run_child()

a = A()
i = 0
while True:
    a.run()
    time.sleep(4)
    i += 1
    if i > 5:
        break


With this, the output should look something like this:

Main run count (child) : 1
Running in child : 15752
C==> 1
Main run count : (parent) 1
Main run count (child) : 2
Running in child : 15753
C==> 2
Main run count : (parent) 2
Main run count (child) : 3
Running in child : 15754
C==> 3
Main run count : (parent) 3
Main run count (child) : 4
Running in child : 15755
C==> 4
Main run count : (parent) 4
Main run count (child) : 5
Running in child : 15756
C==> 5
Main run count : (parent) 5
Main run count (child) : 6
Running in child : 15757
C==> 6
Main run count : (parent) 6

Answer

The problem with your code is that you are trying to reuse one pipe several times, which is not a valid way to use a pipe in general. The exception you are getting is simply telling you: "Hey, you closed this pipe on a previous run. Once a pipe is closed, it's closed."
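To see the "once it's closed, it's closed" behaviour in isolation, here is a minimal sketch. It is written for Python 3 (the code in this thread is Python 2), and the two function names are hypothetical, made up for illustration. It assumes the interpreter's default of ignoring SIGPIPE, so the failed write surfaces as an exception instead of killing the process.

```python
import errno
import os


def write_after_reader_closed():
    """Write to a pipe whose only read end has already been closed."""
    rd, wr = os.pipe()
    os.close(rd)  # no reader is left for this pipe
    try:
        os.write(wr, b"1")
    except BrokenPipeError as exc:
        # This is the "Broken pipe" (EPIPE) error.
        return type(exc).__name__
    finally:
        os.close(wr)
    return "no error"


def write_to_closed_descriptor():
    """Try to reuse a write descriptor after it has been closed."""
    rd, wr = os.pipe()
    os.close(rd)
    os.close(wr)
    try:
        os.write(wr, b"1")  # the descriptor number is no longer valid
    except OSError as exc:
        return errno.errorcode[exc.errno]
    return "no error"


if __name__ == "__main__":
    print(write_after_reader_closed())
    print(write_to_closed_descriptor())
```

Either way, a closed end cannot be reopened; the only way to get a working channel again is to call os.pipe() again.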

So change your code to create a new pipe for each child, keep one end (the read end) in the parent, and give the other end to the child. Then it should work.
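Stripped of the class and the signal handler, the "one pipe per child" idea might look like the sketch below. It is written for Python 3, the function name run_children is hypothetical, and it deliberately swaps the SIGCHLD handler for a plain blocking read followed by os.waitpid, which is a simplification and not what the question's code does.

```python
import os


def run_children(count):
    """Fork `count` children one after another, each with its own pipe."""
    messages = []
    for n in range(1, count + 1):
        rd, wr = os.pipe()  # fresh pipe, used by this child only
        pid = os.fork()
        if pid == 0:
            # Child: only writes, so drop the read end first.
            status = 1
            try:
                os.close(rd)
                os.write(wr, str(n).encode())
                os.close(wr)
                status = 0
            finally:
                os._exit(status)  # never fall back into the parent's loop
        # Parent: close its copy of the write end, otherwise read()
        # would never see EOF because a writer would still exist.
        os.close(wr)
        with os.fdopen(rd, "r") as reader:
            messages.append(reader.read())
        os.waitpid(pid, 0)  # reap the child
    return messages


if __name__ == "__main__":
    print(run_children(3))
```

Each side closes the end it does not use, and both descriptors are gone by the end of every iteration, so nothing from a previous run is ever reused.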

Edit 1. I've updated your code to use one pipe for every child. It's not how good code is supposed to look, but I hope it helps for educational purposes.

import os
import signal
import time


class A(object):
    def __init__(self):
        self.parent = 0
        self.child = 0
        self._child_pid = None
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        self.parent += 1
        print "Main run count : (parent) ", self.parent
        os.close(self.wr)
        rf = os.fdopen(self.rd, 'r')
        message = rf.read()
        rf.close()
        print "Code from child [", self._child_pid, "]: ", message
        self.rd = None
        self._child_pid = None

    def run_child(self):
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : " , os.getpid()
        os.close(self.rd)
        wr = os.fdopen(self.wr, 'w')
        text = "Hello from %s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            rd, wr = os.pipe()
            self.rd = rd
            self.wr = wr
            self._child_pid = os.fork()

            if not self._child_pid:
                self.run_child()
a = A()
i = 0
while True:
    a.run()
    time.sleep(4)
    i += 1
    if i > 5:
        break