Jeff Kaufman asks (see full blog post):
How do I write:
cmd -1 <(aws s3 cp "$path1" - | gunzip) -2 <(aws s3 cp "$path2" - | gunzip) | gzip | aws s3 cp - "$pathOut"
In python?
Process substitution is really powerful, and it’s something I wish were easier in non-shell programming languages.
Update 2023-12-17
I said originally (see below) that
“the shell implicitly executes the first two aws s3 cp ...
commands in parallel
and then waits for both to finish before continuing the pipeline”,
but as Jeff points out,
this isn’t quite right.
The shell actually waits for both to start before continuing the pipeline.
I also learned a few more things exploring this.
- Every chunk that a substituted process emits can be processed immediately by the parent command.
- Similarly, every chunk that an earlier pipeline command emits can be processed immediately by a later command (see the sketch after this list).
- A process substitution can contain multiple commands separated by semicolons, like a regular command line.
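Here’s a quick standalone way to see the second point for yourself. This isn’t part of my solution; /bin/sh and cat just stand in for real pipeline stages. Run it and "first" is printed immediately, while "second" shows up about two seconds later:
import subprocess
# A producer that emits one chunk right away and another chunk two seconds later
producer = subprocess.Popen(
    ["/bin/sh", "-c", "echo first; sleep 2; echo second"],
    stdout=subprocess.PIPE,
)
# A later pipeline stage that reads the producer's stdout as it arrives
consumer = subprocess.Popen(["cat"], stdin=producer.stdout)
producer.stdout.close()  # the consumer owns the read end now
consumer.wait()
producer.wait()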
To incorporate all of this I had to significantly rework my solution. It’s longer, but more correct, than my first version. It can also handle recursive layers of process substitutions, such that a process substitution in the outer command can itself contain process substitutions!
Click to show procsub_pipeline_2.py
#!/usr/bin/env python3
import logging
import pdb
import subprocess
import sys
import threading
import os
import traceback
import typing
def idb_excepthook(type, value, tb):
"""Call an interactive debugger in post-mortem mode
If you do "sys.excepthook = idb_excepthook", then an interactive debugger
will be spawned at an unhandled exception
"""
if hasattr(sys, "ps1") or not sys.stderr.isatty():
sys.__excepthook__(type, value, tb)
else:
traceback.print_exception(type, value, tb)
        print()
pdb.pm()
def ch_close_fd_factory(fds: list[int]):
"""Return a completion handler that closes file descriptors"""
def completion_handler():
for fd in fds:
os.close(fd)
return completion_handler
class Command:
"""A command to run in a pipeline, which may contain process substitutions
Analogous to a single command in a shell pipeline,
such as "ps -aux" or "tar zxf" or "cat <(echo hello) <(echo world)".
"""
    def __init__(self, *arguments: typing.Union[str, "ProcessSubstitution"]):
self.arguments: list[typing.Union[str, "ProcessSubstitution"]] = list(arguments)
self.process: subprocess.Popen | None = None
self.procsub_pipes: list[tuple[int, int]] = []
self.procsub_threads: list[threading.Thread] = []
def __repr__(self):
return f"Command({self.arguments})"
def run(self, popen_kwargs: dict | None = None, pass_fds: list[int] | None = None):
"""Run the command, handling process substitution
The .process property is set to the Popen object for the command.
Arguments:
pass_fds: A list of file descriptors to pass to subprocess.Popen at each stage
This will keep any pipes in this list open for the duration of the pipeline
"""
popen_kwargs = popen_kwargs or {}
pass_fds = pass_fds or []
processed_args: list[str] = []
for argument in self.arguments:
if isinstance(argument, ProcessSubstitution):
pipe = os.pipe()
self.procsub_pipes.append(pipe)
pass_fds.append(pipe[0])
procsub_kwargs = dict(
completion_handler=ch_close_fd_factory([pipe[1]]),
# pass_fds=pass_fds,
result_pipe=pipe[1],
)
thread = threading.Thread(target=argument.run, kwargs=procsub_kwargs)
thread.start()
self.procsub_threads.append(thread)
processed_args.append(f"/dev/fd/{pipe[0]}")
# If the argument isn't a pipeline, just treat it like a regular argument
else:
processed_args.append(argument)
# Create the Popen object for the command, with all process substitutions replaced with file descriptors
popen_kwargs["pass_fds"] = pass_fds
self.process = subprocess.Popen(processed_args, **popen_kwargs)
def close_read_pipes(self):
"""Close the read end of each pipe in this command
When a command has process substitutions, it opens a pipe for each one.
The write end of each pipe is closed in a completion handler passed to the ProcessSubstitution.
The read ends must be closed by the caller of Command.run() once the command has finished.
"""
for pipe in self.procsub_pipes:
os.close(pipe[0])
class Pipeline:
"""A pipeline of commands, where the STDOUT of each command is piped to the STDIN of the next command
This is analogous to `echo foo | cat | grep bar | ...` etc.
Each command's STDOUT is piped to the next command's STDIN.
"""
    def __init__(self, *commands: Command):
self.commands: list[Command] = list(commands)
def __repr__(self):
return f"Pipeline({self.commands})"
def run(self, pass_fds: list[int] | None = None, result_pipe: int | None = None):
"""Run a pipeline, passing the STDOUT of each stage to the STDIN of the next stage
Arguments:
pass_fds: A list of file descriptors to pass to subprocess.Popen at each stage
This will keep any pipes in this list open for the duration of the pipeline
result_pipe: The file descriptor to use for the last command in the pipeline
If None, it will go to the STDOUT of the current process
"""
pass_fds = pass_fds or []
previous_process = None
for idx, stage in enumerate(self.commands):
logging.debug(f"Running stage {idx} of pipeline {self.commands}")
stdin = previous_process.stdout if previous_process else None
stdout = subprocess.PIPE if idx != len(self.commands) - 1 else result_pipe
stage.run(popen_kwargs=dict(stdin=stdin, stdout=stdout, pass_fds=pass_fds))
previous_process = stage.process
# The processes are all running in the background now, and we can wait for them in the order they were started
# (If they finish successfully, they'll finish in order;
# if any fail, processes trying to write to it will receive a SIGPIPE and exit)
for stage in self.commands:
stage.process.wait()
stage.close_read_pipes()
# TODO: handle errors
class ProcessSubstitution:
"""A process substitution is a list of pipelines, executed in sequence.
Analogous in shell to `<(grep user /etc/passwd; ls /home | head -n1)`.
Each separate pipeline is concatenated together and passed to the command as a file descriptor.
"""
    def __init__(self, *pipelines: Pipeline):
self.pipelines: list[Pipeline] = list(pipelines)
def __repr__(self):
return f"ProcessSubstitution({self.pipelines})"
def run(
self,
pass_fds: list[int] | None = None,
        result_pipe: int | None = None,
completion_handler: typing.Callable | None = None,
):
"""Run a shell pipeline, passing the STDOUT of each stage to the STDIN of the next stage
Cannot handle process substitution.
Arguments:
pass_fds: A list of file descriptors to pass to subprocess.Popen at each stage
This will keep any pipes in this list open for the duration of the pipeline
completion_handler: A function to call once the pipeline has finished
"""
pass_fds = pass_fds or []
for pipeline in self.pipelines:
logging.debug(f"Running pipeline: {pipeline.commands}")
pipeline.run(pass_fds=pass_fds, result_pipe=result_pipe)
if completion_handler is not None:
completion_handler()
def main():
# Our test pipeline is intended to verify a few things:
# 1. Commands in a pipeline process data as it's emitted from the previous command --
# they don't wait for the previous command to finish before starting.
# 2. Process substitutions are run in parallel --
# two process substitutions that each take 3 seconds should take 3 seconds, not 6.
print("Testing pipeline 1")
print("Compare with a shell command like:")
print(
"cat <(printf foo; sleep 1; echo YYYbarYYY | sed -e 's/YYY//g') <(printf hello; sleep 3; echo XXXworldXXX | sed -e 's/XXX//g') | sed -e 's/$/EOL/'"
)
# You can time it like this, but note that time will not print the output until the pipeline is finished:
# time zsh -c "cat <(printf foo; sleep 1; echo YYYbarYYY | sed -e 's/YYY//g') <(printf hello; sleep 3; echo XXXworldXXX | sed -e 's/XXX//g') | sed -e 's/$/EOL/'"
Pipeline(
Command(
"cat",
ProcessSubstitution(
Pipeline(Command("printf", "foo")),
Pipeline(Command("sleep", "1")),
Pipeline(
Command("echo", "YYYbarYYY"), Command("sed", "-e", "s/YYY//g")
),
),
ProcessSubstitution(
Pipeline(Command("printf", "hello")),
Pipeline(Command("sleep", "3")),
Pipeline(
Command("echo", "XXXworldXXX"), Command("sed", "-e", "s/XXX//g")
),
),
),
Command("sed", "-e", "s/$/EOL/"),
).run()
print("Testing pipeline 2")
print("Compare with a shell command like:")
print("paste <(echo a; sleep 3; echo b) <(echo a; sleep 3; echo b)")
Pipeline(
Command(
"paste",
ProcessSubstitution(
Pipeline(Command("echo", "a")),
Pipeline(Command("sleep", "3")),
Pipeline(Command("echo", "b")),
),
ProcessSubstitution(
Pipeline(Command("echo", "a")),
Pipeline(Command("sleep", "3")),
Pipeline(Command("echo", "b")),
),
)
).run()
print("Testing pipeline 3")
print("Compare with a shell command like:")
print("cat <(cat <(echo one) <(echo two)) <(cat <(echo three) <(echo four))")
Pipeline(
Command(
"cat",
ProcessSubstitution(
Pipeline(
Command(
"cat",
ProcessSubstitution(Pipeline(Command("echo", "one"))),
ProcessSubstitution(Pipeline(Command("echo", "two"))),
)
)
            ),
ProcessSubstitution(
Pipeline(
Command(
"cat",
ProcessSubstitution(Pipeline(Command("echo", "three"))),
ProcessSubstitution(Pipeline(Command("echo", "four"))),
)
)
),
)
).run()
if __name__ == "__main__":
# logging.basicConfig(level=logging.DEBUG)
sys.excepthook = idb_excepthook
main()
When I run it, I get this result:
> ./procsub_pipeline_2.py
Testing pipeline 1
Compare with a shell command like:
cat <(printf foo; sleep 1; echo YYYbarYYY | sed -e 's/YYY//g') <(printf hello; sleep 3; echo XXXworldXXX | sed -e 's/XXX//g') | sed -e 's/$/EOL/'
foobarEOL
helloworldEOL
Testing pipeline 2
Compare with a shell command like:
paste <(echo a; sleep 3; echo b) <(echo a; sleep 3; echo b)
a a
b b
Testing pipeline 3
Compare with a shell command like:
cat <(cat <(echo one) <(echo two)) <(cat <(echo three) <(echo four))
one
two
three
four
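For completeness, here is roughly how Jeff’s original command maps onto these classes. This is only a sketch: as I note in the original post below, I don’t have his cmd or S3 contents, so the command name and the paths here are placeholders.
# Placeholder values; substitute the real command and S3 paths
path1 = "s3://example-bucket/input1.gz"
path2 = "s3://example-bucket/input2.gz"
path_out = "s3://example-bucket/output.gz"
Pipeline(
    Command(
        "cmd",
        "-1",
        ProcessSubstitution(
            Pipeline(Command("aws", "s3", "cp", path1, "-"), Command("gunzip"))
        ),
        "-2",
        ProcessSubstitution(
            Pipeline(Command("aws", "s3", "cp", path2, "-"), Command("gunzip"))
        ),
    ),
    Command("gzip"),
    Command("aws", "s3", "cp", "-", path_out),
).run()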
Original first post
Something tricky about this problem is that
the shell implicitly executes the first two aws s3 cp ...
commands in parallel
and then waits for both to finish before continuing the pipeline.
Replicating this in Python takes some work.
My solution
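The core of the approach is the same trick the shell uses for <(...): create a pipe with os.pipe(), keep the read end open in the child by listing it in pass_fds, and hand the child a /dev/fd/N path in place of the substitution. Here’s a minimal standalone sketch of just that mechanism, with echo and cat standing in for the real commands:
import os
import subprocess
read_fd, write_fd = os.pipe()
# The substituted process writes into the pipe...
producer = subprocess.Popen(["echo", "hello"], stdout=write_fd)
os.close(write_fd)  # the parent no longer needs the write end
# ...and the outer command reads it back through /dev/fd/N
consumer = subprocess.Popen(["cat", f"/dev/fd/{read_fd}"], pass_fds=[read_fd])
producer.wait()
consumer.wait()
os.close(read_fd)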
Click to show procsub_pipeline.py
#!/usr/bin/env python3
import logging
import pdb
import subprocess
import sys
import threading
import os
import traceback
import typing
def idb_excepthook(type, value, tb):
"""Call an interactive debugger in post-mortem mode
If you do "sys.excepthook = idb_excepthook", then an interactive debugger
will be spawned at an unhandled exception
"""
if hasattr(sys, "ps1") or not sys.stderr.isatty():
sys.__excepthook__(type, value, tb)
else:
traceback.print_exception(type, value, tb)
        print()
pdb.pm()
class Command:
"""A command to run in a pipeline, which may contain process substitutions
Process substitutions are represented as a Pipeline object.
"""
    def __init__(self, *arguments: typing.Union[str, "Pipeline"]):
self.arguments: list[typing.Union[str, "Pipeline"]] = list(arguments)
class Pipeline:
"""A pipeline of commands, where the STDOUT of each command is piped to the STDIN of the next command"""
    def __init__(self, *commands: Command):
self.commands: list[Command] = list(commands)
def run_basic_pipeline(
pipeline: Pipeline,
    result_pipe: int | None = None,
start_event: threading.Event | None = None,
pass_fds: list[int] | None = None,
):
"""Run a shell pipeline, passing the STDOUT of each stage to the STDIN of the next stage
Cannot handle process substitution.
Arguments:
pipeline: A pipeline to run
The pipeline cannot contain any process substitutions
result_pipe: The file descriptor for write end of the os.pipe() to use for the last command in the pipeline
If None, it will go to the STDOUT of the current process
start_event: An event to set once the pipeline has started (optional, for caller's use)
pass_fds: A list of file descriptors to pass to subprocess.Popen at each stage
This will keep any pipes in this list open for the duration of the pipeline
"""
pass_fds = pass_fds or []
# The input to the first command in the pipeline is empty
stdin_content = b""
for idx, stage in enumerate(pipeline.commands):
logging.debug(f"Running stage: {stage.arguments}")
# The stdin= argument should be a new pipe for all but the first command in the pipeline
stdin_arg = subprocess.PIPE if idx != 0 else None
# The stdout= argument should be a new pipe for all but the last command in the pipeline,
# which should use the result_pipe
stdout_arg = (
subprocess.PIPE if idx != len(pipeline.commands) - 1 else result_pipe
)
process = subprocess.Popen(
stage.arguments, stdin=stdin_arg, stdout=stdout_arg, pass_fds=pass_fds
)
stdout, stderr = process.communicate(input=stdin_content)
stdin_content = stdout
if start_event is not None:
start_event.set()
def run_procsub_pipeline(pipeline: Pipeline):
"""Run a pipeline, and use process substitution to run any sub-pipelines"""
# We will process the input pipeline and convert any Pipeline objects into process substitutions
processed_pipeline = Pipeline()
# Start a thread for each process substitution
threads: list[threading.Thread] = []
# We keep a start event for each process substitution so we can close the write end of each pipe once all threads have started
start_events: list[threading.Event] = []
# os.pipe() returns a tuple of two int file descriptors: read and write
pipes: list[tuple[int, int]] = []
for stage in pipeline.commands:
processed_stage = Command()
for argument in stage.arguments:
# If the argument is a pipeline, treat it as a process substitution --
# run the pipeline and pass the write end of the pipe to the command as an argument.
# E.g. if the command is `cat <(echo hello) <(echo world)`,
# then we'll run the two 'echo' commands and then something like `cat /dev/fd/3 /dev/fd/4`.
if isinstance(argument, Pipeline):
pipe = os.pipe()
pipes.append(pipe)
start_event = threading.Event()
start_events.append(start_event)
thread = threading.Thread(
target=run_basic_pipeline,
args=(argument, pipe[1], start_event, [pipe[0]]),
)
thread.start()
threads.append(thread)
processed_stage.arguments.append(f"/dev/fd/{pipe[0]}")
# If the argument isn't a pipeline, just treat it like a regular argument
else:
processed_stage.arguments.append(argument)
processed_pipeline.commands.append(processed_stage)
# Wait for all process substitutions to start
for start_event in start_events:
start_event.wait()
# Once they all have been started, we can close the write end of the pipes
for pipe in pipes:
os.close(pipe[1])
# Run the processed pipeline
# Keep the read ends of the pipes open until the pipeline is finished
run_basic_pipeline(processed_pipeline, pass_fds=[pipe[0] for pipe in pipes])
# Wait for all process substitutions to finish
# We wait for substitutions for all stages to finish before running the pipeline because that's easier;
# we could optimize this to run each stage of the pipeline as soon as it's ready.
for thread in threads:
thread.join()
# Close the read ends of the pipes
for pipe in pipes:
os.close(pipe[0])
def main():
run_procsub_pipeline(
Pipeline(
Command(
"cat",
Pipeline(Command("/bin/sh", "-c", "sleep 1; echo hello")),
Pipeline(
Command("/bin/sh", "-c", "sleep 3; echo XXXworldXXX"),
Command("sed", "-e", "s/XXX//g"),
),
),
Command("hexdump", "-C"),
)
)
# Compare with a command like:
# time zsh -c "cat <(sh -c 'sleep 1; echo hello') <(sh -c 'sleep 3; echo XXXworldXXX' | sed -e 's/XXX//g') | hexdump -C"
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
sys.excepthook = idb_excepthook
main()
I don’t have his cmd
or S3 contents, so I use the following shell command
as my reference for a pipeline that contains process substitution pipelines.
You can time(1)
this command to check that the sleep(1)
commands actually execute in parallel.
cat <(sh -c 'sleep 1; echo hello') <(sh -c 'sleep 3; echo XXXworldXXX' | sed -e 's/XXX//g') | hexdump -C
When I time(1)
that, I get a result like:
> time zsh -c "cat <(sh -c 'sleep 1; echo hello') <(sh -c 'sleep 3; echo XXXworldXXX' | sed -e 's/XXX//g') | hexdump -C"
00000000 68 65 6c 6c 6f 0a 77 6f 72 6c 64 0a |hello.world.|
0000000c
zsh -c 0.00s user 0.01s system 0% cpu 3.031 total
With my code, that short shell one-liner turns into this:
run_procsub_pipeline(
Pipeline(
Command(
"cat",
Pipeline(Command("/bin/sh", "-c", "sleep 1; echo hello")),
Pipeline(
Command("/bin/sh", "-c", "sleep 3; echo XXXworldXXX"),
Command("sed", "-e", "s/XXX//g"),
),
),
Command("hexdump", "-C"),
)
)
Update: it turns out that hexdump
is not a very good test of pipelines,
because it will not print anything until its input cat
command has completed.
This is why in the update above I eschew hexdump
for sed.
When I time(1)
my script containing that, I get output like this:
> time ./procsub_pipeline.py
DEBUG:root:Running stage: ['/bin/sh', '-c', 'sleep 1; echo hello']
DEBUG:root:Running stage: ['/bin/sh', '-c', 'sleep 3; echo XXXworldXXX']
DEBUG:root:Running stage: ['sed', '-e', 's/XXX//g']
DEBUG:root:Running stage: ['cat', '/dev/fd/3', '/dev/fd/5']
DEBUG:root:Running stage: ['hexdump', '-C']
00000000 68 65 6c 6c 6f 0a 77 6f 72 6c 64 0a |hello.world.|
0000000c
./procsub_pipeline.py 0.03s user 0.02s system 1% cpu 3.079 total
Which, aside from the DEBUG messages, matches what the shell version does. Cool!
Previous work
I’ve written some code trying to emulate a pipeline in Python before, without attempting to add process substitution. It might be useful as a pattern to copy and paste into code that needs a simple pipeline. I think the code is kind of messy and I wouldn’t write it this way today.
Click to show simple_pipeline.py
import logging
import subprocess
logger = logging.getLogger(__name__)
def pipe(arg_kwarg_list):
"""Construct a shell pipeline
Invokes the first command in the arglist, retrieves its STDOUT, passes that to the STDIN of the
next command in the arglist, and so on.
Logs each command, including its STDIN, STDOUT, and STDERR.
arg_kwarg_list: A list of (command, kwargs) tuples
command: A list to pass to subprocess.Popen
kwargs: Any keyword arguments to subprocess.Popen
result: The STDOUT of the final command
Example:
# Call:
pipe([
(['ls', '-1'], {'cwd': '/'}),
(['head', '-n', '2'], {}), # Can pass an empty dict...
(['grep', 'p'],) # ... or make a one-item tuple with a trailing comma
])
# Result (on my Mac):
Applications
"""
first = True
stdin = b""
for argtuple in arg_kwarg_list:
if len(argtuple) < 1:
raise Exception("Found empty tuple")
if len(argtuple) > 2:
raise Exception(f"Found tuple with {len(argtuple)} elements")
command = argtuple[0]
kwargs = argtuple[1] if len(argtuple) == 2 else {}
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.PIPE
if not first:
kwargs["stdin"] = subprocess.PIPE
first = False
process = subprocess.Popen(command, **kwargs)
stdout, stderr = process.communicate(input=stdin)
# Don't log stdin/stdout because it may contain binary
logger.debug(
f"Popen call {command} with keyword arguments {kwargs} "
f"exited with code {process.returncode} "
# f"with a stdin of '{stdin}' "
# f"and with a stdout of '{stdout}' "
f"and with a stderr of '{stderr.decode()}'"
)
if process.returncode != 0:
raise subprocess.CalledProcessError(
process.returncode, command, output=stdout, stderr=stderr
)
stdin = stdout
return stdout
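Calling it looks something like this; just a usage sketch that mirrors the example in the docstring above:
import logging
logging.basicConfig(level=logging.DEBUG)  # show the per-command debug log lines
result = pipe([
    (["ls", "-1"], {"cwd": "/"}),
    (["head", "-n", "2"], {}),
    (["grep", "p"],),
])
print(result.decode(), end="")  # prints "Applications" on my Mac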