🖥️ CLI
Pipewine provides a command-line interface (CLI) that allows you to perform various operations on your data without writing any code. The CLI is built using Typer, a library that makes it easy to create command-line interfaces in Python.
Usage
To use the CLI, all you need to do is install Pipewine and run the pipewine
command in your terminal. This will display a list of available commands and options.
Equivalently, you can run the command with the --help
flag to display the same information.
The --help
flag can also be used with any command to display information about that specific command.
To display the version of the current Pipewine installation, you can use the --version
flag.
Mappers and Operators
To apply an operator to your data, you can use the op
command followed by the operator name and the required arguments. You can get a full list of available operators using the --help
flag with the op
command.
Similarly, for mapper commands, you can use the map
command followed by the mapper name and the required arguments. You can get a full list of available mappers using the --help
flag with the map
command.
Options differ between each operator, so you should use the --help
flag with the operator name to get more information about the required arguments. E.g., to get more information about the repeat
operator, you can run the following command:
Example
Suppose you want to repeat a dataset 10 times, you can do so using the repeat
operator. The following command will repeat the dataset my_dataset
10 times and save the result to my_repeated_dataset
:
Success
When you successfully run a command, you will see a message indicating that the operation was successful.
Failure
If an error occurs during the execution of a command, you will see an error message indicating what went wrong.
╭─ Workflow Status ─────────────────────────────────────────────────────────────────────────────────────────────────╮
│ Workflow failed. │
│ │
│ Traceback (most recent call last): │
│ File "/home/luca/repos/pipewine/pipewine/cli/utils.py", line 168, in run_cli_workflow │
│ run_workflow(workflow, tracker=CursesTracker() if tui else None) │
│ File "/home/luca/repos/pipewine/pipewine/workflows/__init__.py", line 73, in run_workflow │
│ raise e │
│ File "/home/luca/repos/pipewine/pipewine/workflows/__init__.py", line 70, in run_workflow │
│ executor.execute(workflow) │
│ File "/home/luca/repos/pipewine/pipewine/workflows/execution.py", line 273, in execute │
│ self._execute_node(workflow, node, state, id_.hex, wf_opts) │
│ File "/home/luca/repos/pipewine/pipewine/workflows/execution.py", line 167, in _execute_node │
│ output = action(input_) │
│ ^^^^^^^^^^^^^^ │
│ File "/home/luca/repos/pipewine/pipewine/sinks/underfolder.py", line 129, in __call__ │
│ raise FileExistsError( │
│ FileExistsError: Folder /tmp/output already exists and policy OverwritePolicy.FORBID is used. Either change the │
│ destination path or set a weaker policy. │
│ │
│ Started: 2025-02-22 11:40:42 │
│ Finished: 2025-02-22 11:40:42 │
│ Total duration: 0:00:00.104007 │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
Both the op
and map
commands support changing the input and output formats using the --input-format
(-I
) and --output-format
(-O
) options. You can print the
available input and output formats using the --format-help
flag.
For convenience, this flag will also print all the recognized file extensions that can be parsed by Pipewine items.
By default, the input and output formats are set to underfolder
. When using an underfolder as an output format, you can optionally specify, alongside the output path, the overwrite policy and copy policy to use.
By default, the overwrite policy is set to forbid
and the copy policy is set to hard_link
.
Example
Example, suppose we want to repeat a dataset 10 times and save the result to an underfolder named my_repeated_dataset
that may already exist. We can do so using the following command:
Suppose we also want to change the copy policy and instead of hard linking the files, we want a full copy. We can do so using the following command:
You can also use a custom Grabber
to run the command using multi-processing parallelism. To do so, you can use the --grabber
(-g
) option followed by the number of processes to use and optionally the chunk size.
Example
By default, commands report progress in a TUI interface. You can disable this behavior by using the --no-tui
flag.
Workflows
Pipewine also supports running workflows from the CLI. You can use the wf
command followed by the workflow name and the required arguments. You can get a full list of available workflows using the --help
flag with the wf
command.
Workflows can optionally be drawn using the --draw [path]
option. This will disable the workflow execution and instead draw the workflow to the specified path.
Extension
Pipewine CLI is designed to be easily extensible, similarly to the old Pipelime CLI, by specifying a list of custom modules to load dynamically. These modules can be loaded using the --module
(-m
) option followed by the module name.
Extension modules can be provided as a path to a Python file or a module path.
Warning
When loading an extension module from a python file, multiprocessing is disabled due to a limitation of the multiprocessing
module.
This issue was already present in the old Pipelime package and is due to the fact that the multiprocessing
module fails to correctly modules that are imported dynamically from a single file, without a parent package.
Multiprocessing should work correctly when loading modules from a package.
Adding custom operators
To allow operators defined in python packages (or scripts) other than the pipewine
package, all you need to do is create a function that creates the operator object decorate it with the op_cli
decorator.
The decorated function is parsed by Typer
and its signature is used to generate the CLI interface. You can use any type hints supported by Typer
to specify the operator arguments, if any.
Do not worry about the input and output parameters, as Pipewine
will automatically add them to the function signature dynamically, based on the type of operator returned by your function.
If, for some reason, you want to give a custom name to the command, you can use the name
argument of the op_cli
decorator.
Example
This is how the repeat
operator is defined in the pipewine
package:
@op_cli()
def repeat(
times: Annotated[int, Option(..., "--times", "-t")],
interleave: Annotated[bool, Option(..., "--interleave", "-I")] = False,
) -> RepeatOp:
"""Repeat a dataset N times replicating the samples."""
return RepeatOp(times, interleave=interleave)
Equivalently, if you don't like the Annotated
syntax you can use the old (deprecated) Option
default value syntax:
@op_cli()
def repeat(
times: int = Option(..., "--times", "-t"),
interleave: bool = Option(False, "--interleave", "-I"),
) -> RepeatOp:
"""Repeat a dataset N times replicating the samples."""
return RepeatOp(times, interleave=interleave)
If you are rapid-protyping and don't want to write the help strings, you can avoid
using Option
altogether and just use the default values:
Note
It is crucial that the function return type is correctly annotated with the type of operator that it returns. This is used by Pipewine
to correctly parse the operator and its arguments, and to generate the correct CLI interface.
If the command has some eager behavior that can be parallelized using a Grabber
, you can add a grabber: Grabber
positional argument to the function signature. Pipewine will automatically recognize this argument and pass a Grabber
object to the function when it is called.
Example
This is how the sort
operator, which has some eager computation that can be parallelized, is defined in the pipewine
package:
Adding custom mappers
Adding custom mappers is very similar to adding custom operators. The only difference is that you need to use the map_cli
decorator instead of the op_cli
decorator, and of course, the function should return a Mapper
object instead of an Operator
object.
Everything else is the same, including the possibility to specify a custom name for the command using the name
argument of the map_cli
decorator.
Example
This is how the filter-keys
mapper is defined in the pipewine
package:
Adding custom data formats
Currently, the Pipewine CLI only supports the underfolder
format for both input and output. However, you can easily register custom DatasetSource
and DatasetSink
classes to support additional data formats.
This can be done using the source_cli
and sink_cli
decorators, respectively.
To add a custom input format, you can use the source_cli
decorator on a function that
accepts:
- A
str
argument containing some text that must be parsed into aDatasetSource
object. - A
Grabber
argument that may be used by the dataset source, in case it requires to do some eager computation that can run in parallel. - A
type[Sample]
argument with the concrete type of the samples that the dataset source will return.
To add a custom output format, you can use the sink_cli
decorator on a function that
accepts:
- A
str
argument containing some text that must be parsed into aDatasetSink
object. - A
Grabber
argument that may be used by the dataset sink, in case it can parallelize the writing of the dataset.
(The sample type is not needed for the output format, as the dataset sink can simply infer it from the samples that it receives.)
You can use the docstring of the function to specify the help message that will be displayed with the --format-help
flag.
You can also specify a custom name for the input/output using the name
argument of the source_cli
or sink_cli
decorator.
Note
Input and output formats are not Typer commands since they cannot be called directly from the CLI. Instead, they are only used to parse the input/output text according to the format specified by the user.
Hence, the function signature should not include typer-specific annotations, such as Option
or Argument
.
Example
This is how the underfolder
format is registered to the CLI:
@source_cli()
def underfolder(
text: str, grabber: Grabber, sample_type: type[Sample]
) -> UnderfolderSource:
"""PATH: Path to the dataset folder."""
return UnderfolderSource(Path(text), sample_type=sample_type)
@sink_cli()
def underfolder(text: str, grabber: Grabber) -> UnderfolderSink:
"""PATH[,OVERWRITE=forbid[,COPY_POLICY=hard_link]]"""
path, ow_policy, copy_policy = _split_and_parse_underfolder_text(text)
return UnderfolderSink(
Path(path), grabber=grabber, overwrite_policy=ow_policy, copy_policy=copy_policy
)
Adding custom workflows
Adding custom workflows is very similar to adding custom operators and mappers, with the following differences:
- You need to use the
wf_cli
decorator instead. - The function must return a
Workflow
object. - Pipewine will not automatically add the input and output parameters to the function, so you need to specify them explicitly in the function signature.
Example
Example workflow:
@wf_cli(name="example")
def example(
input: Annotated[Path, Option(..., "-i", "--input", help="Input folder.")],
output: Annotated[Path, Option(..., "-o", "--output", help="Output folder.")],
repeat_n: Annotated[
int, Option(..., "-r", "--repeat", help="Repeat n times.")
] = 100,
workers: Annotated[int, Option(..., "-w", "--workers", help="Num workers.")] = 0,
) -> Workflow:
grabber = Grabber(workers, 50)
wf = Workflow(WfOptions(checkpoint_grabber=grabber))
data = wf.node(UnderfolderSource(input, sample_type=LetterSample))()
data = wf.node(RepeatOp(repeat_n))(data)
data = wf.node(MapOp(ColorJitter()), options=WfOptions(checkpoint=True))(data)
groups = wf.node(GroupByOp(group_fn, grabber=None))(data)
vowels = groups["vowel"]
consonants = groups["consonant"]
vowels = wf.node(SortOp(sort_fn, grabber=None))(vowels)
consonants = wf.node(SortOp(sort_fn, grabber=None))(consonants)
data = wf.node(CatOp())([vowels, consonants])
wf.node(UnderfolderSink(output, grabber=grabber))(data)
return wf