sotastream.pipelines.base module

class sotastream.pipelines.base.DocumentPipeline(**kwargs)

Bases: Pipeline

Extends Pipeline base with document-level CLI args.

classmethod add_cli_args(parser)

Add document-specific arguments.

class sotastream.pipelines.base.Pipeline(**kwargs)

Bases: ABC

Pipeline base class

  • To add a pipeline, extend this class and add the @pipeline("pipeline_name") decorator.

  • CLI arguments
    • To specify the data source names for argparse, override the get_data_sources_for_argparse class method.

    • To specify default mixing weights for argparse, override the get_data_sources_default_weights class method.

    • To specify any other CLI arguments, override the add_cli_args class method.

    • Don’t forget the @classmethod decorator and cls as first argument.

    • The filename should match the pattern {name}_pipeline.py, where {name} is the name used in the decorator.

  • All CLI arguments are passed as arguments to the constructor.

  • Refer to default.py or t1_pipeline.py for example pipelines.

classmethod add_cli_args(parser)

Add CLI arguments to the pipeline-specific subparser. These arguments are shared across all pipelines and appear after the pipeline name in the CLI. For global arguments that appear before the pipeline name, see sotastream.cli.add_cli_args.
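The difference between arguments placed before and after the pipeline name can be illustrated with plain argparse. This is a minimal sketch, not sotastream's actual parser setup: the program name `demo`, the `--seed` and `--buffer-size` options, and the pipeline name `example` are all hypothetical.

```python
import argparse

# Top-level parser: options here must appear BEFORE the pipeline name.
parser = argparse.ArgumentParser(prog="demo")
parser.add_argument("--seed", type=int, default=0)  # hypothetical global option

# One subparser per pipeline: options here appear AFTER the pipeline name.
subparsers = parser.add_subparsers(dest="pipeline", required=True)
sub = subparsers.add_parser("example")  # hypothetical pipeline name
sub.add_argument("--buffer-size", type=int, default=1000)  # hypothetical pipeline option

args = parser.parse_args(["--seed", "7", "example", "--buffer-size", "50"])
```

After parsing, `args.pipeline` holds the selected subcommand name, and both the global and the pipeline-level values sit on the same namespace.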

static create(name: str, *args, **kwargs)

Create a Pipeline instance for the given pipeline name.

create_data_stream(data_path, processor: Callable = <function UTF8File>, buffer_size: int = None, ext: str = '.gz')

Wrapper around data source creation to allow for easy overriding in subclasses.

The worker ID and number of workers are passed to the DataSource class, which uses them to select the subset of shards this process will have access to.

Parameters:
  • data_path – Path to data source

  • processor – Augmentor processor function to apply to each chunk

  • buffer_size – The buffer size to use

  • ext – The extension of the data source
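The documentation does not state how DataSource maps a worker to its shards. One common scheme is round-robin assignment, sketched below as an assumption only. The helper `shards_for_worker` and the shard file names are hypothetical and not part of sotastream.

```python
from typing import List


def shards_for_worker(shards: List[str], worker_id: int, num_workers: int) -> List[str]:
    """Round-robin split: worker k takes shards k, k + n, k + 2n, ... (assumed scheme)."""
    if not 0 <= worker_id < num_workers:
        raise ValueError("worker_id must be in [0, num_workers)")
    # Sort first so every worker sees the same ordering before slicing.
    return sorted(shards)[worker_id::num_workers]


shards = [f"part-{i:03d}.gz" for i in range(5)]  # hypothetical shard names
assignment = [shards_for_worker(shards, w, 2) for w in range(2)]
```

Under this scheme the subsets are disjoint and together cover every shard, so no two workers read the same file.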

classmethod get_data_sources_default_weights() → List[float]

A list of floats, one per data source, specifying the mixture weights among them. These are provided to the argparse subcommand as the default values for the --mix-weights argument. To get the actual instantiated values, use self.mix_weights. The function name is deliberately explicit to avoid confusion between these two sources.
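The distinction between the class-level defaults and the values a user actually supplies can be sketched with plain argparse. How sotastream declares --mix-weights is not shown here, so the `nargs="+"` declaration, the source names, and the weighted sampling step are assumptions for illustration.

```python
import argparse
import random

# What a get_data_sources_default_weights override might return (hypothetical values).
default_weights = [0.7, 0.3]

parser = argparse.ArgumentParser()
parser.add_argument("--mix-weights", type=float, nargs="+", default=default_weights)

# With no flag given, the class-level defaults are used.
defaults = parser.parse_args([]).mix_weights
# With the flag given, the user's values replace them.
overridden = parser.parse_args(["--mix-weights", "1", "3"]).mix_weights

# One possible use of the weights: pick a source for each sample, proportionally.
rng = random.Random(0)
sources = ["clean_data", "noisy_data"]  # hypothetical source names
picks = rng.choices(sources, weights=overridden, k=1000)
```

`random.choices` accepts relative weights, so the values need not sum to one.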

classmethod get_data_sources_for_argparse() → List[Tuple[str, str]]

Returns a list of (name, description) pairs, one for each data source. The list is used to instantiate the argparse subcommand with named positional arguments. These are not the actual instantiated data paths; for that, each class has … The function name is deliberately verbose to minimize confusion.

Returns:

List[Tuple]: List of (name, description)
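A minimal sketch of how (name, description) pairs could become named positional arguments, using only argparse. The source names, descriptions, and paths are hypothetical, and sotastream's own wiring may differ.

```python
import argparse

# What a get_data_sources_for_argparse override might return (hypothetical values).
data_sources = [
    ("parallel_data", "Path to parallel data"),
    ("backtrans_data", "Path to back-translated data"),
]

parser = argparse.ArgumentParser()
for name, description in data_sources:
    # A name without leading dashes declares a required positional argument.
    parser.add_argument(name, help=description)

args = parser.parse_args(["/data/parallel", "/data/bt"])
```

The positional values are matched in declaration order, so the order of the returned pairs determines the order of paths on the command line.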