decorators.pipeline_decorator

pipeline_decorator

Classes:

Name Description
Pipeline

Functions:

Name Description
pipeline

Decorator to create a pipeline.

Attributes:

Name Type Description
F

F = TypeVar('F', bound=Callable[..., Any]) module-attribute

Pipeline(context, name, log_folder_path, remove_logs_on_completion, entrypoint)

Parameters:

Name Type Description Default

context

Any

The context of the pipeline. Steps can use this object to store and share data, and can retrieve it by calling the static method Pipeline.get_active_context().

required

name

str

The name of the pipeline.

required

log_folder_path

str | None

The path to the log folder. If None, a temporary folder is created.

required

remove_logs_on_completion

bool

Whether to remove the logs on completion.

required

entrypoint

F

The entrypoint of the pipeline. This is the function that will be called when the pipeline is run.

required

Methods:

Name Description
log_pipeline_info

Log the provided content to the pipeline log file as an informational message.

log_pipeline_warning

Log the provided content to the pipeline log file as a warning.

register_active_step_metadata

Register the metadata of a step found during the pipeline scan.

flag_pipeline

Flag the pipeline with the provided state.

get_active_context

Get the context of the currently running pipeline.

register_step_metadata

Register a step's metadata in the global steps registry.

Attributes:

Name Type Description
ACTIVE_PIPELINE Optional[Pipeline]
STEPS_REGISTRY dict[str, StepMetadata]
name
logger_manager
remove_logs_on_completion
entrypoint
initialization_log_file_path str | None
state PipelineState

The state of the pipeline.

steps_metadata list[StepMetadata]

The metadata of all the pipeline's steps.

ACTIVE_PIPELINE = None class-attribute instance-attribute

STEPS_REGISTRY = {} class-attribute instance-attribute

name = name instance-attribute

logger_manager = LoggerManager(pipeline_name=name, log_folder_root_path=log_folder_path) instance-attribute

remove_logs_on_completion = remove_logs_on_completion instance-attribute

entrypoint = entrypoint instance-attribute

initialization_log_file_path = None instance-attribute

state property

The state of the pipeline.

The pipeline's state is determined by the states of its steps. The pipeline can be in one of the following states:

  • PENDING: All the steps are pending.
  • RUNNING: At least one step is running, and no step has failed.
  • SUCCESS: All the steps have succeeded.
  • FAILED: At least one step has failed and no step has succeeded after it. By default, a step is skipped if a previous step has failed. This behavior can be changed by setting the continue_on_failure parameter to True when defining a step.
  • PARTIAL_SUCCESS: At least one step has succeeded after a failed step.

Returns:

Type Description
PipelineState

The state of the pipeline.
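
The rules above can be read as a function of the ordered step states. The following is a minimal sketch, not the library's implementation: the StepState enum, its SKIPPED member, and the derive_pipeline_state helper are hypothetical names introduced for illustration, and the handling of cases the list does not cover (for example, a mix of succeeded and pending steps) is an assumption.

```python
from enum import Enum
from typing import Sequence


class StepState(Enum):
    """Hypothetical step states, introduced only for this sketch."""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


class PipelineState(Enum):
    """The five pipeline states listed in the description above."""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    PARTIAL_SUCCESS = "partial_success"


def derive_pipeline_state(step_states: Sequence[StepState]) -> PipelineState:
    """Derive a pipeline state from its step states, given in execution order."""
    states = list(step_states)
    if all(s is StepState.PENDING for s in states):
        return PipelineState.PENDING
    if StepState.FAILED in states:
        # A success after the first failure implies a success after a failed step.
        first_failure = states.index(StepState.FAILED)
        if StepState.SUCCESS in states[first_failure + 1:]:
            return PipelineState.PARTIAL_SUCCESS
        return PipelineState.FAILED
    if all(s is StepState.SUCCESS for s in states):
        return PipelineState.SUCCESS
    # Assumption: any other failure-free mix counts as a run in progress.
    return PipelineState.RUNNING
```

In this sketch, a step that succeeds after a failure (which requires continue_on_failure=True on that step) turns FAILED into PARTIAL_SUCCESS, while skipped steps leave the pipeline in FAILED.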

steps_metadata property

The metadata of all the pipeline's steps.

Returns:

Type Description
list[StepMetadata]

The metadata of all the pipeline's steps.

log_pipeline_info(log_content)

Log the provided content to the pipeline log file as an informational message.

Parameters:

Name Type Description Default
log_content
str

The content to log.

required

log_pipeline_warning(log_content)

Log the provided content to the pipeline log file as a warning.

Parameters:

Name Type Description Default
log_content
str

The content to log.

required

register_active_step_metadata(step_metadata)

Register the metadata of a step found during the pipeline scan.

Parameters:

Name Type Description Default
step_metadata
StepMetadata

The metadata of the step to register.

required

flag_pipeline(state)

Flag the pipeline with the provided state.

Parameters:

Name Type Description Default
state
PipelineState

The state to flag the pipeline with.

required

get_active_context() staticmethod

Get the context of the currently running pipeline.

Returns:

Type Description
Any

The context of the currently running pipeline.
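
One plausible way a static method can expose the running pipeline's context is through the class-level ACTIVE_PIPELINE attribute. The sketch below illustrates that pattern under stated assumptions: MiniPipeline, its run method, its context attribute, and the RuntimeError raised outside a run are all hypothetical and are not taken from the library.

```python
from typing import Any, Callable, Optional


class MiniPipeline:
    """Hypothetical stand-in for Pipeline; not the library's implementation."""

    # Class-level slot holding the pipeline that is currently running.
    ACTIVE_PIPELINE: Optional["MiniPipeline"] = None

    def __init__(self, context: Any, entrypoint: Callable[[], Any]) -> None:
        self.context = context
        self.entrypoint = entrypoint

    def run(self) -> Any:
        # Mark this pipeline as active for the duration of the run.
        MiniPipeline.ACTIVE_PIPELINE = self
        try:
            return self.entrypoint()
        finally:
            MiniPipeline.ACTIVE_PIPELINE = None

    @staticmethod
    def get_active_context() -> Any:
        active = MiniPipeline.ACTIVE_PIPELINE
        if active is None:
            # Assumption: calling this outside a run is treated as an error.
            raise RuntimeError("No pipeline is currently running.")
        return active.context


def load_step() -> None:
    # A step stores data in the shared context.
    MiniPipeline.get_active_context()["rows"] = [1, 2, 3]


def count_step() -> int:
    # A later step reads what an earlier step stored.
    return len(MiniPipeline.get_active_context()["rows"])


def entrypoint() -> int:
    load_step()
    return count_step()


result = MiniPipeline(context={}, entrypoint=entrypoint).run()
```

Because the steps fetch the context through the static method, they do not need the pipeline instance passed to them explicitly.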

register_step_metadata(step_metadata) staticmethod

Register a step's metadata in the global steps registry.

This method only adds the step's metadata to the global steps registry, typically when the step is defined. The pipeline scan later uses this registry to identify the steps used inside the pipeline.

Parameters:

Name Type Description Default
step_metadata
StepMetadata

The metadata of the step to register.

required
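
The register-then-scan flow described above can be sketched as follows. This is an illustration only: the StepMetadata fields, the step decorator, and the find_steps helper are hypothetical, and looking up names through the entrypoint's code object is an assumed scanning strategy, not necessarily what the library does.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class StepMetadata:
    """Hypothetical metadata record; the real fields are not documented here."""
    name: str
    continue_on_failure: bool = False


# Global registry keyed by step name, mirroring STEPS_REGISTRY above.
STEPS_REGISTRY: dict[str, StepMetadata] = {}


def register_step_metadata(step_metadata: StepMetadata) -> None:
    STEPS_REGISTRY[step_metadata.name] = step_metadata


def step(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical step decorator: registers metadata when the step is defined."""
    register_step_metadata(StepMetadata(name=func.__name__))
    return func


@step
def extract() -> None: ...


@step
def transform() -> None: ...


def helper() -> None: ...  # a plain function, never registered


def my_pipeline() -> None:
    extract()
    helper()


def find_steps(entrypoint: Callable[..., Any]) -> list[StepMetadata]:
    # Assumed scan: keep the global names referenced by the entrypoint
    # that also appear in the registry.
    return [
        STEPS_REGISTRY[name]
        for name in entrypoint.__code__.co_names
        if name in STEPS_REGISTRY
    ]


used_steps = find_steps(my_pipeline)
```

Here transform is registered globally but is not reported for my_pipeline, because the entrypoint never references it.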

pipeline(_func=None, context=None, name=None, log_folder_path=None, remove_logs_on_completion=True)

pipeline(_func: F) -> Pipeline
pipeline(*, context: Any | None = None, name: str | None = None, log_folder_path: str | None = None, remove_logs_on_completion: bool = True) -> Callable[[F], Pipeline]

Decorator to create a pipeline.

Parameters:

Name Type Description Default

_func

Optional[F]

The decorated function.

None

context

Any | None

The context of the pipeline. Steps can use this object to store and share data, and can retrieve it by calling the static method Pipeline.get_active_context().

None

name

str | None

The name of the pipeline. If None, the name of the decorated function is used.

None

log_folder_path

str | None

The path to the log folder. If None, a temporary folder is created.

None

remove_logs_on_completion

bool

Whether to remove the logs on completion. Defaults to True.

True

Returns:

Type Description
Union[Pipeline, Callable[[F], Pipeline]]

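A Pipeline instance when the decorator is applied directly to a function. When called with keyword arguments, a decorator that takes the function and returns a Pipeline instance.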
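
The two overloads correspond to the two ways of applying the decorator: bare, or called with keyword arguments. The sketch below shows the common `_func=None` pattern that supports both forms. It is a stand-in under stated assumptions: PipelineSketch is a hypothetical class that only stores its constructor arguments, and the body of pipeline here is not the library's code.

```python
from typing import Any, Callable, Optional


class PipelineSketch:
    """Hypothetical stand-in that only stores the constructor arguments."""

    def __init__(
        self,
        context: Any,
        name: str,
        log_folder_path: Optional[str],
        remove_logs_on_completion: bool,
        entrypoint: Callable[..., Any],
    ) -> None:
        self.context = context
        self.name = name
        self.log_folder_path = log_folder_path
        self.remove_logs_on_completion = remove_logs_on_completion
        self.entrypoint = entrypoint


def pipeline(
    _func: Optional[Callable[..., Any]] = None,
    *,
    context: Any = None,
    name: Optional[str] = None,
    log_folder_path: Optional[str] = None,
    remove_logs_on_completion: bool = True,
):
    def wrap(func: Callable[..., Any]) -> PipelineSketch:
        return PipelineSketch(
            context=context,
            # Fall back to the function name when no name is given.
            name=name if name is not None else func.__name__,
            log_folder_path=log_folder_path,
            remove_logs_on_completion=remove_logs_on_completion,
            entrypoint=func,
        )

    if _func is None:
        # Called as @pipeline(...): return the actual decorator.
        return wrap
    # Used as bare @pipeline: wrap the function immediately.
    return wrap(_func)


@pipeline
def nightly_job() -> str:
    return "done"


@pipeline(name="etl", context={"rows": []})
def load() -> None: ...
```

In both forms the decorated name is bound to a pipeline object rather than to the original function, which remains available as the entrypoint.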