Source code for sotastream.utils.split

#!/usr/bin/env python3

import datetime
import gzip
import hashlib
import logging
import os
import shutil
import subprocess
import time

from pathlib import Path
from typing import Type

from sotastream.pipelines import PIPELINES


# Module logger, registered under the "sotastream" name.
logger = logging.getLogger("sotastream")


# The block size (in bytes) to read at a time when computing MD5 hashes
MD5_BLOCK_SIZE = 8192


def split_file_into_chunks(
    filepath: str,
    tmpdir: str = "/tmp/sotastream",
    split_size: int = 10000,
    native: bool = False,
    overwrite: bool = False,
) -> Path:
    """
    Split a file into compressed chunks stored under a checksum-named directory.

    The chunks are written to ``<tmpdir>/<md5 of the file>``. Once splitting
    completes, a ``.done`` marker file is written into that directory; a later
    call that finds the marker returns the directory immediately instead of
    splitting again.

    :param filepath: The input file path
    :param tmpdir: The top-level temporary directory to write to
    :param split_size: The size of each chunk in lines
    :param native: If True, use Python to split, instead of a subshell
    :param overwrite: If True, delete an existing split directory for this
        file and split it again
    :return: The directory where the chunks are stored, as a Path object
    """
    # The checksum names the output directory, so it has to be computed first.
    checksum_started = time.perf_counter()
    md5sum = compute_md5(filepath)
    checksum_elapsed = time.perf_counter() - checksum_started
    logger.info(f"md5sum({filepath}) = {md5sum} computed in {checksum_elapsed:.1f}s")

    destdir = Path(tmpdir) / md5sum
    donefile = destdir / ".done"

    if overwrite and destdir.exists():
        logger.info(f"Removing existing split directory {destdir}")
        shutil.rmtree(destdir)
    elif donefile.exists():
        # A finished split is already on disk: reuse it.
        logger.info(f"Using cached splitting of {filepath} (checksum: {md5sum})")
        return destdir

    if native:
        splitter = split_native
    else:
        splitter = split_subshell

    logger.info(f"Splitting file {filepath} to {tmpdir}...")
    destdir.mkdir(parents=True, exist_ok=True)

    split_started = time.perf_counter()
    splitter(filepath, destdir, split_size)
    split_elapsed = time.perf_counter() - split_started
    logger.info(f"File {filepath} splitting took {split_elapsed:.1f}s")

    # The marker is written only after the splitter returned successfully.
    with open(donefile, "w") as marker:
        marker.write(f"{filepath} finished splitting {datetime.datetime.now()}\n")

    return destdir
def split_native(filepath: str, destdir: Path, split_size: int):
    """
    Split directly in Python by reading the file.
    This version is slower than the subshell version.

    Chunks are written to ``destdir`` as ``part.00000.gz``, ``part.00001.gz``,
    and so on. Every chunk holds exactly ``split_size`` lines, except the last
    one, which holds the remainder. Trailing carriage returns and newlines are
    stripped from each line and a single ``\\n`` is written instead. The first
    chunk file is created even when the input is empty.

    :param filepath: The input file path
    :param destdir: The output directory
    :param split_size: The size of each chunk in lines
    :raises ValueError: If ``split_size`` is smaller than 1
    """
    if split_size < 1:
        raise ValueError(f"split_size must be a positive integer, got {split_size}")

    def open_chunk(index: int):
        return smart_open(destdir / f"part.{index:05d}.gz", "wt")

    with smart_open(filepath) as infh:
        chunkno = 0
        outfh = open_chunk(chunkno)
        logger.info(f"Splitting {filepath} to {destdir}")
        try:
            # Zero-based counting: a new chunk starts *before* writing lines
            # split_size, 2 * split_size, ..., so each chunk is exactly full.
            for lineno, line in enumerate(infh):
                if lineno and lineno % split_size == 0:
                    outfh.close()
                    chunkno += 1
                    outfh = open_chunk(chunkno)
                print(line.rstrip("\r\n"), file=outfh)
        finally:
            # Close the current chunk even if reading or writing failed.
            outfh.close()
def split_subshell(filepath: str, destdir: Path, split_size: int):
    """
    Split using a subshell (~8x faster).

    Runs ``pigz -cd`` on the input, removes carriage returns with ``sed``, and
    pipes the result to ``split``, which compresses every chunk with ``pigz``
    into ``<destdir>/part.NNNNN.gz``.

    :param filepath: The input file path
    :param destdir: The output directory
    :param split_size: The size of each chunk in lines
    :raises subprocess.CalledProcessError: If the shell pipeline exits with a
        non-zero status
    """
    import shlex

    # Quote both paths so that spaces and shell metacharacters are passed
    # through literally instead of being interpreted by the shell.
    quoted_input = shlex.quote(str(filepath))
    quoted_prefix = shlex.quote(f"{destdir}/part.")

    # "$FILE.gz" is double-quoted because split hands the filter command to a
    # shell of its own, where the output name would otherwise be word-split.
    # NOTE(review): without `pipefail`, the shell reports only the exit status
    # of the last pipeline stage, so a failing `pigz -cd` may go unnoticed.
    cmd = (
        f"pigz -cd {quoted_input} | sed 's/\r//g' | "
        f"split -d -a5 -l {int(split_size)} --filter 'pigz > \"$FILE.gz\"' - {quoted_prefix}"
    )
    logger.info(cmd)
    subprocess.run(cmd, shell=True, check=True)
def smart_open(filepath: str, mode: str = "rt", encoding: str = "utf-8"):
    """Convenience function for reading and writing compressed or plain text files.

    Files whose name ends in ``.gz`` are opened with :mod:`gzip`, everything
    else with the built-in :func:`open`. In text mode the file is opened with
    the given encoding and ``newline="\\n"``, so no newline translation takes
    place. In binary mode (``"b"`` in ``mode``) the encoding is ignored.

    :param filepath: The file to open (a string or a Path).
    :param mode: The file mode (read, write). A mode without ``"b"`` is
        treated as text mode for both plain and gzipped files.
    :param encoding: The file encoding (text mode only).
    :return: a file handle.
    """
    binary = "b" in mode
    # encoding/newline are rejected in binary mode by both open() and gzip.open().
    text_options = {} if binary else {"encoding": encoding, "newline": "\n"}

    if Path(filepath).suffix == ".gz":
        # gzip.open() treats "r"/"w"/"a" as binary, unlike open(); add the
        # explicit "t" so both branches agree on what those modes mean.
        if not binary and "t" not in mode:
            mode += "t"
        return gzip.open(filepath, mode=mode, **text_options)

    return open(filepath, mode=mode, **text_options)
def compute_md5(filepath: str):
    """Return the MD5 checksum of a file's contents.

    The file is read in binary mode, ``MD5_BLOCK_SIZE`` bytes at a time, so it
    is never held in memory as a whole.

    :param filepath: The file path as a string
    :return: The checksum as a hexdigest.
    """
    digest = hashlib.md5()
    with open(filepath, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(MD5_BLOCK_SIZE), b""):
            digest.update(block)
    return digest.hexdigest()
if __name__ == "__main__":
    # Command-line entry point: split a single input file into chunks.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "infile",
        type=str,
        help="Path to TSV input file containing source, target, and (optionally) docid fields",
    )
    parser.add_argument("--numlines", "-l", type=int, default=10000)
    parser.add_argument("--prefix-dir", "-p", default="/tmp/sotastream")
    args = parser.parse_args()

    # basicConfig() is a function of the logging module; Logger objects have
    # no such method, so calling it on `logger` raises AttributeError.
    logging.basicConfig(level=logging.INFO)

    split_file_into_chunks(args.infile, tmpdir=args.prefix_dir, split_size=args.numlines)