Skip to content

Text Extractor Package

Text Extractor package - Professional text extraction from multiple file formats.

Modules:

Name Description
aio

Asynchronous extraction logic package.

core

Core components for the textxtract package.

exceptions
handlers

File type-specific handlers package.

sync

Synchronous extraction logic package.

Classes:

Name Description
AsyncTextExtractor

Asynchronous text extractor with support for file paths and bytes.

ExtractorConfig

Enhanced configuration options for text extraction with validation.

SyncTextExtractor

Synchronous text extractor with support for file paths and bytes.

Attributes

__all__ module-attribute

__all__ = ['SyncTextExtractor', 'AsyncTextExtractor', 'ExtractorConfig']

__version__ module-attribute

__version__ = '0.2.0'

Classes

AsyncTextExtractor

Bases: TextExtractor

Asynchronous text extractor with support for file paths and bytes.

Provides asynchronous text extraction from various file types. Logs debug and info level messages for tracing and diagnostics. Uses thread pool for I/O-bound operations.

Methods:

Name Description
__aenter__

Async context manager entry.

__aexit__

Async context manager exit with cleanup.

__enter__

Context manager entry.

__exit__

Context manager exit with cleanup.

__init__
extract

Extract text asynchronously from file path or bytes using thread pool.

Attributes:

Name Type Description
config
Source code in textxtract/aio/extractor.py
class AsyncTextExtractor(TextExtractor):
    """
    Asynchronous text extractor with support for file paths and bytes.

    Provides asynchronous text extraction from various file types.
    Logs debug and info level messages for tracing and diagnostics.
    Blocking work (temporary-file creation and the handler's ``extract``
    call) is dispatched to a private thread pool via ``run_in_executor``.

    The instance can be used as a sync or an async context manager; leaving
    either block shuts the thread pool down and marks the extractor closed,
    after which ``extract`` raises ``RuntimeError``.
    """

    def __init__(
        self,
        config: Optional[ExtractorConfig] = None,
        max_workers: Optional[int] = None,
    ):
        """
        Create the extractor and its thread pool.

        Args:
            config: Extraction settings; a default ``ExtractorConfig`` is
                created when omitted.
            max_workers: Forwarded unchanged to ``ThreadPoolExecutor``.
        """
        self.config = config or ExtractorConfig()
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._closed = False
        logger.debug(
            "AsyncTextExtractor initialized with config: %s",
            self.config.__dict__,
        )

    def _effective_config(self, config: Optional[dict]) -> dict:
        """
        Return the instance settings with per-call ``config`` layered on top.

        A fresh dict is built on every call, so handlers running in worker
        threads never receive the live ``self.config.__dict__`` and a partial
        override no longer discards the remaining instance settings.
        """
        merged = dict(self.config.__dict__)
        if config:
            merged.update(config)
        return merged

    def _resolve_max_file_size(self, config: Optional[dict]) -> Optional[int]:
        """
        Pick the size limit: the per-call override, else the instance setting.

        Returns ``None`` when neither is set.
        """
        limit = config.get("max_file_size") if config else None
        if limit is None:
            limit = getattr(self.config, "max_file_size", None)
        return limit

    def _shutdown(self) -> None:
        """Stop the thread pool without waiting and mark the extractor closed."""
        # hasattr guard: __init__ may have failed before the pool was created.
        if hasattr(self, "_executor"):
            self._executor.shutdown(wait=False)
        self._closed = True

    async def extract(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str] = None,
        config: Optional[dict] = None,
    ) -> str:
        """
        Extract text asynchronously from file path or bytes using thread pool.

        Args:
            source: File path (Path/str) or file bytes
            filename: Required if source is bytes, optional for file paths
            config: Optional configuration overrides; keys given here take
                precedence over the instance configuration, all other
                instance settings are still passed to the handler.

        Returns:
            str: Extracted text.

        Raises:
            RuntimeError: If the extractor has already been closed.
            ValueError: If filename is missing when source is bytes
            FileTypeNotSupportedError: If the file extension is not supported.
            ExtractionError: If extraction fails.
            InvalidFileError: If the file is invalid or corrupted.
        """
        if getattr(self, "_closed", False):
            raise RuntimeError("Extractor has been closed")

        # Get file info for logging
        file_info = get_file_info(source, filename)
        logger.debug("Processing file: %s", file_info)

        # Prepare file path (create temp file if needed)
        file_path, temp_path = await self._prepare_file_path_async(
            source, filename, config
        )

        try:
            # Validate file extension
            suffix = file_info.extension
            if not suffix:
                raise FileTypeNotSupportedError(
                    f"File has no extension: {file_info.filename}"
                )

            logger.debug("Detected file suffix: %s", suffix)

            # Get handler
            handler = registry.get_handler(suffix)
            handler_name = handler.__class__.__name__

            logger.info(
                "Using handler %s for file %s (size: %s MB, temp: %s)",
                handler_name,
                file_info.filename,
                file_info.size_mb,
                file_info.is_temp,
            )

            # Built outside the try so a bad override is not reported as an
            # extraction failure.
            handler_config = self._effective_config(config)

            # Extract text asynchronously
            try:
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    self._executor,
                    handler.extract,
                    file_path,
                    handler_config,
                )
            except Exception as e:
                logger.error(
                    "Extraction failed for file %s (handler: %s): %s",
                    file_info.filename,
                    handler_name,
                    e,
                )

                # Re-raise custom extraction errors
                if isinstance(e, ExtractionError):
                    raise
                # Wrap known invalid file errors
                if isinstance(e, (ValueError, OSError)):
                    raise InvalidFileError(
                        f"Invalid file: {file_info.filename} (handler: {handler_name}, error: {e})"
                    ) from e
                # Wrap as general extraction error
                raise ExtractionError(
                    f"Extraction failed for file {file_info.filename} using {handler_name}: {e}"
                ) from e

            logger.info(
                "Extraction successful for file %s (extracted %d characters)",
                file_info.filename,
                len(result),
            )
            return result

        finally:
            # Clean up temporary file if created
            if temp_path:
                safe_unlink(temp_path)
                logger.debug("Temporary file %s deleted", temp_path)

    async def _prepare_file_path_async(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str],
        config: Optional[dict],
    ) -> tuple[Path, Optional[Path]]:
        """
        Prepare file path for extraction asynchronously.

        Bytes input is written to a temporary file on the thread pool; path
        input is checked for existence and for being a regular file.

        Returns:
            tuple: (file_path, temp_path_if_created)

        Raises:
            ValueError: If source is bytes and filename is empty.
            InvalidFileError: If the path does not exist or is not a file.
        """
        if isinstance(source, bytes):
            # Handle bytes input - create temporary file
            if not filename:
                raise ValueError("filename is required when source is bytes")

            # Run temp file creation in thread pool to avoid blocking
            loop = asyncio.get_running_loop()
            temp_path = await loop.run_in_executor(
                self._executor,
                create_temp_file,
                source,
                filename,
                # NOTE(review): presumably create_temp_file enforces this
                # limit; the instance-level setting is now honoured too.
                self._resolve_max_file_size(config),
            )
            logger.debug(
                "Temporary file created at %s for filename %s", temp_path, filename
            )
            return temp_path, temp_path
        else:
            # Handle file path input
            file_path = Path(source)
            if not file_path.exists():
                raise InvalidFileError(f"File not found: {file_path}")
            if not file_path.is_file():
                raise InvalidFileError(f"Path is not a file: {file_path}")

            logger.debug("Using existing file: %s", file_path)
            return file_path, None

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with cleanup."""
        self._shutdown()

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit with cleanup."""
        self._shutdown()

Attributes

config instance-attribute
config = config or ExtractorConfig()

Functions

__aenter__ async
__aenter__()

Async context manager entry.

Source code in textxtract/aio/extractor.py
async def __aenter__(self):
    """Enter the ``async with`` block and return this extractor unchanged."""
    return self
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit with cleanup.

Source code in textxtract/aio/extractor.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``async with`` block: stop the thread pool and mark closed.

    The pool is shut down without waiting for running work. The exception
    arguments are ignored and nothing is returned, so an exception raised
    inside the block keeps propagating.
    """
    try:
        pool = self._executor
    except AttributeError:
        # No pool was ever attached to this instance; nothing to stop.
        pass
    else:
        pool.shutdown(wait=False)
    self._closed = True
__enter__
__enter__()

Context manager entry.

Source code in textxtract/aio/extractor.py
def __enter__(self):
    """Enter the ``with`` block and return this extractor unchanged."""
    return self
__exit__
__exit__(exc_type, exc_val, exc_tb)

Context manager exit with cleanup.

Source code in textxtract/aio/extractor.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``with`` block: stop the thread pool and mark closed.

    The pool is shut down without waiting for running work. The exception
    arguments are ignored and nothing is returned, so an exception raised
    inside the block keeps propagating.
    """
    try:
        pool = self._executor
    except AttributeError:
        # No pool was ever attached to this instance; nothing to stop.
        pass
    else:
        pool.shutdown(wait=False)
    self._closed = True
__init__
__init__(config=None, max_workers=None)
Source code in textxtract/aio/extractor.py
def __init__(
    self,
    config: Optional[ExtractorConfig] = None,
    max_workers: Optional[int] = None,
):
    """Set up the configuration, the worker thread pool and the closed flag.

    Args:
        config: Extraction settings; a default ``ExtractorConfig`` is built
            when the argument is omitted or falsy.
        max_workers: Passed straight through to ``ThreadPoolExecutor``.
    """
    if not config:
        config = ExtractorConfig()
    self.config = config
    self._executor = ThreadPoolExecutor(max_workers=max_workers)
    self._closed = False
    settings = self.config.__dict__
    logger.debug("AsyncTextExtractor initialized with config: %s", settings)
extract async
extract(source, filename=None, config=None)

Extract text asynchronously from file path or bytes using thread pool.

Parameters:

Name Type Description Default
source
Union[Path, str, bytes]

File path (Path/str) or file bytes

required
filename
Optional[str]

Required if source is bytes, optional for file paths

None
config
Optional[dict]

Optional configuration overrides

None

Returns:

Name Type Description
str str

Extracted text.

Raises:

Type Description
ValueError

If filename is missing when source is bytes

FileTypeNotSupportedError

If the file extension is not supported.

ExtractionError

If extraction fails.

InvalidFileError

If the file is invalid or corrupted.

Source code in textxtract/aio/extractor.py
async def extract(
    self,
    source: Union[Path, str, bytes],
    filename: Optional[str] = None,
    config: Optional[dict] = None,
) -> str:
    """
    Extract text asynchronously from file path or bytes using thread pool.

    Args:
        source: File path (Path/str) or file bytes
        filename: Required if source is bytes, optional for file paths
        config: Optional configuration overrides; keys given here take
            precedence over the instance configuration, all other instance
            settings are still passed to the handler.

    Returns:
        str: Extracted text.

    Raises:
        RuntimeError: If the extractor has already been closed.
        ValueError: If filename is missing when source is bytes
        FileTypeNotSupportedError: If the file extension is not supported.
        ExtractionError: If extraction fails.
        InvalidFileError: If the file is invalid or corrupted.
    """
    if getattr(self, "_closed", False):
        raise RuntimeError("Extractor has been closed")

    # Get file info for logging
    file_info = get_file_info(source, filename)
    logger.debug("Processing file: %s", file_info)

    # Prepare file path (create temp file if needed)
    file_path, temp_path = await self._prepare_file_path_async(
        source, filename, config
    )

    try:
        # Validate file extension
        suffix = file_info.extension
        if not suffix:
            raise FileTypeNotSupportedError(
                f"File has no extension: {file_info.filename}"
            )

        logger.debug("Detected file suffix: %s", suffix)

        # Get handler
        handler = registry.get_handler(suffix)
        handler_name = handler.__class__.__name__

        logger.info(
            "Using handler %s for file %s (size: %s MB, temp: %s)",
            handler_name,
            file_info.filename,
            file_info.size_mb,
            file_info.is_temp,
        )

        # Layer the per-call overrides on top of the instance settings.
        # A fresh dict is built so a partial override keeps the remaining
        # settings and the worker thread never sees the live __dict__.
        handler_config = {**self.config.__dict__, **(config or {})}

        # Extract text asynchronously
        try:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                self._executor,
                handler.extract,
                file_path,
                handler_config,
            )
        except Exception as e:
            logger.error(
                "Extraction failed for file %s (handler: %s): %s",
                file_info.filename,
                handler_name,
                e,
            )

            # Re-raise custom extraction errors
            if isinstance(e, ExtractionError):
                raise
            # Wrap known invalid file errors
            if isinstance(e, (ValueError, OSError)):
                raise InvalidFileError(
                    f"Invalid file: {file_info.filename} (handler: {handler_name}, error: {e})"
                ) from e
            # Wrap as general extraction error
            raise ExtractionError(
                f"Extraction failed for file {file_info.filename} using {handler_name}: {e}"
            ) from e

        logger.info(
            "Extraction successful for file %s (extracted %d characters)",
            file_info.filename,
            len(result),
        )
        return result

    finally:
        # Clean up temporary file if created
        if temp_path:
            safe_unlink(temp_path)
            logger.debug("Temporary file %s deleted", temp_path)

ExtractorConfig

Enhanced configuration options for text extraction with validation.

Methods:

Name Description
__init__
__repr__
from_file

Load configuration from a file (JSON, YAML, or TOML).

get_handler

Retrieve a handler for a given file extension.

get_handler_config

Get configuration specific to a handler.

register_handler

Register a custom file type handler.

to_dict

Convert configuration to dictionary.

Attributes:

Name Type Description
custom_handlers
encoding
extra_config
logging_format
logging_level
max_file_size
max_memory_usage
timeout
Source code in textxtract/core/config.py
class ExtractorConfig:
    """Enhanced configuration options for text extraction with validation.

    Constructor arguments are validated first; the ``TEXT_EXTRACTOR_*``
    environment variables are applied afterwards and therefore take
    precedence over the arguments. Unknown keyword arguments are kept in
    ``extra_config`` for handler-specific settings.
    """

    def __init__(
        self,
        encoding: str = "utf-8",
        logging_level: str = "INFO",
        logging_format: Optional[str] = None,
        timeout: Optional[float] = None,
        max_file_size: Optional[int] = None,
        max_memory_usage: Optional[int] = None,
        custom_handlers: Optional[Dict[str, Callable]] = None,
        **kwargs,
    ):
        """
        Validate and store the configuration values.

        Raises:
            ValueError: If encoding, logging level, timeout or max file size
                fails validation.
        """
        # Validate and set basic options
        self.encoding = self._validate_encoding(encoding)
        self.logging_level = self._validate_logging_level(logging_level)
        self.logging_format = (
            logging_format or "%(asctime)s %(levelname)s %(name)s: %(message)s"
        )
        self.timeout = self._validate_timeout(timeout)
        self.max_file_size = self._validate_max_file_size(max_file_size)
        self.max_memory_usage = max_memory_usage
        self.custom_handlers = custom_handlers or {}

        # Load from environment variables
        self._load_from_env()

        # Store additional kwargs for handler-specific config
        self.extra_config = kwargs

    def _validate_encoding(self, encoding: str) -> str:
        """Return ``encoding`` if it names a usable text codec, else raise ValueError."""
        if not isinstance(encoding, str):
            raise ValueError("Encoding must be a string")

        # Test if encoding is valid
        try:
            "test".encode(encoding)
        except LookupError as exc:
            raise ValueError(f"Invalid encoding: {encoding}") from exc

        return encoding

    def _validate_logging_level(self, level: str) -> str:
        """Return the upper-cased level name, or raise ValueError if unknown."""
        valid_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
        # Non-strings have no .upper(); report them as an invalid level
        # instead of leaking an AttributeError.
        if not isinstance(level, str) or level.upper() not in valid_levels:
            raise ValueError(
                f"Invalid logging level: {level}. Must be one of {valid_levels}"
            )
        return level.upper()

    def _validate_timeout(self, timeout: Optional[float]) -> Optional[float]:
        """Return ``timeout`` if it is None or a positive number, else raise ValueError."""
        if timeout is not None:
            if not isinstance(timeout, (int, float)) or timeout <= 0:
                raise ValueError("Timeout must be a positive number")
        return timeout

    def _validate_max_file_size(self, size: Optional[int]) -> Optional[int]:
        """Return ``size`` if it is None or a positive int, else raise ValueError."""
        if size is not None:
            if not isinstance(size, int) or size <= 0:
                raise ValueError("Max file size must be a positive integer")
        return size

    def _load_from_env(self):
        """Load configuration from environment variables.

        Encoding and log level are validated strictly (ValueError on a bad
        value). Timeout and max file size are best-effort: a value that does
        not parse or fails validation is ignored and the current setting is
        kept.
        """
        # Override with environment variables if present
        env_encoding = os.getenv("TEXT_EXTRACTOR_ENCODING")
        if env_encoding:
            self.encoding = self._validate_encoding(env_encoding)

        env_logging = os.getenv("TEXT_EXTRACTOR_LOG_LEVEL")
        if env_logging:
            self.logging_level = self._validate_logging_level(env_logging)

        env_timeout = os.getenv("TEXT_EXTRACTOR_TIMEOUT")
        if env_timeout:
            try:
                # Same rule as the constructor argument: must be positive.
                self.timeout = self._validate_timeout(float(env_timeout))
            except ValueError:
                pass  # Ignore unparsable or non-positive values

        env_max_size = os.getenv("TEXT_EXTRACTOR_MAX_FILE_SIZE")
        if env_max_size:
            try:
                self.max_file_size = self._validate_max_file_size(int(env_max_size))
            except ValueError:
                pass  # Ignore unparsable or non-positive values

    def register_handler(self, extension: str, handler: Callable):
        """Register a custom file type handler.

        The extension is stored lower-cased with a leading dot.
        """
        if not extension.startswith("."):
            extension = f".{extension}"
        self.custom_handlers[extension.lower()] = handler

    def get_handler(self, extension: str) -> Optional[Callable]:
        """Retrieve a handler for a given file extension.

        The lower-cased extension is looked up as given; if that misses and
        it has no leading dot, the dotted form used by ``register_handler``
        is tried as well. Returns None when no handler matches.
        """
        key = extension.lower()
        handlers = self.custom_handlers
        if key in handlers:
            return handlers[key]
        if not key.startswith("."):
            return handlers.get(f".{key}")
        return None

    def get_handler_config(self, handler_name: str) -> Dict[str, Any]:
        """Get configuration specific to a handler.

        Starts from the shared settings and applies the mapping stored under
        ``"<handler_name lower-cased>_config"`` in ``extra_config``, if any.
        """
        base_config = {
            "encoding": self.encoding,
            "timeout": self.timeout,
            "max_file_size": self.max_file_size,
            "max_memory_usage": self.max_memory_usage,
        }

        # Add handler-specific config
        handler_config_key = f"{handler_name.lower()}_config"
        if handler_config_key in self.extra_config:
            base_config.update(self.extra_config[handler_config_key])

        return base_config

    def to_dict(self) -> Dict[str, Any]:
        """Convert configuration to dictionary.

        Handlers are rendered with ``str()``; ``extra_config`` entries are
        merged in last.
        """
        return {
            "encoding": self.encoding,
            "logging_level": self.logging_level,
            "logging_format": self.logging_format,
            "timeout": self.timeout,
            "max_file_size": self.max_file_size,
            "max_memory_usage": self.max_memory_usage,
            "custom_handlers": {k: str(v) for k, v in self.custom_handlers.items()},
            **self.extra_config,
        }

    @classmethod
    def from_file(cls, config_path: Union[str, Path]) -> "ExtractorConfig":
        """Load configuration from a file (JSON, YAML, or TOML).

        The format is chosen by file suffix. The file is read as UTF-8. A
        file that parses to nothing (for example an empty YAML document)
        yields the default configuration.

        Raises:
            FileNotFoundError: If ``config_path`` does not exist.
            ImportError: If the parser needed for YAML/TOML is not installed.
            ValueError: If the suffix is unsupported or the top-level value
                is not a mapping.
        """
        config_path = Path(config_path)

        if not config_path.exists():
            raise FileNotFoundError(f"Configuration file not found: {config_path}")

        content = config_path.read_text(encoding="utf-8")
        suffix = config_path.suffix.lower()

        if suffix == ".json":
            import json

            config_data = json.loads(content)
        elif suffix in (".yaml", ".yml"):
            # Only the import is guarded so parse errors are not masked.
            try:
                import yaml
            except ImportError as exc:
                raise ImportError(
                    "PyYAML is required to load YAML configuration files"
                ) from exc
            config_data = yaml.safe_load(content)
        elif suffix == ".toml":
            # Prefer the standard-library parser (Python 3.11+), fall back
            # to the tomli backport.
            try:
                import tomllib as toml_parser
            except ImportError:
                try:
                    import tomli as toml_parser
                except ImportError as exc:
                    raise ImportError(
                        "tomli is required to load TOML configuration files"
                    ) from exc
            config_data = toml_parser.loads(content)
        else:
            raise ValueError(
                f"Unsupported configuration file format: {config_path.suffix}"
            )

        if config_data is None:
            config_data = {}
        if not isinstance(config_data, dict):
            raise ValueError(
                f"Configuration file must contain a mapping at the top level: {config_path}"
            )

        return cls(**config_data)

    def __repr__(self) -> str:
        return f"ExtractorConfig(encoding='{self.encoding}', logging_level='{self.logging_level}', timeout={self.timeout})"

Attributes

custom_handlers instance-attribute
custom_handlers = custom_handlers or {}
encoding instance-attribute
encoding = _validate_encoding(encoding)
extra_config instance-attribute
extra_config = kwargs
logging_format instance-attribute
logging_format = logging_format or '%(asctime)s %(levelname)s %(name)s: %(message)s'
logging_level instance-attribute
logging_level = _validate_logging_level(logging_level)
max_file_size instance-attribute
max_file_size = _validate_max_file_size(max_file_size)
max_memory_usage instance-attribute
max_memory_usage = max_memory_usage
timeout instance-attribute
timeout = _validate_timeout(timeout)

Functions

__init__
__init__(encoding='utf-8', logging_level='INFO', logging_format=None, timeout=None, max_file_size=None, max_memory_usage=None, custom_handlers=None, **kwargs)
Source code in textxtract/core/config.py
def __init__(
    self,
    encoding: str = "utf-8",
    logging_level: str = "INFO",
    logging_format: Optional[str] = None,
    timeout: Optional[float] = None,
    max_file_size: Optional[int] = None,
    max_memory_usage: Optional[int] = None,
    custom_handlers: Optional[Dict[str, Callable]] = None,
    **kwargs,
):
    """Validate the arguments, apply environment overrides, keep the extras.

    Environment variables are read after the arguments are stored, so they
    take precedence. Unrecognized keyword arguments end up in
    ``extra_config``.
    """
    fallback_format = "%(asctime)s %(levelname)s %(name)s: %(message)s"

    # Validated settings, checked in declaration order.
    self.encoding = self._validate_encoding(encoding)
    self.logging_level = self._validate_logging_level(logging_level)
    self.logging_format = logging_format if logging_format else fallback_format
    self.timeout = self._validate_timeout(timeout)
    self.max_file_size = self._validate_max_file_size(max_file_size)

    # Settings stored as given.
    self.max_memory_usage = max_memory_usage
    self.custom_handlers = custom_handlers if custom_handlers else {}

    # Environment variables override what was just stored.
    self._load_from_env()

    # Handler-specific options travel through unchanged.
    self.extra_config = kwargs
__repr__
__repr__()
Source code in textxtract/core/config.py
def __repr__(self) -> str:
    """Return a short summary showing encoding, logging level and timeout."""
    template = "ExtractorConfig(encoding='{}', logging_level='{}', timeout={})"
    return template.format(self.encoding, self.logging_level, self.timeout)
from_file classmethod
from_file(config_path)

Load configuration from a file (JSON, YAML, or TOML).

Source code in textxtract/core/config.py
@classmethod
def from_file(cls, config_path: Union[str, Path]) -> "ExtractorConfig":
    """Load configuration from a file (JSON, YAML, or TOML).

    The format is chosen by file suffix. The file is read as UTF-8. A file
    that parses to nothing (for example an empty YAML document) yields the
    default configuration.

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
        ImportError: If the parser needed for YAML/TOML is not installed.
        ValueError: If the suffix is unsupported or the top-level value is
            not a mapping.
    """
    config_path = Path(config_path)

    if not config_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")

    content = config_path.read_text(encoding="utf-8")
    suffix = config_path.suffix.lower()

    if suffix == ".json":
        import json

        config_data = json.loads(content)
    elif suffix in (".yaml", ".yml"):
        # Only the import is guarded so parse errors are not masked.
        try:
            import yaml
        except ImportError as exc:
            raise ImportError(
                "PyYAML is required to load YAML configuration files"
            ) from exc
        config_data = yaml.safe_load(content)
    elif suffix == ".toml":
        # Prefer the standard-library parser (Python 3.11+), fall back to
        # the tomli backport.
        try:
            import tomllib as toml_parser
        except ImportError:
            try:
                import tomli as toml_parser
            except ImportError as exc:
                raise ImportError(
                    "tomli is required to load TOML configuration files"
                ) from exc
        config_data = toml_parser.loads(content)
    else:
        raise ValueError(
            f"Unsupported configuration file format: {config_path.suffix}"
        )

    if config_data is None:
        config_data = {}
    if not isinstance(config_data, dict):
        raise ValueError(
            f"Configuration file must contain a mapping at the top level: {config_path}"
        )

    return cls(**config_data)
get_handler
get_handler(extension)

Retrieve a handler for a given file extension.

Source code in textxtract/core/config.py
def get_handler(self, extension: str) -> Optional[Callable]:
    """Retrieve a handler for a given file extension.

    The lower-cased extension is looked up as given; if that misses and it
    has no leading dot, the dotted form (".txt" for "txt") is tried as well,
    so extensions written without the dot still resolve.

    Returns:
        The registered handler, or None when nothing matches.
    """
    key = extension.lower()
    handlers = self.custom_handlers
    if key in handlers:
        return handlers[key]
    if not key.startswith("."):
        return handlers.get(f".{key}")
    return None
get_handler_config
get_handler_config(handler_name)

Get configuration specific to a handler.

Source code in textxtract/core/config.py
def get_handler_config(self, handler_name: str) -> Dict[str, Any]:
    """Build the settings dict for one handler.

    The result starts with the shared encoding, timeout, max_file_size and
    max_memory_usage values. If ``extra_config`` has an entry named
    ``"<handler_name lower-cased>_config"``, its items are applied on top.
    """
    shared_fields = ("encoding", "timeout", "max_file_size", "max_memory_usage")
    settings = {field: getattr(self, field) for field in shared_fields}

    override_key = handler_name.lower() + "_config"
    extras = self.extra_config
    if override_key not in extras:
        return settings

    # Handler-specific values win over the shared ones.
    settings.update(extras[override_key])
    return settings
register_handler
register_handler(extension, handler)

Register a custom file type handler.

Source code in textxtract/core/config.py
def register_handler(self, extension: str, handler: Callable):
    """Store ``handler`` for ``extension``.

    The key is the lower-cased extension, with a leading dot added when the
    caller left it off. An existing entry for the same key is replaced.
    """
    dotted = extension if extension.startswith(".") else "." + extension
    self.custom_handlers[dotted.lower()] = handler
to_dict
to_dict()

Convert configuration to dictionary.

Source code in textxtract/core/config.py
def to_dict(self) -> Dict[str, Any]:
    """Return the configuration as a plain dictionary.

    Core settings come first, then ``custom_handlers`` with every handler
    rendered through ``str()``, then the ``extra_config`` entries.
    """
    core_fields = (
        "encoding",
        "logging_level",
        "logging_format",
        "timeout",
        "max_file_size",
        "max_memory_usage",
    )
    snapshot = {field: getattr(self, field) for field in core_fields}
    snapshot["custom_handlers"] = {
        ext: str(fn) for ext, fn in self.custom_handlers.items()
    }
    snapshot.update(self.extra_config)
    return snapshot

SyncTextExtractor

Bases: TextExtractor

Synchronous text extractor with support for file paths and bytes.

Provides synchronous text extraction from various file types. Logs debug and info level messages for tracing and diagnostics. Supports context manager protocol for proper cleanup.

Methods:

Name Description
__enter__

Context manager entry.

__exit__

Context manager exit.

__init__
extract

Extract text synchronously from file path or bytes.

Attributes:

Name Type Description
config
Source code in textxtract/sync/extractor.py
class SyncTextExtractor(TextExtractor):
    """
    Synchronous text extractor with support for file paths and bytes.

    Provides synchronous text extraction from various file types.
    Logs debug and info level messages for tracing and diagnostics.
    Supports the context manager protocol; the extractor itself holds no
    resources, so leaving the ``with`` block is a no-op.
    """

    def __init__(self, config: Optional[ExtractorConfig] = None):
        """
        Store the configuration.

        Args:
            config: Extraction settings; a default ``ExtractorConfig`` is
                created when omitted.
        """
        self.config = config or ExtractorConfig()
        logger.debug(
            "SyncTextExtractor initialized with config: %s", self.config.__dict__
        )

    def _effective_config(self, config: Optional[dict]) -> dict:
        """
        Return the instance settings with per-call ``config`` layered on top.

        A fresh dict is built on every call, so handlers never receive the
        live ``self.config.__dict__`` and a partial override no longer
        discards the remaining instance settings.
        """
        merged = dict(self.config.__dict__)
        if config:
            merged.update(config)
        return merged

    def _resolve_max_file_size(self, config: Optional[dict]) -> Optional[int]:
        """
        Pick the size limit: the per-call override, else the instance setting.

        Returns ``None`` when neither is set.
        """
        limit = config.get("max_file_size") if config else None
        if limit is None:
            limit = getattr(self.config, "max_file_size", None)
        return limit

    def extract(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str] = None,
        config: Optional[dict] = None,
    ) -> str:
        """
        Extract text synchronously from file path or bytes.

        Args:
            source: File path (Path/str) or file bytes
            filename: Required if source is bytes, optional for file paths
            config: Optional configuration overrides; keys given here take
                precedence over the instance configuration, all other
                instance settings are still passed to the handler.

        Returns:
            str: Extracted text.

        Raises:
            ValueError: If filename is missing when source is bytes
            FileTypeNotSupportedError: If the file extension is not supported.
            ExtractionError: If extraction fails.
            InvalidFileError: If the file is invalid or corrupted.
        """
        # Get file info for logging
        file_info = get_file_info(source, filename)
        logger.debug("Processing file: %s", file_info)

        # Prepare file path (create temp file if needed)
        file_path, temp_path = self._prepare_file_path(source, filename, config)

        try:
            # Validate file extension
            suffix = file_info.extension
            if not suffix:
                raise FileTypeNotSupportedError(
                    f"File has no extension: {file_info.filename}"
                )

            logger.debug("Detected file suffix: %s", suffix)

            # Get handler
            handler = registry.get_handler(suffix)
            handler_name = handler.__class__.__name__

            logger.info(
                "Using handler %s for file %s (size: %s MB, temp: %s)",
                handler_name,
                file_info.filename,
                file_info.size_mb,
                file_info.is_temp,
            )

            # Built outside the try so a bad override is not reported as an
            # extraction failure.
            handler_config = self._effective_config(config)

            # Extract text
            try:
                result = handler.extract(file_path, handler_config)
            except Exception as e:
                logger.error(
                    "Extraction failed for file %s (handler: %s): %s",
                    file_info.filename,
                    handler_name,
                    e,
                )

                # Re-raise custom extraction errors
                if isinstance(e, ExtractionError):
                    raise
                # Wrap known invalid file errors
                if isinstance(e, (ValueError, OSError)):
                    raise InvalidFileError(
                        f"Invalid file: {file_info.filename} (handler: {handler_name}, error: {e})"
                    ) from e
                # Wrap as general extraction error
                raise ExtractionError(
                    f"Extraction failed for file {file_info.filename} using {handler_name}: {e}"
                ) from e

            logger.info(
                "Extraction successful for file %s (extracted %d characters)",
                file_info.filename,
                len(result),
            )
            return result

        finally:
            # Clean up temporary file if created
            if temp_path:
                safe_unlink(temp_path)
                logger.debug("Temporary file %s deleted", temp_path)

    def _prepare_file_path(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str],
        config: Optional[dict],
    ) -> tuple[Path, Optional[Path]]:
        """
        Prepare file path for extraction.

        Bytes input is written to a temporary file; path input is checked
        for existence and for being a regular file.

        Returns:
            tuple: (file_path, temp_path_if_created)

        Raises:
            ValueError: If source is bytes and filename is empty.
            InvalidFileError: If the path does not exist or is not a file.
        """
        if isinstance(source, bytes):
            # Handle bytes input - create temporary file
            if not filename:
                raise ValueError("filename is required when source is bytes")

            # NOTE(review): presumably create_temp_file enforces this limit;
            # the instance-level setting is now honoured too.
            temp_path = create_temp_file(
                source, filename, self._resolve_max_file_size(config)
            )
            logger.debug(
                "Temporary file created at %s for filename %s", temp_path, filename
            )
            return temp_path, temp_path
        else:
            # Handle file path input
            file_path = Path(source)
            if not file_path.exists():
                raise InvalidFileError(f"File not found: {file_path}")
            if not file_path.is_file():
                raise InvalidFileError(f"Path is not a file: {file_path}")

            logger.debug("Using existing file: %s", file_path)
            return file_path, None

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        pass  # No resources to clean up for sync extractor

Attributes

config instance-attribute
config = config or ExtractorConfig()

Functions

__enter__
__enter__()

Context manager entry.

Source code in textxtract/sync/extractor.py
def __enter__(self):
    """Enter the ``with`` block and return this extractor unchanged."""
    return self
__exit__
__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in textxtract/sync/extractor.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``with`` block; does nothing and returns None.

    The exception arguments are ignored, so an exception raised inside the
    block keeps propagating.
    """
    # The sync extractor holds no resources that need releasing.
    return None
__init__
__init__(config=None)
Source code in textxtract/sync/extractor.py
def __init__(self, config: Optional[ExtractorConfig] = None):
    """
    Create a synchronous extractor.

    Args:
        config: Extractor settings; a default ``ExtractorConfig`` is
            created when the argument is omitted (or falsy).
    """
    if not config:
        config = ExtractorConfig()
    self.config = config
    logger.debug(
        "SyncTextExtractor initialized with config: %s", self.config.__dict__
    )
extract
extract(source, filename=None, config=None)

Extract text synchronously from file path or bytes.

Parameters:

Name Type Description Default
source
Union[Path, str, bytes]

File path (Path/str) or file bytes

required
filename
Optional[str]

Required if source is bytes, optional for file paths

None
config
Optional[dict]

Optional configuration overrides

None

Returns:

Name Type Description
str str

Extracted text.

Raises:

Type Description
ValueError

If filename is missing when source is bytes

FileTypeNotSupportedError

If the file extension is not supported.

ExtractionError

If extraction fails.

InvalidFileError

If the file is invalid or corrupted.

Source code in textxtract/sync/extractor.py
def extract(
    self,
    source: Union[Path, str, bytes],
    filename: Optional[str] = None,
    config: Optional[dict] = None,
) -> str:
    """
    Run a blocking text extraction on a file path or on raw bytes.

    Args:
        source: Path to the file (Path/str), or the file content as bytes.
        filename: Name to associate with the content; required for bytes,
            optional for paths.
        config: Per-call options handed to the handler. When omitted or
            empty, the extractor's own configuration dict is passed instead.

    Returns:
        str: The text produced by the selected handler.

    Raises:
        ValueError: If filename is missing when source is bytes.
        FileTypeNotSupportedError: If no file extension could be determined
            (presumably also raised by the handler lookup for unknown
            extensions -- confirm against the registry).
        InvalidFileError: If the handler raises ValueError or OSError.
        ExtractionError: If the handler fails in any other way.
    """
    # Describe the input first; the description feeds both the log output
    # and the extension check below.
    info = get_file_info(source, filename)
    logger.debug("Processing file: %s", info)

    # temp_path is only set when a temporary file had to be created.
    file_path, temp_path = self._prepare_file_path(source, filename, config)

    try:
        extension = info.extension
        if not extension:
            raise FileTypeNotSupportedError(
                f"File has no extension: {info.filename}"
            )
        logger.debug("Detected file suffix: %s", extension)

        handler = registry.get_handler(extension)
        handler_name = handler.__class__.__name__
        logger.info(
            "Using handler %s for file %s (size: %s MB, temp: %s)",
            handler_name,
            info.filename,
            info.size_mb,
            info.is_temp,
        )

        try:
            text = handler.extract(file_path, config or self.config.__dict__)
        except Exception as exc:
            logger.error(
                "Extraction failed for file %s (handler: %s): %s",
                info.filename,
                handler_name,
                exc,
            )
            # Errors that are already ExtractionError pass through untouched.
            if isinstance(exc, ExtractionError):
                raise
            # ValueError/OSError are reported as an invalid file; everything
            # else becomes a generic extraction failure.
            if isinstance(exc, (ValueError, OSError)):
                wrapped = InvalidFileError(
                    f"Invalid file: {info.filename} (handler: {handler_name}, error: {exc})"
                )
            else:
                wrapped = ExtractionError(
                    f"Extraction failed for file {info.filename} using {handler_name}: {exc}"
                )
            raise wrapped from exc

        logger.info(
            "Extraction successful for file %s (extracted %d characters)",
            info.filename,
            len(text),
        )
        return text
    finally:
        # Runs on success and on failure, so a created temp file is always removed.
        if temp_path:
            safe_unlink(temp_path)
            logger.debug("Temporary file %s deleted", temp_path)

Modules

aio

Asynchronous extraction logic package.

Modules:

Name Description
extractor

Asynchronous text extraction logic with support for file paths and bytes.

Classes:

Name Description
AsyncTextExtractor

Asynchronous text extractor with support for file paths and bytes.

Attributes

__all__ module-attribute
__all__ = ['AsyncTextExtractor']

Classes

AsyncTextExtractor

Bases: TextExtractor

Asynchronous text extractor with support for file paths and bytes.

Provides asynchronous text extraction from various file types. Logs debug and info level messages for tracing and diagnostics. Uses thread pool for I/O-bound operations.

Methods:

Name Description
__aenter__

Async context manager entry.

__aexit__

Async context manager exit with cleanup.

__enter__

Context manager entry.

__exit__

Context manager exit with cleanup.

__init__
extract

Extract text asynchronously from file path or bytes using thread pool.

Attributes:

Name Type Description
config
Source code in textxtract/aio/extractor.py
class AsyncTextExtractor(TextExtractor):
    """
    Text extractor whose ``extract`` method is a coroutine.

    Accepts either a file path or raw bytes. Handler work and temporary
    file creation are submitted to a private ``ThreadPoolExecutor``.
    The instance works as a sync or async context manager; leaving the
    context shuts the pool down and marks the extractor as closed.
    """

    def __init__(
        self,
        config: Optional[ExtractorConfig] = None,
        max_workers: Optional[int] = None,
    ):
        """
        Create the extractor together with its worker pool.

        Args:
            config: Extractor settings; a default ``ExtractorConfig`` is
                created when the argument is omitted (or falsy).
            max_workers: Forwarded unchanged to ``ThreadPoolExecutor``.
        """
        if not config:
            config = ExtractorConfig()
        self.config = config
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._closed = False
        logger.debug(
            "AsyncTextExtractor initialized with config: %s",
            self.config.__dict__,
        )

    async def extract(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str] = None,
        config: Optional[dict] = None,
    ) -> str:
        """
        Extract text from a file path or raw bytes without blocking on the handler.

        Args:
            source: Path to the file (Path/str), or the file content as bytes.
            filename: Name to associate with the content; required for
                bytes, optional for paths.
            config: Per-call options handed to the handler. When omitted or
                empty, the extractor's own configuration dict is passed.

        Returns:
            str: The text produced by the selected handler.

        Raises:
            RuntimeError: If the extractor has already been closed.
            ValueError: If filename is missing when source is bytes.
            FileTypeNotSupportedError: If no file extension could be determined.
            InvalidFileError: If the path is missing or not a file, or the
                handler raises ValueError or OSError.
            ExtractionError: If the handler fails in any other way.
        """
        if getattr(self, "_closed", False):
            raise RuntimeError("Extractor has been closed")

        # Describe the input first; the description feeds both the log
        # output and the extension check below.
        info = get_file_info(source, filename)
        logger.debug("Processing file: %s", info)

        # temp_path is only set when bytes were written to a temporary file.
        file_path, temp_path = await self._prepare_file_path_async(
            source, filename, config
        )

        try:
            extension = info.extension
            if not extension:
                raise FileTypeNotSupportedError(
                    f"File has no extension: {info.filename}"
                )
            logger.debug("Detected file suffix: %s", extension)

            handler = registry.get_handler(extension)
            handler_name = handler.__class__.__name__
            logger.info(
                "Using handler %s for file %s (size: %s MB, temp: %s)",
                handler_name,
                info.filename,
                info.size_mb,
                info.is_temp,
            )

            try:
                # The handler's blocking extract() runs in the worker pool.
                text = await asyncio.get_running_loop().run_in_executor(
                    self._executor,
                    handler.extract,
                    file_path,
                    config or self.config.__dict__,
                )
            except Exception as exc:
                logger.error(
                    "Extraction failed for file %s (handler: %s): %s",
                    info.filename,
                    handler_name,
                    exc,
                )
                # Errors that are already ExtractionError pass through untouched.
                if isinstance(exc, ExtractionError):
                    raise
                # ValueError/OSError are reported as an invalid file;
                # everything else becomes a generic extraction failure.
                if isinstance(exc, (ValueError, OSError)):
                    wrapped = InvalidFileError(
                        f"Invalid file: {info.filename} (handler: {handler_name}, error: {exc})"
                    )
                else:
                    wrapped = ExtractionError(
                        f"Extraction failed for file {info.filename} using {handler_name}: {exc}"
                    )
                raise wrapped from exc

            logger.info(
                "Extraction successful for file %s (extracted %d characters)",
                info.filename,
                len(text),
            )
            return text
        finally:
            # Runs on success and on failure, so a created temp file is
            # always removed.
            if temp_path:
                safe_unlink(temp_path)
                logger.debug("Temporary file %s deleted", temp_path)

    async def _prepare_file_path_async(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str],
        config: Optional[dict],
    ) -> tuple[Path, Optional[Path]]:
        """
        Resolve ``source`` to a path on disk.

        Returns:
            tuple: ``(file_path, temp_path)`` where ``temp_path`` equals
            ``file_path`` when a temporary file was created for bytes
            input and is ``None`` for path input.

        Raises:
            ValueError: If source is bytes and no filename was given.
            InvalidFileError: If a path source does not exist or is not a file.
        """
        if not isinstance(source, bytes):
            # Path input: validate it and use it as-is.
            candidate = Path(source)
            if not candidate.exists():
                raise InvalidFileError(f"File not found: {candidate}")
            if not candidate.is_file():
                raise InvalidFileError(f"Path is not a file: {candidate}")

            logger.debug("Using existing file: %s", candidate)
            return candidate, None

        # Bytes input: a filename is mandatory before writing a temp file.
        if not filename:
            raise ValueError("filename is required when source is bytes")

        # Writing the temp file happens in the worker pool, not on the loop.
        loop = asyncio.get_running_loop()
        created = await loop.run_in_executor(
            self._executor,
            create_temp_file,
            source,
            filename,
            config and config.get("max_file_size"),
        )
        logger.debug(
            "Temporary file created at %s for filename %s", created, filename
        )
        return created, created

    def _shutdown_executor(self):
        """Stop the worker pool (if it exists) and flag the instance as closed."""
        if hasattr(self, "_executor"):
            # wait=False: return immediately instead of blocking on running work.
            self._executor.shutdown(wait=False)
        self._closed = True

    def __enter__(self):
        """Enter the ``with`` block and hand back this extractor itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the ``with`` block: shut the pool down and mark closed."""
        self._shutdown_executor()

    async def __aenter__(self):
        """Enter the ``async with`` block and hand back this extractor itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the ``async with`` block: shut the pool down and mark closed."""
        self._shutdown_executor()
Attributes
config instance-attribute
config = config or ExtractorConfig()
Functions
__aenter__ async
__aenter__()

Async context manager entry.

Source code in textxtract/aio/extractor.py
async def __aenter__(self):
    """Enter the ``async with`` block and hand back this extractor itself."""
    return self
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit with cleanup.

Source code in textxtract/aio/extractor.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``async with`` block: shut the pool down and mark closed."""
    has_pool = hasattr(self, "_executor")
    if has_pool:
        # wait=False: return immediately instead of blocking on running work.
        self._executor.shutdown(wait=False)
    self._closed = True
__enter__
__enter__()

Context manager entry.

Source code in textxtract/aio/extractor.py
def __enter__(self):
    """Enter the ``with`` block and hand back this extractor itself."""
    return self
__exit__
__exit__(exc_type, exc_val, exc_tb)

Context manager exit with cleanup.

Source code in textxtract/aio/extractor.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``with`` block: shut the pool down and mark closed."""
    has_pool = hasattr(self, "_executor")
    if has_pool:
        # wait=False: return immediately instead of blocking on running work.
        self._executor.shutdown(wait=False)
    self._closed = True
__init__
__init__(config=None, max_workers=None)
Source code in textxtract/aio/extractor.py
def __init__(
    self,
    config: Optional[ExtractorConfig] = None,
    max_workers: Optional[int] = None,
):
    """
    Create the extractor together with its worker pool.

    Args:
        config: Extractor settings; a default ``ExtractorConfig`` is
            created when the argument is omitted (or falsy).
        max_workers: Forwarded unchanged to ``ThreadPoolExecutor``.
    """
    if not config:
        config = ExtractorConfig()
    self.config = config
    self._executor = ThreadPoolExecutor(max_workers=max_workers)
    self._closed = False
    logger.debug(
        "AsyncTextExtractor initialized with config: %s",
        self.config.__dict__,
    )
extract async
extract(source, filename=None, config=None)

Extract text asynchronously from file path or bytes using thread pool.

Parameters:

Name Type Description Default
source Union[Path, str, bytes]

File path (Path/str) or file bytes

required
filename Optional[str]

Required if source is bytes, optional for file paths

None
config Optional[dict]

Optional configuration overrides

None

Returns:

Name Type Description
str str

Extracted text.

Raises:

Type Description
ValueError

If filename is missing when source is bytes

FileTypeNotSupportedError

If the file extension is not supported.

ExtractionError

If extraction fails.

InvalidFileError

If the file is invalid or corrupted.

Source code in textxtract/aio/extractor.py
async def extract(
    self,
    source: Union[Path, str, bytes],
    filename: Optional[str] = None,
    config: Optional[dict] = None,
) -> str:
    """
    Extract text from a file path or raw bytes without blocking on the handler.

    Args:
        source: Path to the file (Path/str), or the file content as bytes.
        filename: Name to associate with the content; required for bytes,
            optional for paths.
        config: Per-call options handed to the handler. When omitted or
            empty, the extractor's own configuration dict is passed.

    Returns:
        str: The text produced by the selected handler.

    Raises:
        RuntimeError: If the extractor has already been closed.
        ValueError: If filename is missing when source is bytes.
        FileTypeNotSupportedError: If no file extension could be determined.
        InvalidFileError: If the handler raises ValueError or OSError.
        ExtractionError: If the handler fails in any other way.
    """
    if getattr(self, "_closed", False):
        raise RuntimeError("Extractor has been closed")

    # Describe the input first; the description feeds both the log output
    # and the extension check below.
    info = get_file_info(source, filename)
    logger.debug("Processing file: %s", info)

    # temp_path is only set when a temporary file had to be created.
    file_path, temp_path = await self._prepare_file_path_async(
        source, filename, config
    )

    try:
        extension = info.extension
        if not extension:
            raise FileTypeNotSupportedError(
                f"File has no extension: {info.filename}"
            )
        logger.debug("Detected file suffix: %s", extension)

        handler = registry.get_handler(extension)
        handler_name = handler.__class__.__name__
        logger.info(
            "Using handler %s for file %s (size: %s MB, temp: %s)",
            handler_name,
            info.filename,
            info.size_mb,
            info.is_temp,
        )

        try:
            # The handler's blocking extract() runs in the worker pool.
            text = await asyncio.get_running_loop().run_in_executor(
                self._executor,
                handler.extract,
                file_path,
                config or self.config.__dict__,
            )
        except Exception as exc:
            logger.error(
                "Extraction failed for file %s (handler: %s): %s",
                info.filename,
                handler_name,
                exc,
            )
            # Errors that are already ExtractionError pass through untouched.
            if isinstance(exc, ExtractionError):
                raise
            # ValueError/OSError are reported as an invalid file; everything
            # else becomes a generic extraction failure.
            if isinstance(exc, (ValueError, OSError)):
                wrapped = InvalidFileError(
                    f"Invalid file: {info.filename} (handler: {handler_name}, error: {exc})"
                )
            else:
                wrapped = ExtractionError(
                    f"Extraction failed for file {info.filename} using {handler_name}: {exc}"
                )
            raise wrapped from exc

        logger.info(
            "Extraction successful for file %s (extracted %d characters)",
            info.filename,
            len(text),
        )
        return text
    finally:
        # Runs on success and on failure, so a created temp file is always removed.
        if temp_path:
            safe_unlink(temp_path)
            logger.debug("Temporary file %s deleted", temp_path)

Modules

extractor

Asynchronous text extraction logic with support for file paths and bytes.

Classes:

Name Description
AsyncTextExtractor

Asynchronous text extractor with support for file paths and bytes.

Attributes:

Name Type Description
logger
Attributes
logger module-attribute
logger = getLogger('textxtract.aio')
Classes
AsyncTextExtractor

Bases: TextExtractor

Asynchronous text extractor with support for file paths and bytes.

Provides asynchronous text extraction from various file types. Logs debug and info level messages for tracing and diagnostics. Uses thread pool for I/O-bound operations.

Methods:

Name Description
__aenter__

Async context manager entry.

__aexit__

Async context manager exit with cleanup.

__enter__

Context manager entry.

__exit__

Context manager exit with cleanup.

__init__
extract

Extract text asynchronously from file path or bytes using thread pool.

Attributes:

Name Type Description
config
Source code in textxtract/aio/extractor.py
class AsyncTextExtractor(TextExtractor):
    """
    Text extractor whose ``extract`` method is a coroutine.

    Accepts either a file path or raw bytes. Handler work and temporary
    file creation are submitted to a private ``ThreadPoolExecutor``.
    The instance works as a sync or async context manager; leaving the
    context shuts the pool down and marks the extractor as closed.
    """

    def __init__(
        self,
        config: Optional[ExtractorConfig] = None,
        max_workers: Optional[int] = None,
    ):
        """
        Create the extractor together with its worker pool.

        Args:
            config: Extractor settings; a default ``ExtractorConfig`` is
                created when the argument is omitted (or falsy).
            max_workers: Forwarded unchanged to ``ThreadPoolExecutor``.
        """
        if not config:
            config = ExtractorConfig()
        self.config = config
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._closed = False
        logger.debug(
            "AsyncTextExtractor initialized with config: %s",
            self.config.__dict__,
        )

    async def extract(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str] = None,
        config: Optional[dict] = None,
    ) -> str:
        """
        Extract text from a file path or raw bytes without blocking on the handler.

        Args:
            source: Path to the file (Path/str), or the file content as bytes.
            filename: Name to associate with the content; required for
                bytes, optional for paths.
            config: Per-call options handed to the handler. When omitted or
                empty, the extractor's own configuration dict is passed.

        Returns:
            str: The text produced by the selected handler.

        Raises:
            RuntimeError: If the extractor has already been closed.
            ValueError: If filename is missing when source is bytes.
            FileTypeNotSupportedError: If no file extension could be determined.
            InvalidFileError: If the path is missing or not a file, or the
                handler raises ValueError or OSError.
            ExtractionError: If the handler fails in any other way.
        """
        if getattr(self, "_closed", False):
            raise RuntimeError("Extractor has been closed")

        # Describe the input first; the description feeds both the log
        # output and the extension check below.
        info = get_file_info(source, filename)
        logger.debug("Processing file: %s", info)

        # temp_path is only set when bytes were written to a temporary file.
        file_path, temp_path = await self._prepare_file_path_async(
            source, filename, config
        )

        try:
            extension = info.extension
            if not extension:
                raise FileTypeNotSupportedError(
                    f"File has no extension: {info.filename}"
                )
            logger.debug("Detected file suffix: %s", extension)

            handler = registry.get_handler(extension)
            handler_name = handler.__class__.__name__
            logger.info(
                "Using handler %s for file %s (size: %s MB, temp: %s)",
                handler_name,
                info.filename,
                info.size_mb,
                info.is_temp,
            )

            try:
                # The handler's blocking extract() runs in the worker pool.
                text = await asyncio.get_running_loop().run_in_executor(
                    self._executor,
                    handler.extract,
                    file_path,
                    config or self.config.__dict__,
                )
            except Exception as exc:
                logger.error(
                    "Extraction failed for file %s (handler: %s): %s",
                    info.filename,
                    handler_name,
                    exc,
                )
                # Errors that are already ExtractionError pass through untouched.
                if isinstance(exc, ExtractionError):
                    raise
                # ValueError/OSError are reported as an invalid file;
                # everything else becomes a generic extraction failure.
                if isinstance(exc, (ValueError, OSError)):
                    wrapped = InvalidFileError(
                        f"Invalid file: {info.filename} (handler: {handler_name}, error: {exc})"
                    )
                else:
                    wrapped = ExtractionError(
                        f"Extraction failed for file {info.filename} using {handler_name}: {exc}"
                    )
                raise wrapped from exc

            logger.info(
                "Extraction successful for file %s (extracted %d characters)",
                info.filename,
                len(text),
            )
            return text
        finally:
            # Runs on success and on failure, so a created temp file is
            # always removed.
            if temp_path:
                safe_unlink(temp_path)
                logger.debug("Temporary file %s deleted", temp_path)

    async def _prepare_file_path_async(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str],
        config: Optional[dict],
    ) -> tuple[Path, Optional[Path]]:
        """
        Resolve ``source`` to a path on disk.

        Returns:
            tuple: ``(file_path, temp_path)`` where ``temp_path`` equals
            ``file_path`` when a temporary file was created for bytes
            input and is ``None`` for path input.

        Raises:
            ValueError: If source is bytes and no filename was given.
            InvalidFileError: If a path source does not exist or is not a file.
        """
        if not isinstance(source, bytes):
            # Path input: validate it and use it as-is.
            candidate = Path(source)
            if not candidate.exists():
                raise InvalidFileError(f"File not found: {candidate}")
            if not candidate.is_file():
                raise InvalidFileError(f"Path is not a file: {candidate}")

            logger.debug("Using existing file: %s", candidate)
            return candidate, None

        # Bytes input: a filename is mandatory before writing a temp file.
        if not filename:
            raise ValueError("filename is required when source is bytes")

        # Writing the temp file happens in the worker pool, not on the loop.
        loop = asyncio.get_running_loop()
        created = await loop.run_in_executor(
            self._executor,
            create_temp_file,
            source,
            filename,
            config and config.get("max_file_size"),
        )
        logger.debug(
            "Temporary file created at %s for filename %s", created, filename
        )
        return created, created

    def _shutdown_executor(self):
        """Stop the worker pool (if it exists) and flag the instance as closed."""
        if hasattr(self, "_executor"):
            # wait=False: return immediately instead of blocking on running work.
            self._executor.shutdown(wait=False)
        self._closed = True

    def __enter__(self):
        """Enter the ``with`` block and hand back this extractor itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the ``with`` block: shut the pool down and mark closed."""
        self._shutdown_executor()

    async def __aenter__(self):
        """Enter the ``async with`` block and hand back this extractor itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the ``async with`` block: shut the pool down and mark closed."""
        self._shutdown_executor()
Attributes
config instance-attribute
config = config or ExtractorConfig()
Functions
__aenter__ async
__aenter__()

Async context manager entry.

Source code in textxtract/aio/extractor.py
async def __aenter__(self):
    """Enter the ``async with`` block and hand back this extractor itself."""
    return self
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit with cleanup.

Source code in textxtract/aio/extractor.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``async with`` block: shut the pool down and mark closed."""
    has_pool = hasattr(self, "_executor")
    if has_pool:
        # wait=False: return immediately instead of blocking on running work.
        self._executor.shutdown(wait=False)
    self._closed = True
__enter__
__enter__()

Context manager entry.

Source code in textxtract/aio/extractor.py
def __enter__(self):
    """Enter the ``with`` block and hand back this extractor itself."""
    return self
__exit__
__exit__(exc_type, exc_val, exc_tb)

Context manager exit with cleanup.

Source code in textxtract/aio/extractor.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``with`` block: shut the pool down and mark closed."""
    has_pool = hasattr(self, "_executor")
    if has_pool:
        # wait=False: return immediately instead of blocking on running work.
        self._executor.shutdown(wait=False)
    self._closed = True
__init__
__init__(config=None, max_workers=None)
Source code in textxtract/aio/extractor.py
def __init__(
    self,
    config: Optional[ExtractorConfig] = None,
    max_workers: Optional[int] = None,
):
    """
    Create the extractor together with its worker pool.

    Args:
        config: Extractor settings; a default ``ExtractorConfig`` is
            created when the argument is omitted (or falsy).
        max_workers: Forwarded unchanged to ``ThreadPoolExecutor``.
    """
    if not config:
        config = ExtractorConfig()
    self.config = config
    self._executor = ThreadPoolExecutor(max_workers=max_workers)
    self._closed = False
    logger.debug(
        "AsyncTextExtractor initialized with config: %s",
        self.config.__dict__,
    )
extract async
extract(source, filename=None, config=None)

Extract text asynchronously from file path or bytes using thread pool.

Parameters:

Name Type Description Default
source Union[Path, str, bytes]

File path (Path/str) or file bytes

required
filename Optional[str]

Required if source is bytes, optional for file paths

None
config Optional[dict]

Optional configuration overrides

None

Returns:

Name Type Description
str str

Extracted text.

Raises:

Type Description
ValueError

If filename is missing when source is bytes

FileTypeNotSupportedError

If the file extension is not supported.

ExtractionError

If extraction fails.

InvalidFileError

If the file is invalid or corrupted.

Source code in textxtract/aio/extractor.py
async def extract(
    self,
    source: Union[Path, str, bytes],
    filename: Optional[str] = None,
    config: Optional[dict] = None,
) -> str:
    """
    Extract text from a file path or raw bytes without blocking on the handler.

    Args:
        source: Path to the file (Path/str), or the file content as bytes.
        filename: Name to associate with the content; required for bytes,
            optional for paths.
        config: Per-call options handed to the handler. When omitted or
            empty, the extractor's own configuration dict is passed.

    Returns:
        str: The text produced by the selected handler.

    Raises:
        RuntimeError: If the extractor has already been closed.
        ValueError: If filename is missing when source is bytes.
        FileTypeNotSupportedError: If no file extension could be determined.
        InvalidFileError: If the handler raises ValueError or OSError.
        ExtractionError: If the handler fails in any other way.
    """
    if getattr(self, "_closed", False):
        raise RuntimeError("Extractor has been closed")

    # Describe the input first; the description feeds both the log output
    # and the extension check below.
    info = get_file_info(source, filename)
    logger.debug("Processing file: %s", info)

    # temp_path is only set when a temporary file had to be created.
    file_path, temp_path = await self._prepare_file_path_async(
        source, filename, config
    )

    try:
        extension = info.extension
        if not extension:
            raise FileTypeNotSupportedError(
                f"File has no extension: {info.filename}"
            )
        logger.debug("Detected file suffix: %s", extension)

        handler = registry.get_handler(extension)
        handler_name = handler.__class__.__name__
        logger.info(
            "Using handler %s for file %s (size: %s MB, temp: %s)",
            handler_name,
            info.filename,
            info.size_mb,
            info.is_temp,
        )

        try:
            # The handler's blocking extract() runs in the worker pool.
            text = await asyncio.get_running_loop().run_in_executor(
                self._executor,
                handler.extract,
                file_path,
                config or self.config.__dict__,
            )
        except Exception as exc:
            logger.error(
                "Extraction failed for file %s (handler: %s): %s",
                info.filename,
                handler_name,
                exc,
            )
            # Errors that are already ExtractionError pass through untouched.
            if isinstance(exc, ExtractionError):
                raise
            # ValueError/OSError are reported as an invalid file; everything
            # else becomes a generic extraction failure.
            if isinstance(exc, (ValueError, OSError)):
                wrapped = InvalidFileError(
                    f"Invalid file: {info.filename} (handler: {handler_name}, error: {exc})"
                )
            else:
                wrapped = ExtractionError(
                    f"Extraction failed for file {info.filename} using {handler_name}: {exc}"
                )
            raise wrapped from exc

        logger.info(
            "Extraction successful for file %s (extracted %d characters)",
            info.filename,
            len(text),
        )
        return text
    finally:
        # Runs on success and on failure, so a created temp file is always removed.
        if temp_path:
            safe_unlink(temp_path)
            logger.debug("Temporary file %s deleted", temp_path)
Functions

core

Core components for textxtract package.

Modules:

Name Description
base

Abstract base classes for text extraction.

config

Configuration and customization for textxtract package.

exceptions

Custom exceptions for textxtract package.

logging_config

Logging configuration for textxtract package.

registry

Handler registry for centralized handler management.

utils

Utility functions for textxtract package.

Modules

base

Abstract base classes for text extraction.

Classes:

Name Description
FileTypeHandler

Abstract base class for file type-specific handlers.

TextExtractor

Abstract base class for text extractors.

Classes
FileTypeHandler

Bases: ABC

Abstract base class for file type-specific handlers.

Methods:

Name Description
extract

Extract text synchronously from a file.

extract_async

Extract text asynchronously from a file.

Source code in textxtract/core/base.py
class FileTypeHandler(ABC):
    """Interface that every file type-specific handler has to implement.

    Subclasses must supply both a blocking and a coroutine variant of the
    extraction entry point before they can be instantiated.
    """

    @abstractmethod
    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the text of ``file_path`` (blocking variant)."""
        ...

    @abstractmethod
    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Return the text of ``file_path`` (coroutine variant)."""
        ...
Functions
extract abstractmethod
extract(file_path, config=None)

Extract text synchronously from a file.

Source code in textxtract/core/base.py
@abstractmethod
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the text of ``file_path`` (blocking variant); subclasses override."""
    ...
extract_async abstractmethod async
extract_async(file_path, config=None)

Extract text asynchronously from a file.

Source code in textxtract/core/base.py
@abstractmethod
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Return the text of ``file_path`` (coroutine variant); subclasses override."""
    ...
TextExtractor

Bases: ABC

Abstract base class for text extractors.

Methods:

Name Description
extract

Extract text synchronously from file path or bytes.

Source code in textxtract/core/base.py
class TextExtractor(ABC):
    """Interface shared by the text extractors.

    A concrete extractor must implement ``extract`` before it can be
    instantiated.
    """

    @abstractmethod
    def extract(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str] = None,
        config: Optional[dict] = None,
    ) -> str:
        """Return the text extracted from a file path or from raw bytes."""
        ...
Functions
extract abstractmethod
extract(source, filename=None, config=None)

Extract text synchronously from file path or bytes.

Source code in textxtract/core/base.py
@abstractmethod
def extract(
    self,
    source: Union[Path, str, bytes],
    filename: Optional[str] = None,
    config: Optional[dict] = None,
) -> str:
    """Return the text extracted from ``source`` (a path or raw bytes).

    Declared abstract; the body here is intentionally empty.
    """
config

Configuration and customization for textxtract package.

Classes:

Name Description
ExtractorConfig

Enhanced configuration options for text extraction with validation.

Classes
ExtractorConfig

Enhanced configuration options for text extraction with validation.

Methods:

Name Description
__init__
__repr__
from_file

Load configuration from a file (JSON, YAML, or TOML).

get_handler

Retrieve a handler for a given file extension.

get_handler_config

Get configuration specific to a handler.

register_handler

Register a custom file type handler.

to_dict

Convert configuration to dictionary.

Attributes:

Name Type Description
custom_handlers
encoding
extra_config
logging_format
logging_level
max_file_size
max_memory_usage
timeout
Source code in textxtract/core/config.py
class ExtractorConfig:
    """Enhanced configuration options for text extraction with validation.

    Constructor arguments are validated first; the ``TEXT_EXTRACTOR_*``
    environment variables are then applied on top and win whenever they hold
    a usable value. Unrecognised keyword arguments are kept in
    ``extra_config`` and consulted by :meth:`get_handler_config`.
    """

    def __init__(
        self,
        encoding: str = "utf-8",
        logging_level: str = "INFO",
        logging_format: Optional[str] = None,
        timeout: Optional[float] = None,
        max_file_size: Optional[int] = None,
        max_memory_usage: Optional[int] = None,
        custom_handlers: Optional[Dict[str, Callable]] = None,
        **kwargs,
    ):
        """Validate and store the configuration values.

        Raises:
            ValueError: If ``encoding``, ``logging_level``, ``timeout`` or
                ``max_file_size`` fails validation.
        """
        # Validate and set basic options
        self.encoding = self._validate_encoding(encoding)
        self.logging_level = self._validate_logging_level(logging_level)
        self.logging_format = (
            logging_format or "%(asctime)s %(levelname)s %(name)s: %(message)s"
        )
        self.timeout = self._validate_timeout(timeout)
        self.max_file_size = self._validate_max_file_size(max_file_size)
        self.max_memory_usage = max_memory_usage
        self.custom_handlers = custom_handlers or {}

        # Load from environment variables
        self._load_from_env()

        # Store additional kwargs for handler-specific config
        self.extra_config = kwargs

    def _validate_encoding(self, encoding: str) -> str:
        """Return ``encoding`` if it names a usable text codec, else raise ValueError."""
        if not isinstance(encoding, str):
            raise ValueError("Encoding must be a string")

        # Encoding a short sample is the check: unknown codecs raise LookupError.
        try:
            "test".encode(encoding)
        except LookupError as err:
            raise ValueError(f"Invalid encoding: {encoding}") from err

        return encoding

    def _validate_logging_level(self, level: str) -> str:
        """Return the upper-cased level name, or raise ValueError if it is unknown."""
        valid_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
        # Non-string input is rejected with ValueError, matching the other
        # validators, instead of failing on ``.upper()``.
        if not isinstance(level, str) or level.upper() not in valid_levels:
            raise ValueError(
                f"Invalid logging level: {level}. Must be one of {valid_levels}"
            )
        return level.upper()

    def _validate_timeout(self, timeout: Optional[float]) -> Optional[float]:
        """Return ``timeout`` if it is ``None`` or a positive number, else raise ValueError."""
        if timeout is not None:
            if not isinstance(timeout, (int, float)) or timeout <= 0:
                raise ValueError("Timeout must be a positive number")
        return timeout

    def _validate_max_file_size(self, size: Optional[int]) -> Optional[int]:
        """Return ``size`` if it is ``None`` or a positive integer, else raise ValueError."""
        if size is not None:
            if not isinstance(size, int) or size <= 0:
                raise ValueError("Max file size must be a positive integer")
        return size

    def _load_from_env(self):
        """Override settings from ``TEXT_EXTRACTOR_*`` environment variables.

        An invalid encoding or log level raises ``ValueError``. A timeout or
        max file size that cannot be parsed, or that fails the same
        validation as the constructor argument, is ignored and the current
        value is kept.
        """
        env_encoding = os.getenv("TEXT_EXTRACTOR_ENCODING")
        if env_encoding:
            self.encoding = self._validate_encoding(env_encoding)

        env_logging = os.getenv("TEXT_EXTRACTOR_LOG_LEVEL")
        if env_logging:
            self.logging_level = self._validate_logging_level(env_logging)

        env_timeout = os.getenv("TEXT_EXTRACTOR_TIMEOUT")
        if env_timeout:
            try:
                # Validate as well as parse so e.g. "-5" cannot slip through.
                self.timeout = self._validate_timeout(float(env_timeout))
            except ValueError:
                pass  # Deliberately ignore unparsable or non-positive values

        env_max_size = os.getenv("TEXT_EXTRACTOR_MAX_FILE_SIZE")
        if env_max_size:
            try:
                self.max_file_size = self._validate_max_file_size(int(env_max_size))
            except ValueError:
                pass  # Deliberately ignore unparsable or non-positive values

    def register_handler(self, extension: str, handler: Callable):
        """Register a custom file type handler.

        The extension is stored lowercase with a leading dot, so ``"FOO"``
        and ``".foo"`` address the same entry.
        """
        if not extension.startswith("."):
            extension = f".{extension}"
        self.custom_handlers[extension.lower()] = handler

    def get_handler(self, extension: str) -> Optional[Callable]:
        """Retrieve a handler for a given file extension.

        The lookup is case-insensitive. The key is tried as given first, so
        entries supplied directly via ``custom_handlers`` still resolve, and
        then with a leading dot added, matching how :meth:`register_handler`
        stores keys.

        Returns:
            The registered handler, or ``None`` when there is none.
        """
        key = extension.lower()
        handler = self.custom_handlers.get(key)
        if handler is None and not key.startswith("."):
            handler = self.custom_handlers.get(f".{key}")
        return handler

    def get_handler_config(self, handler_name: str) -> Dict[str, Any]:
        """Get configuration specific to a handler.

        Starts from the shared settings and overlays the mapping stored in
        ``extra_config`` under ``"<handler_name lowercased>_config"``, if any.
        """
        base_config = {
            "encoding": self.encoding,
            "timeout": self.timeout,
            "max_file_size": self.max_file_size,
            "max_memory_usage": self.max_memory_usage,
        }

        # Add handler-specific config
        handler_config_key = f"{handler_name.lower()}_config"
        if handler_config_key in self.extra_config:
            base_config.update(self.extra_config[handler_config_key])

        return base_config

    def to_dict(self) -> Dict[str, Any]:
        """Convert configuration to dictionary.

        Handlers are represented by ``str(handler)``; the entries of
        ``extra_config`` are merged in at the top level.
        """
        return {
            "encoding": self.encoding,
            "logging_level": self.logging_level,
            "logging_format": self.logging_format,
            "timeout": self.timeout,
            "max_file_size": self.max_file_size,
            "max_memory_usage": self.max_memory_usage,
            "custom_handlers": {k: str(v) for k, v in self.custom_handlers.items()},
            **self.extra_config,
        }

    @classmethod
    def from_file(cls, config_path: Union[str, Path]) -> "ExtractorConfig":
        """Load configuration from a file (JSON, YAML, or TOML).

        The format is chosen by file suffix. An empty document (one that
        parses to ``None``) yields the default configuration.

        Raises:
            FileNotFoundError: If ``config_path`` does not exist.
            ImportError: If the parser needed for the format is unavailable.
            ValueError: If the suffix is not a supported format.
            TypeError: If the document's top level is not a mapping.
        """
        config_path = Path(config_path)

        if not config_path.exists():
            raise FileNotFoundError(f"Configuration file not found: {config_path}")

        # Decode explicitly as UTF-8 instead of the platform's locale default.
        content = config_path.read_text(encoding="utf-8")
        suffix = config_path.suffix.lower()

        if suffix == ".json":
            import json

            config_data = json.loads(content)
        elif suffix in (".yaml", ".yml"):
            try:
                import yaml
            except ImportError as err:
                raise ImportError(
                    "PyYAML is required to load YAML configuration files"
                ) from err
            config_data = yaml.safe_load(content)
        elif suffix == ".toml":
            try:
                # Standard library since Python 3.11; same loads() API as tomli.
                import tomllib as toml_parser
            except ImportError:
                try:
                    import tomli as toml_parser
                except ImportError as err:
                    raise ImportError(
                        "tomli is required to load TOML configuration files"
                    ) from err
            config_data = toml_parser.loads(content)
        else:
            raise ValueError(
                f"Unsupported configuration file format: {config_path.suffix}"
            )

        if config_data is None:
            # Empty YAML or a JSON ``null``: fall back to the defaults.
            config_data = {}
        elif not isinstance(config_data, dict):
            raise TypeError(
                "Configuration file must contain a mapping at the top level, "
                f"got {type(config_data).__name__}"
            )

        return cls(**config_data)

    def __repr__(self) -> str:
        return f"ExtractorConfig(encoding='{self.encoding}', logging_level='{self.logging_level}', timeout={self.timeout})"
Attributes
custom_handlers instance-attribute
custom_handlers = custom_handlers or {}
encoding instance-attribute
encoding = _validate_encoding(encoding)
extra_config instance-attribute
extra_config = kwargs
logging_format instance-attribute
logging_format = logging_format or '%(asctime)s %(levelname)s %(name)s: %(message)s'
logging_level instance-attribute
logging_level = _validate_logging_level(logging_level)
max_file_size instance-attribute
max_file_size = _validate_max_file_size(max_file_size)
max_memory_usage instance-attribute
max_memory_usage = max_memory_usage
timeout instance-attribute
timeout = _validate_timeout(timeout)
Functions
__init__
__init__(encoding='utf-8', logging_level='INFO', logging_format=None, timeout=None, max_file_size=None, max_memory_usage=None, custom_handlers=None, **kwargs)
Source code in textxtract/core/config.py
def __init__(
    self,
    encoding: str = "utf-8",
    logging_level: str = "INFO",
    logging_format: Optional[str] = None,
    timeout: Optional[float] = None,
    max_file_size: Optional[int] = None,
    max_memory_usage: Optional[int] = None,
    custom_handlers: Optional[Dict[str, Callable]] = None,
    **kwargs,
):
    """Validate the explicit options, apply environment overrides, keep extras.

    Any keyword argument not named in the signature ends up in
    ``extra_config``.
    """
    fallback_format = "%(asctime)s %(levelname)s %(name)s: %(message)s"

    # Explicit arguments, each routed through its validator where one exists.
    self.encoding = self._validate_encoding(encoding)
    self.logging_level = self._validate_logging_level(logging_level)
    self.logging_format = logging_format if logging_format else fallback_format
    self.timeout = self._validate_timeout(timeout)
    self.max_file_size = self._validate_max_file_size(max_file_size)
    self.max_memory_usage = max_memory_usage
    self.custom_handlers = custom_handlers if custom_handlers else {}

    # Environment variables are applied after the explicit arguments.
    self._load_from_env()

    # Whatever is left over is handler-specific configuration.
    self.extra_config = kwargs
__repr__
__repr__()
Source code in textxtract/core/config.py
def __repr__(self) -> str:
    """Return a short summary showing encoding, logging level and timeout."""
    parts = (
        f"encoding='{self.encoding}'",
        f"logging_level='{self.logging_level}'",
        f"timeout={self.timeout}",
    )
    return f"ExtractorConfig({', '.join(parts)})"
from_file classmethod
from_file(config_path)

Load configuration from a file (JSON, YAML, or TOML).

Source code in textxtract/core/config.py
@classmethod
def from_file(cls, config_path: Union[str, Path]) -> "ExtractorConfig":
    """Load configuration from a file (JSON, YAML, or TOML).

    The format is chosen by file suffix. An empty document (one that parses
    to ``None``) yields the default configuration.

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
        ImportError: If the parser needed for the format is unavailable.
        ValueError: If the suffix is not a supported format.
        TypeError: If the document's top level is not a mapping.
    """
    config_path = Path(config_path)

    if not config_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")

    # Decode explicitly as UTF-8 instead of the platform's locale default.
    content = config_path.read_text(encoding="utf-8")
    suffix = config_path.suffix.lower()

    if suffix == ".json":
        import json

        config_data = json.loads(content)
    elif suffix in (".yaml", ".yml"):
        try:
            import yaml
        except ImportError as err:
            raise ImportError(
                "PyYAML is required to load YAML configuration files"
            ) from err
        config_data = yaml.safe_load(content)
    elif suffix == ".toml":
        try:
            # Standard library since Python 3.11; same loads() API as tomli.
            import tomllib as toml_parser
        except ImportError:
            try:
                import tomli as toml_parser
            except ImportError as err:
                raise ImportError(
                    "tomli is required to load TOML configuration files"
                ) from err
        config_data = toml_parser.loads(content)
    else:
        raise ValueError(
            f"Unsupported configuration file format: {config_path.suffix}"
        )

    if config_data is None:
        # Empty YAML or a JSON ``null``: fall back to the defaults.
        config_data = {}
    elif not isinstance(config_data, dict):
        raise TypeError(
            "Configuration file must contain a mapping at the top level, "
            f"got {type(config_data).__name__}"
        )

    return cls(**config_data)
get_handler
get_handler(extension)

Retrieve a handler for a given file extension.

Source code in textxtract/core/config.py
def get_handler(self, extension: str) -> Optional[Callable]:
    """Retrieve a handler for a given file extension.

    The lookup is case-insensitive. The key is tried as given first and then
    with a leading dot added, so an extension passed without its dot still
    finds a handler stored under the dotted form.

    Returns:
        The registered handler, or ``None`` when there is none.
    """
    key = extension.lower()
    handler = self.custom_handlers.get(key)
    if handler is None and not key.startswith("."):
        handler = self.custom_handlers.get(f".{key}")
    return handler
get_handler_config
get_handler_config(handler_name)

Get configuration specific to a handler.

Source code in textxtract/core/config.py
def get_handler_config(self, handler_name: str) -> Dict[str, Any]:
    """Build the settings dictionary handed to a specific handler.

    The shared options form the base; if ``extra_config`` holds an entry
    named ``"<handler_name lowercased>_config"``, its items are laid over it.
    """
    settings: Dict[str, Any] = dict(
        encoding=self.encoding,
        timeout=self.timeout,
        max_file_size=self.max_file_size,
        max_memory_usage=self.max_memory_usage,
    )

    override_key = handler_name.lower() + "_config"
    try:
        overrides = self.extra_config[override_key]
    except KeyError:
        # Nothing handler-specific was configured.
        return settings

    settings.update(overrides)
    return settings
register_handler
register_handler(extension, handler)

Register a custom file type handler.

Source code in textxtract/core/config.py
def register_handler(self, extension: str, handler: Callable):
    """Store ``handler`` under ``extension``, lowercased and dot-prefixed."""
    dotted = extension if extension.startswith(".") else "." + extension
    self.custom_handlers[dotted.lower()] = handler
to_dict
to_dict()

Convert configuration to dictionary.

Source code in textxtract/core/config.py
def to_dict(self) -> Dict[str, Any]:
    """Return the configuration as a plain dictionary.

    Handlers are represented by ``str(handler)``; the entries of
    ``extra_config`` are merged in at the top level.
    """
    handler_names = {}
    for ext, handler in self.custom_handlers.items():
        handler_names[ext] = str(handler)

    snapshot: Dict[str, Any] = {
        "encoding": self.encoding,
        "logging_level": self.logging_level,
        "logging_format": self.logging_format,
        "timeout": self.timeout,
        "max_file_size": self.max_file_size,
        "max_memory_usage": self.max_memory_usage,
        "custom_handlers": handler_names,
    }
    snapshot.update(self.extra_config)
    return snapshot
exceptions

Custom exceptions for textxtract package.

Classes:

Name Description
ExtractionError

Raised when a general extraction error occurs.

ExtractionTimeoutError

Raised when extraction exceeds the allowed timeout.

FileTypeNotSupportedError

Raised when the file type is not supported.

InvalidFileError

Raised when the file is invalid or unsupported.

Classes
ExtractionError

Bases: Exception

Raised when a general extraction error occurs.

Source code in textxtract/core/exceptions.py
class ExtractionError(Exception):
    """Raised when a general extraction error occurs.

    Also the common base class of ``ExtractionTimeoutError``,
    ``FileTypeNotSupportedError`` and ``InvalidFileError``, so catching it
    covers all of them.
    """
ExtractionTimeoutError

Bases: ExtractionError

Raised when extraction exceeds the allowed timeout.

Source code in textxtract/core/exceptions.py
class ExtractionTimeoutError(ExtractionError):
    """Raised when extraction exceeds the allowed timeout.

    Subclass of ``ExtractionError``, so handlers for the base class catch it too.
    """
FileTypeNotSupportedError

Bases: ExtractionError

Raised when the file type is not supported.

Source code in textxtract/core/exceptions.py
class FileTypeNotSupportedError(ExtractionError):
    """Raised when the file type is not supported.

    Subclass of ``ExtractionError``. ``HandlerRegistry.get_handler`` raises
    it when no handler is registered for the requested extension.
    """
InvalidFileError

Bases: ExtractionError

Raised when the file is invalid or unsupported.

Source code in textxtract/core/exceptions.py
class InvalidFileError(ExtractionError):
    """Raised when the file is invalid or unsupported.

    Subclass of ``ExtractionError``, so handlers for the base class catch it too.
    """
logging_config

Logging configuration for textxtract package.

Functions:

Name Description
setup_logging

Configure logging for the package.

Functions
setup_logging
setup_logging(level='INFO', fmt='%(asctime)s %(levelname)s %(name)s: %(message)s')

Configure logging for the package.

Source code in textxtract/core/logging_config.py
def setup_logging(
    level: str = "INFO", fmt: str = "%(asctime)s %(levelname)s %(name)s: %(message)s"
):
    """Call ``logging.basicConfig`` with the given level name and format.

    Args:
        level: Level name, matched case-insensitively against the attributes
            of the ``logging`` module; unknown names fall back to ``INFO``.
        fmt: Format string passed through as ``format``.
    """
    level_name = level.upper()
    numeric_level = getattr(logging, level_name, logging.INFO)
    logging.basicConfig(level=numeric_level, format=fmt)
registry

Handler registry for centralized handler management.

Classes:

Name Description
HandlerRegistry

Central registry for file type handlers with caching and lazy loading.

Attributes:

Name Type Description
logger
registry
Attributes
logger module-attribute
logger = getLogger('textxtract.registry')
registry module-attribute
registry = HandlerRegistry()
Classes
HandlerRegistry

Central registry for file type handlers with caching and lazy loading.

Methods:

Name Description
__init__
__new__
get_handler

Get handler instance for file extension with caching.

get_supported_extensions

Get list of all supported file extensions.

is_supported

Check if a file extension is supported.

register_handler

Register a custom handler for a file extension.

Source code in textxtract/core/registry.py
class HandlerRegistry:
    """Central registry for file type handlers with caching and lazy loading.

    The class is a singleton: ``__new__`` always returns the same instance
    and ``__init__`` loads the default handlers only the first time.
    Extensions are keyed lowercase with a leading dot, and every lookup
    method normalizes its argument the same way ``register_handler`` does.
    """

    _instance: Optional["HandlerRegistry"] = None
    # Maps a normalized extension (e.g. ".pdf") to its handler class.
    _handlers: Dict[str, Type[FileTypeHandler]] = {}
    _initialized = False

    def __new__(cls) -> "HandlerRegistry":
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Load the default handlers unless this instance already did so."""
        if not self._initialized:
            self._load_default_handlers()
            self._initialized = True

    @staticmethod
    def _normalize_extension(extension: str) -> str:
        """Lowercase ``extension`` and make sure it starts with a dot."""
        ext = extension.lower()
        return ext if ext.startswith(".") else f".{ext}"

    def _load_default_handlers(self):
        """Load default handlers with lazy imports.

        Handlers whose import fails with ``ImportError`` are skipped with a
        debug log entry. The TXT, ZIP, CSV and JSON handlers are imported
        unguarded, so a failure there propagates.
        """
        # Import handlers at runtime to support optional dependencies
        try:
            from textxtract.handlers.pdf import PDFHandler

            self._handlers[".pdf"] = PDFHandler
        except ImportError:
            logger.debug("PDF handler not available - pymupdf not installed")

        try:
            from textxtract.handlers.docx import DOCXHandler

            self._handlers[".docx"] = DOCXHandler
        except ImportError:
            logger.debug("DOCX handler not available - python-docx not installed")

        try:
            from textxtract.handlers.doc import DOCHandler

            self._handlers[".doc"] = DOCHandler
        except ImportError:
            logger.debug("DOC handler not available - antiword not installed")

        # Always available handlers
        from textxtract.handlers.txt import TXTHandler
        from textxtract.handlers.zip import ZIPHandler

        self._handlers[".txt"] = TXTHandler
        self._handlers[".text"] = TXTHandler
        self._handlers[".zip"] = ZIPHandler

        # Optional handlers with graceful fallback
        try:
            from textxtract.handlers.md import MDHandler

            self._handlers[".md"] = MDHandler
        except ImportError:
            logger.debug("MD handler not available - markdown not installed")

        try:
            from textxtract.handlers.rtf import RTFHandler

            self._handlers[".rtf"] = RTFHandler
        except ImportError:
            logger.debug("RTF handler not available - pyrtf-ng not installed")

        try:
            from textxtract.handlers.html import HTMLHandler

            self._handlers[".html"] = HTMLHandler
            self._handlers[".htm"] = HTMLHandler
        except ImportError:
            logger.debug("HTML handler not available - beautifulsoup4 not installed")

        # Standard library handlers
        from textxtract.handlers.csv import CSVHandler
        from textxtract.handlers.json import JSONHandler

        self._handlers[".csv"] = CSVHandler
        self._handlers[".json"] = JSONHandler

        try:
            from textxtract.handlers.xml import XMLHandler

            self._handlers[".xml"] = XMLHandler
        except ImportError:
            logger.debug("XML handler not available - lxml not installed")

    @lru_cache(maxsize=128)
    def get_handler(self, extension: str) -> FileTypeHandler:
        """Get handler instance for file extension with caching.

        The extension is matched case-insensitively, with or without its
        leading dot. Results are memoized by ``lru_cache``, so repeated
        lookups of the same extension string return the same handler
        instance until ``register_handler`` clears the cache.

        Raises:
            FileTypeNotSupportedError: If no handler is registered for the
                extension; the message lists the supported extensions.
        """
        ext = extension.lower()
        handler_cls = self._handlers.get(self._normalize_extension(extension))

        if not handler_cls:
            available = list(self._handlers.keys())
            raise FileTypeNotSupportedError(
                f"Unsupported file extension: {ext}. "
                f"Supported extensions: {', '.join(available)}"
            )

        # Create handler instance (handlers are lightweight and stateless)
        return handler_cls()

    def register_handler(self, extension: str, handler_cls: Type[FileTypeHandler]):
        """Register a custom handler for a file extension.

        The extension is stored lowercase with a leading dot and replaces
        any handler already registered under that key.
        """
        ext = self._normalize_extension(extension)

        self._handlers[ext] = handler_cls
        # Clear cache when new handlers are registered
        self.get_handler.cache_clear()
        logger.info(
            "Registered custom handler %s for extension %s", handler_cls.__name__, ext
        )

    def get_supported_extensions(self) -> List[str]:
        """Get list of all supported file extensions."""
        return list(self._handlers.keys())

    def is_supported(self, extension: str) -> bool:
        """Check if a file extension is supported.

        The extension is matched case-insensitively, with or without its
        leading dot.
        """
        return self._normalize_extension(extension) in self._handlers
Functions
__init__
__init__()
Source code in textxtract/core/registry.py
def __init__(self):
    """Load the default handlers once; later calls do nothing."""
    if self._initialized:
        return
    self._load_default_handlers()
    self._initialized = True
__new__
__new__()
Source code in textxtract/core/registry.py
def __new__(cls) -> "HandlerRegistry":
    """Return the instance stored on the class, creating it when absent."""
    existing = cls._instance
    if existing is not None:
        return existing
    cls._instance = super().__new__(cls)
    return cls._instance
get_handler cached
get_handler(extension)

Get handler instance for file extension with caching.

Source code in textxtract/core/registry.py
@lru_cache(maxsize=128)
def get_handler(self, extension: str) -> FileTypeHandler:
    """Get handler instance for file extension with caching.

    The extension is matched case-insensitively, with or without its
    leading dot, mirroring how ``register_handler`` stores keys. Results
    are memoized by ``lru_cache``, so repeated lookups of the same extension
    string return the same handler instance.

    Raises:
        FileTypeNotSupportedError: If no handler is registered for the
            extension; the message lists the supported extensions.
    """
    ext = extension.lower()
    # Registered keys always carry a leading dot; accept "pdf" as ".pdf".
    lookup_key = ext if ext.startswith(".") else f".{ext}"
    handler_cls = self._handlers.get(lookup_key)

    if not handler_cls:
        available = list(self._handlers.keys())
        raise FileTypeNotSupportedError(
            f"Unsupported file extension: {ext}. "
            f"Supported extensions: {', '.join(available)}"
        )

    # Create handler instance (handlers are lightweight and stateless)
    return handler_cls()
get_supported_extensions
get_supported_extensions()

Get list of all supported file extensions.

Source code in textxtract/core/registry.py
def get_supported_extensions(self) -> List[str]:
    """Return a new list holding every registered extension key."""
    return [*self._handlers.keys()]
is_supported
is_supported(extension)

Check if a file extension is supported.

Source code in textxtract/core/registry.py
def is_supported(self, extension: str) -> bool:
    """Check if a file extension is supported.

    The extension is matched case-insensitively, with or without its
    leading dot, mirroring how ``register_handler`` stores keys.
    """
    ext = extension.lower()
    if not ext.startswith("."):
        ext = f".{ext}"
    return ext in self._handlers
register_handler
register_handler(extension, handler_cls)

Register a custom handler for a file extension.

Source code in textxtract/core/registry.py
def register_handler(self, extension: str, handler_cls: Type[FileTypeHandler]):
    """Map ``extension`` (lowercased, dot-prefixed) to ``handler_cls``.

    An existing entry for the same extension is replaced, and the
    ``get_handler`` cache is emptied.
    """
    lowered = extension.lower()
    ext = lowered if lowered.startswith(".") else "." + lowered
    self._handlers[ext] = handler_cls

    # Drop memoized handler instances created before this registration.
    self.get_handler.cache_clear()

    logger.info(
        "Registered custom handler %s for extension %s", handler_cls.__name__, ext
    )
utils

Utility functions for textxtract package.

Classes:

Name Description
FileInfo

File information data class.

Functions:

Name Description
create_temp_file

Create a temporary file from bytes and return its path with security validation.

get_file_info

Get file information for logging and debugging.

safe_unlink

Safely delete a file if it exists, optionally logging errors.

validate_file_extension

Check if the file has an allowed extension.

validate_file_size

Validate file size doesn't exceed limits.

validate_filename

Validate filename for security issues.

Attributes:

Name Type Description
DEFAULT_MAX_FILE_SIZE
DEFAULT_MAX_TEMP_FILES
Attributes
DEFAULT_MAX_FILE_SIZE module-attribute
DEFAULT_MAX_FILE_SIZE = 100 * 1024 * 1024
DEFAULT_MAX_TEMP_FILES module-attribute
DEFAULT_MAX_TEMP_FILES = 1000
Classes
FileInfo dataclass

File information data class.

Methods:

Name Description
__init__

Attributes:

Name Type Description
extension str
filename str
is_temp bool
size_bytes int
size_kb float

File size in KB.

size_mb float
Source code in textxtract/core/utils.py
@dataclass
class FileInfo:
    """Descriptive record for a file or an in-memory payload."""

    filename: str
    size_bytes: int
    size_mb: float
    extension: str
    is_temp: bool = False

    @property
    def size_kb(self) -> float:
        """Size in kilobytes (``size_bytes / 1024``), rounded to two decimals."""
        kilobytes = self.size_bytes / 1024
        return round(kilobytes, 2)
Attributes
extension instance-attribute
extension
filename instance-attribute
filename
is_temp class-attribute instance-attribute
is_temp = False
size_bytes instance-attribute
size_bytes
size_kb property
size_kb

File size in KB.

size_mb instance-attribute
size_mb
Functions
__init__
__init__(filename, size_bytes, size_mb, extension, is_temp=False)
Functions
create_temp_file
create_temp_file(file_bytes, filename, max_size=None)

Create a temporary file from bytes and return its path with security validation.

Source code in textxtract/core/utils.py
def create_temp_file(
    file_bytes: bytes, filename: str, max_size: Optional[int] = None
) -> Path:
    """Create a temporary file from bytes and return its path with security validation.

    ``filename`` and the payload size are validated before anything is
    written. The file keeps the suffix of ``filename`` and is not deleted
    automatically; the caller owns the returned path.

    Args:
        file_bytes: Content to write.
        filename: Original file name; only its suffix is reused.
        max_size: Size limit handed to ``validate_file_size``.

    Raises:
        ValueError: If ``filename`` or the payload size fails validation.
        RuntimeError: If the file does not exist after writing.
    """
    validate_filename(filename)
    validate_file_size(file_bytes, max_size)

    file_ext = Path(filename).suffix
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as temp_file:
            temp_path = Path(temp_file.name)
            temp_file.write(file_bytes)
    except BaseException:
        # delete=False means nothing removes the file for us, so a failed
        # write would otherwise leave an orphan behind.
        if temp_path is not None:
            try:
                temp_path.unlink()
            except OSError:
                pass  # Best effort; the original error is what matters.
        raise

    # Ensure file was created successfully
    if not temp_path.exists():
        raise RuntimeError("Failed to create temporary file")

    return temp_path
get_file_info
get_file_info(source, filename=None)

Get file information for logging and debugging.

Parameters:

Name Type Description Default
source Union[Path, str, bytes]

File path or file bytes

required
filename Optional[str]

Required if source is bytes, optional for file paths

None

Returns:

Name Type Description
FileInfo FileInfo

Data class with file information

Source code in textxtract/core/utils.py
def get_file_info(
    source: Union[Path, str, bytes], filename: Optional[str] = None
) -> FileInfo:
    """
    Describe a file on disk or an in-memory payload.

    Args:
        source: File path or file bytes
        filename: Required if source is bytes, optional for file paths

    Returns:
        FileInfo: Name, size (bytes and MB), lowercased extension, and an
        ``is_temp`` flag that is ``True`` only for byte sources

    Raises:
        ValueError: If source is bytes without a filename, or the path is
            not a regular file
        FileNotFoundError: If the path does not exist
    """
    bytes_per_mb = 1024 * 1024

    if not isinstance(source, bytes):
        # Path-like source: take the size from the filesystem.
        file_path = Path(source)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        if not file_path.is_file():
            raise ValueError(f"Path is not a file: {file_path}")

        on_disk = file_path.stat().st_size
        return FileInfo(
            filename=filename or file_path.name,
            size_bytes=on_disk,
            size_mb=round(on_disk / bytes_per_mb, 2),
            extension=file_path.suffix.lower(),
            is_temp=False,
        )

    # Byte source: the name cannot be derived, so it must be supplied.
    if not filename:
        raise ValueError("filename is required when source is bytes")

    payload_size = len(source)
    return FileInfo(
        filename=filename,
        size_bytes=payload_size,
        size_mb=round(payload_size / bytes_per_mb, 2),
        extension=Path(filename).suffix.lower(),
        is_temp=True,
    )
safe_unlink
safe_unlink(path, log_errors=True)

Safely delete a file if it exists, optionally logging errors.

Source code in textxtract/core/utils.py
def safe_unlink(path: Path, log_errors: bool = True) -> bool:
    """Delete ``path`` if present, never raising.

    Returns:
        ``True`` when the file existed and was removed; ``False`` when it
        was absent or removal failed. Failures are logged as warnings on
        the ``textxtract.utils`` logger unless ``log_errors`` is false.
    """
    try:
        if not path.exists():
            return False
        path.unlink()
        return True
    except Exception as e:
        if not log_errors:
            return False

        import logging

        logging.getLogger("textxtract.utils").warning(
            "Failed to delete temporary file %s: %s", path, e
        )
        return False
validate_file_extension
validate_file_extension(filename, allowed_extensions)

Check if the file has an allowed extension.

Source code in textxtract/core/utils.py
def validate_file_extension(filename: str, allowed_extensions: list[str]) -> bool:
    """Check if the file has an allowed extension.

    Both sides are compared case-insensitively, and entries of
    ``allowed_extensions`` may be given with or without the leading dot
    (``"pdf"``, ``".pdf"`` and ``".PDF"`` are equivalent). An empty-string
    entry matches files that have no suffix.

    Args:
        filename: File name or path whose suffix is checked.
        allowed_extensions: Extensions to accept.

    Returns:
        ``True`` if the file's suffix is among the allowed extensions.
    """
    suffix = Path(filename).suffix.lower()

    normalized = set()
    for ext in allowed_extensions:
        ext = ext.lower()
        if ext and not ext.startswith("."):
            ext = f".{ext}"
        normalized.add(ext)

    return suffix in normalized
validate_file_size
validate_file_size(file_bytes, max_size=None)

Validate file size doesn't exceed limits.

Source code in textxtract/core/utils.py
def validate_file_size(file_bytes: bytes, max_size: Optional[int] = None) -> None:
    """Reject empty payloads and payloads larger than the limit.

    Args:
        file_bytes: Content whose length is checked.
        max_size: Upper bound in bytes; a falsy value selects
            ``DEFAULT_MAX_FILE_SIZE``.

    Raises:
        ValueError: If the payload is empty or longer than the limit.
    """
    limit = max_size if max_size else DEFAULT_MAX_FILE_SIZE
    actual = len(file_bytes)

    if actual == 0:
        raise ValueError("File is empty (0 bytes)")

    if actual > limit:
        raise ValueError(
            f"File size ({actual:,} bytes) exceeds maximum "
            f"allowed size ({limit:,} bytes)"
        )
validate_filename
validate_filename(filename)

Validate filename for security issues.

Source code in textxtract/core/utils.py
def validate_filename(filename: str) -> None:
    """Reject file names that look unsafe.

    Checks run in a fixed order and the first failing one raises:
    empty name, embedded null byte, any ``..`` sequence, a leading ``/`` or
    a ``:`` in second position, more than two backslashes, and a length
    above 255 characters.

    Raises:
        ValueError: With a message naming the failed check.
    """
    if not filename:
        raise ValueError("Filename cannot be empty")

    if "\x00" in filename:
        raise ValueError("Invalid filename: contains null byte")

    if ".." in filename:
        raise ValueError("Invalid filename: path traversal detected")

    # Unix-style root, or a drive-letter style prefix such as "C:".
    rooted = filename[:1] == "/"
    drive_qualified = filename[1:2] == ":"
    if rooted or drive_qualified:
        raise ValueError("Invalid filename: absolute path not allowed")

    # Names containing ".." were rejected above, so at this point only the
    # number of backslashes can make the path structure suspicious.
    if filename.count("\\") > 2:
        raise ValueError("Invalid filename: suspicious path structure")

    if len(filename) > 255:
        raise ValueError("Filename too long")

exceptions

Classes:

Name Description
ExtractionError

Raised when a general extraction error occurs.

ExtractionTimeoutError

Raised when extraction exceeds the allowed timeout.

FileTypeNotSupportedError

Raised when the file type is not supported.

InvalidFileError

Raised when the file is invalid or unsupported.

Attributes

__all__ module-attribute
__all__ = ['ExtractionError', 'InvalidFileError', 'FileTypeNotSupportedError', 'ExtractionTimeoutError']

Classes

ExtractionError

Bases: Exception

Raised when a general extraction error occurs.

Source code in textxtract/core/exceptions.py
class ExtractionError(Exception):
    """Raised when a general extraction error occurs.

    Also the common base class of ``ExtractionTimeoutError``,
    ``FileTypeNotSupportedError`` and ``InvalidFileError``, so catching it
    covers all of them.
    """
ExtractionTimeoutError

Bases: ExtractionError

Raised when extraction exceeds the allowed timeout.

Source code in textxtract/core/exceptions.py
class ExtractionTimeoutError(ExtractionError):
    """Raised when extraction exceeds the allowed timeout.

    Subclass of ``ExtractionError``, so handlers for the base class catch it too.
    """
FileTypeNotSupportedError

Bases: ExtractionError

Raised when the file type is not supported.

Source code in textxtract/core/exceptions.py
class FileTypeNotSupportedError(ExtractionError):
    """Raised when the file type is not supported.

    Subclass of ``ExtractionError``. ``HandlerRegistry.get_handler`` raises
    it when no handler is registered for the requested extension.
    """
InvalidFileError

Bases: ExtractionError

Raised when the file is invalid or unsupported.

Source code in textxtract/core/exceptions.py
class InvalidFileError(ExtractionError):
    """Raised when the file is invalid or unsupported.

    Subclass of ``ExtractionError``, so handlers for the base class catch it too.
    """

handlers

File type-specific handlers package.

Modules:

Name Description
csv

CSV file handler for text extraction.

doc

DOC file handler for text extraction.

docx

DOCX file handler for text extraction.

html

HTML file handler for text extraction.

json

JSON file handler for text extraction.

md

Markdown (.md) file handler for text extraction.

pdf

PDF file handler for text extraction.

rtf

RTF file handler for text extraction.

txt

TXT file handler for text extraction.

xml

XML file handler for text extraction.

zip

ZIP file handler for text extraction.

Modules

csv

CSV file handler for text extraction.

Classes:

Name Description
CSVHandler

Handler for extracting text from CSV files.

Classes
CSVHandler

Bases: FileTypeHandler

Handler for extracting text from CSV files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/csv.py
class CSVHandler(FileTypeHandler):
    """Handler for extracting text from CSV files."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the CSV content as text, one line per row.

        Cells of a row are joined with ``", "`` and rows with a newline.

        Args:
            file_path: CSV file to read.
            config: Optional settings; ``config["encoding"]`` selects the
                text encoding and defaults to ``"utf-8"``.

        Raises:
            ExtractionError: If reading or parsing fails for any reason;
                the underlying exception is attached as ``__cause__``.
        """
        try:
            encoding = (config or {}).get("encoding", "utf-8")
            # newline="" lets the csv module handle embedded line breaks itself.
            with open(file_path, "r", encoding=encoding, newline="") as f:
                reader = csv.reader(f)
                return "\n".join(", ".join(row) for row in reader)
        except Exception as e:
            raise ExtractionError(f"CSV extraction failed: {e}") from e

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` via ``asyncio.to_thread`` and return its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/csv.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Read the CSV at ``file_path`` and return its rows as plain text.

    Args:
        file_path: Location of the CSV file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding and defaults to ``"utf-8"``.

    Returns:
        Rows separated by newlines, fields separated by ", ".

    Raises:
        ExtractionError: If reading or parsing the file fails.
    """
    try:
        settings = config or {}
        rendered_rows = []
        with open(
            file_path,
            "r",
            encoding=settings.get("encoding", "utf-8"),
            newline="",
        ) as handle:
            for fields in csv.reader(handle):
                rendered_rows.append(", ".join(fields))
        return "\n".join(rendered_rows)
    except Exception as e:
        raise ExtractionError(f"CSV extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/csv.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
doc

DOC file handler for text extraction.

Classes:

Name Description
DOCHandler

Handler for extracting text from DOC files with fallback options.

Classes
DOCHandler

Bases: FileTypeHandler

Handler for extracting text from DOC files with fallback options.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/doc.py
class DOCHandler(FileTypeHandler):
    """Handler for extracting text from DOC files with fallback options.

    Extraction is attempted with the external ``antiword`` command first.
    When that executable cannot be found, two best-effort fallbacks are
    tried in order: python-docx, then a scan of the raw bytes for runs of
    printable ASCII characters.
    """

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Extract text from a DOC file.

        Args:
            file_path: Location of the DOC file.
            config: Optional settings, forwarded to the fallback extractor.

        Returns:
            The extracted text.

        Raises:
            ExtractionError: If antiword fails, or if antiword is missing
                and every fallback fails as well.
        """
        try:
            return self._extract_with_antiword(file_path)
        except FileNotFoundError:
            # subprocess raises FileNotFoundError when the antiword
            # executable is not installed; switch to the fallbacks.
            return self._extract_with_fallback(file_path, config)
        except ExtractionError:
            raise
        except Exception as e:
            raise ExtractionError(f"DOC extraction failed: {e}") from e

    def _extract_with_antiword(self, file_path: Path) -> str:
        """Extract text using the ``antiword`` command.

        Raises:
            ExtractionError: If antiword times out (30 seconds), exits with a
                non-zero status, or produces no output.
            FileNotFoundError: If the antiword executable is not found.
        """
        import subprocess

        try:
            result = subprocess.run(
                ["antiword", str(file_path)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
                timeout=30,
            )
        except subprocess.TimeoutExpired as e:
            raise ExtractionError("antiword extraction timed out") from e
        except subprocess.CalledProcessError as e:
            # Decode leniently: a decoding failure here would replace the
            # real antiword message with a UnicodeDecodeError.
            error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
            raise ExtractionError(f"antiword extraction failed: {error_msg}") from e

        content = result.stdout.decode("utf-8").strip()
        if not content:
            raise ExtractionError("antiword returned empty content")
        return content

    def _extract_with_fallback(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Fallback extraction methods when antiword is not available.

        Args:
            file_path: Location of the DOC file.
            config: Accepted for interface symmetry; not read here.

        Returns:
            Text from python-docx when it yields any, otherwise the printable
            ASCII runs found in the raw bytes, prefixed with a notice line.

        Raises:
            ExtractionError: If neither fallback yields any text.
        """
        import re

        # Try python-docx (works for some DOC files)
        try:
            from docx import Document

            doc = Document(file_path)
            text = "\n".join(paragraph.text for paragraph in doc.paragraphs)
            if text.strip():
                return text
        except Exception:
            pass  # Deliberate best effort: fall through to the next method

        # Try reading as binary and looking for text patterns
        try:
            with open(file_path, "rb") as f:
                content = f.read()

            # Runs of at least four printable ASCII bytes (0x20-0x7e); one
            # regex pass replaces the per-byte Python loop.
            runs = re.findall(rb"[\x20-\x7e]{4,}", content)
            result = " ".join(run.decode("ascii") for run in runs)
            if result.strip():
                return f"[Extracted using fallback method - may contain formatting artifacts]\n{result}"

        except Exception:
            pass  # Deliberate best effort: report the generic failure below

        # If all methods fail
        raise ExtractionError(
            "DOC extraction failed. Please install 'antiword' command for better DOC support: "
            "sudo apt-get install antiword (Ubuntu/Debian) or brew install antiword (macOS)"
        )

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run ``extract`` in a worker thread via ``asyncio.to_thread``."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/doc.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Extract text from a DOC file, preferring the antiword command.

    Args:
        file_path: Location of the DOC file.
        config: Optional settings, forwarded to the fallback extractor.

    Returns:
        Text produced by ``_extract_with_antiword`` or, when that raises
        ``FileNotFoundError``, by ``_extract_with_fallback``.

    Raises:
        ExtractionError: Re-raised unchanged when antiword extraction raises
            it; any other exception is wrapped in a new ``ExtractionError``.
    """
    try:
        antiword_text = self._extract_with_antiword(file_path)
    except FileNotFoundError:
        # antiword could not be found: use the alternative methods instead.
        return self._extract_with_fallback(file_path, config)
    except ExtractionError:
        raise
    except Exception as e:
        raise ExtractionError(f"DOC extraction failed: {e}")
    return antiword_text
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/doc.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
docx

DOCX file handler for text extraction.

Classes:

Name Description
DOCXHandler

Handler for extracting text from DOCX files.

Classes
DOCXHandler

Bases: FileTypeHandler

Handler for extracting text from DOCX files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/docx.py
class DOCXHandler(FileTypeHandler):
    """Text extraction handler for DOCX files, backed by python-docx."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the paragraph text of a DOCX document.

        Args:
            file_path: Location of the DOCX file.
            config: Accepted for interface compatibility; not read here.

        Returns:
            The text of ``Document(file_path).paragraphs`` joined with
            newlines.

        Raises:
            ExtractionError: If python-docx cannot be imported or the
                document cannot be read.
        """
        try:
            from docx import Document

            paragraphs = Document(file_path).paragraphs
            lines = [paragraph.text for paragraph in paragraphs]
            return "\n".join(lines)
        except Exception as e:
            raise ExtractionError(f"DOCX extraction failed: {e}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Asynchronous counterpart of ``extract``.

        The synchronous ``self.extract`` call is handed to
        ``asyncio.to_thread`` and its result is awaited.
        """
        from asyncio import to_thread

        extracted = await to_thread(self.extract, file_path, config)
        return extracted
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/docx.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the paragraph text of a DOCX document.

    Args:
        file_path: Location of the DOCX file.
        config: Accepted for interface compatibility; not read here.

    Returns:
        The text of ``Document(file_path).paragraphs`` joined with newlines.

    Raises:
        ExtractionError: If python-docx cannot be imported or the document
            cannot be read.
    """
    try:
        from docx import Document

        paragraphs = Document(file_path).paragraphs
        lines = [paragraph.text for paragraph in paragraphs]
        return "\n".join(lines)
    except Exception as e:
        raise ExtractionError(f"DOCX extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/docx.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
html

HTML file handler for text extraction.

Classes:

Name Description
HTMLHandler

Handler for extracting text from HTML files.

Classes
HTMLHandler

Bases: FileTypeHandler

Handler for extracting text from HTML files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/html.py
class HTMLHandler(FileTypeHandler):
    """Text extraction handler for HTML files, backed by BeautifulSoup."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the text content of an HTML file.

        Args:
            file_path: Location of the HTML file.
            config: Optional settings; the ``"encoding"`` key selects the
                text encoding and defaults to ``"utf-8"``.

        Returns:
            The result of ``BeautifulSoup(...).get_text()`` using the
            ``"html.parser"`` backend.

        Raises:
            ExtractionError: On any failure. A missing beautifulsoup4
                install is reported through the same outer handler, so its
                hint arrives prefixed with "HTML extraction failed: ".
        """
        try:
            try:
                from bs4 import BeautifulSoup
            except ImportError:
                raise ExtractionError(
                    "beautifulsoup4 package is not installed. Install with 'pip install text-extractor[html]'"
                )
            encoding = (config or {}).get("encoding", "utf-8")
            markup = file_path.read_text(encoding=encoding)
            return BeautifulSoup(markup, "html.parser").get_text()
        except Exception as e:
            raise ExtractionError(f"HTML extraction failed: {e}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Asynchronous counterpart of ``extract``.

        The synchronous ``self.extract`` call is handed to
        ``asyncio.to_thread`` and its result is awaited.
        """
        from asyncio import to_thread

        extracted = await to_thread(self.extract, file_path, config)
        return extracted
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/html.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the text content of an HTML file.

    Args:
        file_path: Location of the HTML file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding and defaults to ``"utf-8"``.

    Returns:
        The result of ``BeautifulSoup(...).get_text()`` using the
        ``"html.parser"`` backend.

    Raises:
        ExtractionError: On any failure. A missing beautifulsoup4 install is
            reported through the same outer handler, so its hint arrives
            prefixed with "HTML extraction failed: ".
    """
    try:
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            raise ExtractionError(
                "beautifulsoup4 package is not installed. Install with 'pip install text-extractor[html]'"
            )
        encoding = (config or {}).get("encoding", "utf-8")
        markup = file_path.read_text(encoding=encoding)
        return BeautifulSoup(markup, "html.parser").get_text()
    except Exception as e:
        raise ExtractionError(f"HTML extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/html.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
json

JSON file handler for text extraction.

Classes:

Name Description
JSONHandler

Handler for extracting text from JSON files.

Classes
JSONHandler

Bases: FileTypeHandler

Handler for extracting text from JSON files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/json.py
class JSONHandler(FileTypeHandler):
    """Text extraction handler for JSON files.

    The document is parsed and re-serialised with two-space indentation.
    """

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the JSON document at ``file_path`` as pretty-printed text.

        Args:
            file_path: Location of the JSON file.
            config: Optional settings; the ``"encoding"`` key selects the
                text encoding and defaults to ``"utf-8"``.

        Returns:
            ``json.dumps`` output with ``indent=2`` and non-ASCII characters
            left unescaped.

        Raises:
            ExtractionError: If the file cannot be read or parsed.
        """
        try:
            settings = config or {}
            with open(
                file_path, "r", encoding=settings.get("encoding", "utf-8")
            ) as handle:
                parsed = json.load(handle)
            # Re-serialise so the output is consistently indented text.
            return json.dumps(parsed, indent=2, ensure_ascii=False)
        except Exception as e:
            raise ExtractionError(f"JSON extraction failed: {e}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Asynchronous counterpart of ``extract``.

        The synchronous ``self.extract`` call is handed to
        ``asyncio.to_thread`` and its result is awaited.
        """
        from asyncio import to_thread

        extracted = await to_thread(self.extract, file_path, config)
        return extracted
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/json.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the JSON document at ``file_path`` as pretty-printed text.

    Args:
        file_path: Location of the JSON file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding and defaults to ``"utf-8"``.

    Returns:
        ``json.dumps`` output with ``indent=2`` and non-ASCII characters
        left unescaped.

    Raises:
        ExtractionError: If the file cannot be read or parsed.
    """
    try:
        settings = config or {}
        with open(
            file_path, "r", encoding=settings.get("encoding", "utf-8")
        ) as handle:
            parsed = json.load(handle)
        # Re-serialise so the output is consistently indented text.
        return json.dumps(parsed, indent=2, ensure_ascii=False)
    except Exception as e:
        raise ExtractionError(f"JSON extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/json.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
md

Markdown (.md) file handler for text extraction.

Classes:

Name Description
MDHandler

Handler for extracting text from Markdown files.

Classes
MDHandler

Bases: FileTypeHandler

Handler for extracting text from Markdown files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/md.py
class MDHandler(FileTypeHandler):
    """Text extraction handler for Markdown files.

    The Markdown source is rendered to HTML with the ``markdown`` package
    and the tags are then stripped with BeautifulSoup when it is available.
    """

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the plain text of a Markdown file.

        Args:
            file_path: Location of the Markdown file.
            config: Optional settings; the ``"encoding"`` key selects the
                text encoding and defaults to ``"utf-8"``.

        Returns:
            The rendered document with HTML tags removed, or the raw
            Markdown source when beautifulsoup4 cannot be imported.

        Raises:
            ExtractionError: On any failure. A missing ``markdown`` install
                is reported through the same outer handler, so its hint
                arrives prefixed with "MD extraction failed: ".
        """
        try:
            try:
                import markdown
            except ImportError:
                raise ExtractionError(
                    "markdown package is not installed. Install with 'pip install text-extractor[md]'"
                )
            encoding = (config or {}).get("encoding", "utf-8")
            source_text = file_path.read_text(encoding=encoding)
            rendered_html = markdown.markdown(source_text)
            # Tag stripping is best effort: without bs4 the raw source is
            # returned instead of the rendered HTML.
            try:
                from bs4 import BeautifulSoup

                return BeautifulSoup(rendered_html, "html.parser").get_text()
            except ImportError:
                return source_text
        except Exception as e:
            raise ExtractionError(f"MD extraction failed: {e}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Asynchronous counterpart of ``extract``.

        The synchronous ``self.extract`` call is handed to
        ``asyncio.to_thread`` and its result is awaited.
        """
        from asyncio import to_thread

        extracted = await to_thread(self.extract, file_path, config)
        return extracted
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/md.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the plain text of a Markdown file.

    Args:
        file_path: Location of the Markdown file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding and defaults to ``"utf-8"``.

    Returns:
        The rendered document with HTML tags removed, or the raw Markdown
        source when beautifulsoup4 cannot be imported.

    Raises:
        ExtractionError: On any failure. A missing ``markdown`` install is
            reported through the same outer handler, so its hint arrives
            prefixed with "MD extraction failed: ".
    """
    try:
        try:
            import markdown
        except ImportError:
            raise ExtractionError(
                "markdown package is not installed. Install with 'pip install text-extractor[md]'"
            )
        encoding = (config or {}).get("encoding", "utf-8")
        source_text = file_path.read_text(encoding=encoding)
        rendered_html = markdown.markdown(source_text)
        # Tag stripping is best effort: without bs4 the raw source is
        # returned instead of the rendered HTML.
        try:
            from bs4 import BeautifulSoup

            return BeautifulSoup(rendered_html, "html.parser").get_text()
        except ImportError:
            return source_text
    except Exception as e:
        raise ExtractionError(f"MD extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/md.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
pdf

PDF file handler for text extraction.

Classes:

Name Description
PDFHandler

Handler for extracting text from PDF files with improved error handling.

Classes
PDFHandler

Bases: FileTypeHandler

Handler for extracting text from PDF files with improved error handling.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/pdf.py
class PDFHandler(FileTypeHandler):
    """Handler for extracting text from PDF files with improved error handling.

    Uses PyMuPDF (imported as ``fitz``). Pages without extractable text are
    represented by a placeholder line so the output keeps one entry per page.
    """

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Extract the text of every page of a PDF.

        Args:
            file_path: Location of the PDF file.
            config: Accepted for interface compatibility; not read here.

        Returns:
            Page texts and "[Page N: ...]" placeholders joined with newlines.

        Raises:
            InvalidFileError: If PyMuPDF reports the file as empty or
                corrupted, or if no page yields any text.
            ExtractionError: If PyMuPDF is not installed or any other
                failure occurs.
        """
        # Import before the main ``try``: when the import fails ``fitz`` is
        # never bound, and evaluating ``except fitz.<Error>`` below would
        # raise a NameError that hides the installation hint.
        try:
            import fitz  # PyMuPDF
        except ImportError as e:
            raise ExtractionError(
                "PyMuPDF package is not installed. Install with 'pip install text-extractor[pdf]'"
            ) from e

        try:
            doc = fitz.open(file_path)
            extracted_text = []
            empty_pages = 0

            try:
                for page_num, page in enumerate(doc, start=1):
                    page_text = page.get_text("text").strip()
                    if page_text:
                        extracted_text.append(page_text)
                        continue

                    empty_pages += 1
                    # The structured layout tells image-only pages apart
                    # from blank ones (image blocks have type 1).
                    layout = page.get_text("dict")
                    blocks = layout.get("blocks", []) if layout else []
                    if any(block.get("type") == 1 for block in blocks):
                        extracted_text.append(
                            f"[Page {page_num}: Contains images but no extractable text]"
                        )
                    else:
                        extracted_text.append(f"[Page {page_num}: Empty page]")
            finally:
                # Release the document even when a page fails to process.
                doc.close()

            # Only raise if ALL pages are empty and there is no content at all
            has_real_text = any(
                text.strip() and not text.startswith("[Page")
                for text in extracted_text
            )
            if not has_real_text and empty_pages == len(extracted_text):
                raise InvalidFileError(
                    f"PDF contains {empty_pages} empty pages with no extractable text. "
                    "This may be a scanned PDF that requires OCR."
                )

            return "\n".join(extracted_text)

        # EmptyFileError is listed first so the broader FileDataError clause
        # cannot shadow it.
        except fitz.EmptyFileError as e:
            raise InvalidFileError("PDF file is empty") from e
        except fitz.FileDataError as e:
            raise InvalidFileError(f"Invalid or corrupted PDF file: {e}") from e
        except (ExtractionError, InvalidFileError):
            raise
        except Exception as e:
            raise ExtractionError(f"PDF extraction failed: {e}") from e

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run ``extract`` in a worker thread via ``asyncio.to_thread``."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/pdf.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Extract the text of every page of a PDF using PyMuPDF.

    Pages without extractable text are represented by a placeholder line
    ("[Page N: Empty page]" or "[Page N: Contains images but no extractable
    text]") so the output keeps one entry per page.

    Args:
        file_path: Location of the PDF file.
        config: Accepted for interface compatibility; not read here.

    Returns:
        Page texts and placeholders joined with newlines.

    Raises:
        InvalidFileError: If PyMuPDF reports the file as empty or corrupted,
            or if no page yields any text.
        ExtractionError: If PyMuPDF is not installed or any other failure
            occurs.
    """
    # Import before the main ``try``: when the import fails ``fitz`` is never
    # bound, and evaluating ``except fitz.<Error>`` below would raise a
    # NameError that hides the installation hint.
    try:
        import fitz  # PyMuPDF
    except ImportError as e:
        raise ExtractionError(
            "PyMuPDF package is not installed. Install with 'pip install text-extractor[pdf]'"
        ) from e

    try:
        doc = fitz.open(file_path)
        extracted_text = []
        empty_pages = 0

        try:
            for page_num, page in enumerate(doc, start=1):
                page_text = page.get_text("text").strip()
                if page_text:
                    extracted_text.append(page_text)
                    continue

                empty_pages += 1
                # The structured layout tells image-only pages apart from
                # blank ones (image blocks have type 1).
                layout = page.get_text("dict")
                blocks = layout.get("blocks", []) if layout else []
                if any(block.get("type") == 1 for block in blocks):
                    extracted_text.append(
                        f"[Page {page_num}: Contains images but no extractable text]"
                    )
                else:
                    extracted_text.append(f"[Page {page_num}: Empty page]")
        finally:
            # Release the document even when a page fails to process.
            doc.close()

        # Only raise if ALL pages are empty and there is no content at all
        has_real_text = any(
            text.strip() and not text.startswith("[Page") for text in extracted_text
        )
        if not has_real_text and empty_pages == len(extracted_text):
            raise InvalidFileError(
                f"PDF contains {empty_pages} empty pages with no extractable text. "
                "This may be a scanned PDF that requires OCR."
            )

        return "\n".join(extracted_text)

    # EmptyFileError is listed first so the broader FileDataError clause
    # cannot shadow it.
    except fitz.EmptyFileError as e:
        raise InvalidFileError("PDF file is empty") from e
    except fitz.FileDataError as e:
        raise InvalidFileError(f"Invalid or corrupted PDF file: {e}") from e
    except (ExtractionError, InvalidFileError):
        raise
    except Exception as e:
        raise ExtractionError(f"PDF extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/pdf.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
rtf

RTF file handler for text extraction.

Classes:

Name Description
RTFHandler

Handler for extracting text from RTF files.

Classes
RTFHandler

Bases: FileTypeHandler

Handler for extracting text from RTF files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/rtf.py
class RTFHandler(FileTypeHandler):
    """Text extraction handler for RTF files, backed by striprtf."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the plain text of an RTF file.

        Args:
            file_path: Location of the RTF file.
            config: Optional settings; the ``"encoding"`` key selects the
                text encoding and defaults to ``"utf-8"``.

        Returns:
            The result of ``rtf_to_text`` applied to the file contents.

        Raises:
            ExtractionError: On any failure. A missing striprtf install is
                reported through the same outer handler, so its hint arrives
                prefixed with "RTF extraction failed: ".
        """
        try:
            try:
                from striprtf.striprtf import rtf_to_text
            except ImportError:
                raise ExtractionError(
                    "striprtf package is not installed. Install with 'pip install text-extractor[rtf]'"
                )

            encoding = (config or {}).get("encoding", "utf-8")
            with open(file_path, "r", encoding=encoding) as handle:
                raw_rtf = handle.read()
            return rtf_to_text(raw_rtf)
        except Exception as e:
            raise ExtractionError(f"RTF extraction failed: {e}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Asynchronous counterpart of ``extract``.

        The synchronous ``self.extract`` call is handed to
        ``asyncio.to_thread`` and its result is awaited.
        """
        from asyncio import to_thread

        extracted = await to_thread(self.extract, file_path, config)
        return extracted
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/rtf.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the plain text of an RTF file.

    Args:
        file_path: Location of the RTF file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding and defaults to ``"utf-8"``.

    Returns:
        The result of ``rtf_to_text`` applied to the file contents.

    Raises:
        ExtractionError: On any failure. A missing striprtf install is
            reported through the same outer handler, so its hint arrives
            prefixed with "RTF extraction failed: ".
    """
    try:
        try:
            from striprtf.striprtf import rtf_to_text
        except ImportError:
            raise ExtractionError(
                "striprtf package is not installed. Install with 'pip install text-extractor[rtf]'"
            )

        encoding = (config or {}).get("encoding", "utf-8")
        with open(file_path, "r", encoding=encoding) as handle:
            raw_rtf = handle.read()
        return rtf_to_text(raw_rtf)
    except Exception as e:
        raise ExtractionError(f"RTF extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/rtf.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
txt

TXT file handler for text extraction.

Classes:

Name Description
TXTHandler

Handler for extracting text from TXT files.

Classes
TXTHandler

Bases: FileTypeHandler

Handler for extracting text from TXT files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/txt.py
class TXTHandler(FileTypeHandler):
    """Text extraction handler for plain TXT files."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the decoded contents of a text file.

        Args:
            file_path: Location of the text file.
            config: Optional settings; the ``"encoding"`` key selects the
                text encoding and defaults to ``"utf-8"``.

        Returns:
            The file contents as a string.

        Raises:
            ExtractionError: If the file cannot be read or decoded.
        """
        settings = config or {}
        encoding = settings.get("encoding", "utf-8")
        try:
            contents = file_path.read_text(encoding=encoding)
        except Exception as e:
            raise ExtractionError(f"TXT extraction failed: {e}")
        return contents

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Asynchronous counterpart of ``extract``.

        The synchronous ``self.extract`` call is handed to
        ``asyncio.to_thread`` and its result is awaited.
        """
        from asyncio import to_thread

        extracted = await to_thread(self.extract, file_path, config)
        return extracted
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/txt.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the decoded contents of a text file.

    Args:
        file_path: Location of the text file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding and defaults to ``"utf-8"``.

    Returns:
        The file contents as a string.

    Raises:
        ExtractionError: If the file cannot be read or decoded.
    """
    settings = config or {}
    encoding = settings.get("encoding", "utf-8")
    try:
        contents = file_path.read_text(encoding=encoding)
    except Exception as e:
        raise ExtractionError(f"TXT extraction failed: {e}")
    return contents
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/txt.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
xml

XML file handler for text extraction.

Classes:

Name Description
XMLHandler

Handler for extracting text from XML files.

Classes
XMLHandler

Bases: FileTypeHandler

Handler for extracting text from XML files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/xml.py
class XMLHandler(FileTypeHandler):
    """Text extraction handler for XML files, backed by lxml."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the text nodes of an XML document joined with spaces.

        Args:
            file_path: Location of the XML file.
            config: Optional settings; the ``"encoding"`` key selects the
                text encoding and defaults to ``"utf-8"``.

        Returns:
            Every result of the ``//text()`` XPath query, joined with a
            single space.

        Raises:
            ExtractionError: On any failure. A missing lxml install is
                reported through the same outer handler, so its hint arrives
                prefixed with "XML extraction failed: ".
        """
        try:
            try:
                from lxml import etree
            except ImportError:
                raise ExtractionError(
                    "lxml package is not installed. Install with 'pip install text-extractor[xml]'"
                )
            settings = config or {}
            with open(
                file_path, "r", encoding=settings.get("encoding", "utf-8")
            ) as handle:
                document = etree.parse(handle)
                text_nodes = document.xpath("//text()")
                return " ".join(text_nodes)
        except Exception as e:
            raise ExtractionError(f"XML extraction failed: {e}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Asynchronous counterpart of ``extract``.

        The synchronous ``self.extract`` call is handed to
        ``asyncio.to_thread`` and its result is awaited.
        """
        from asyncio import to_thread

        extracted = await to_thread(self.extract, file_path, config)
        return extracted
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/xml.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the text nodes of an XML document joined with spaces.

    Args:
        file_path: Location of the XML file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding and defaults to ``"utf-8"``.

    Returns:
        Every result of the ``//text()`` XPath query, joined with a single
        space.

    Raises:
        ExtractionError: On any failure. A missing lxml install is reported
            through the same outer handler, so its hint arrives prefixed
            with "XML extraction failed: ".
    """
    try:
        try:
            from lxml import etree
        except ImportError:
            raise ExtractionError(
                "lxml package is not installed. Install with 'pip install text-extractor[xml]'"
            )
        settings = config or {}
        with open(
            file_path, "r", encoding=settings.get("encoding", "utf-8")
        ) as handle:
            document = etree.parse(handle)
            text_nodes = document.xpath("//text()")
            return " ".join(text_nodes)
    except Exception as e:
        raise ExtractionError(f"XML extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/xml.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path forwarded unchanged to ``extract``.
        config: Optional settings forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    extracted = await to_thread(self.extract, file_path, config)
    return extracted
zip

ZIP file handler for text extraction.

Classes:

Name Description
ZIPHandler

Handler for extracting text from ZIP archives with security checks.

Attributes:

Name Type Description
logger
Attributes
logger module-attribute
logger = getLogger('textxtract.handlers.zip')
Classes
ZIPHandler

Bases: FileTypeHandler

Handler for extracting text from ZIP archives with security checks.

Methods:

Name Description
extract
extract_async

Attributes:

Name Type Description
MAX_EXTRACT_SIZE
MAX_FILES
Source code in textxtract/handlers/zip.py
class ZIPHandler(FileTypeHandler):
    """Handler for extracting text from ZIP archives with security checks.

    Members are not unpacked to their archive paths. Each supported member
    is copied into a temporary file, passed to the handler registered for
    its suffix, and the temporary file is removed afterwards.
    """

    MAX_EXTRACT_SIZE = 1024 * 1024 * 1024  # 1GB total
    MAX_FILES = 1000  # Maximum files to process

    def extract(self, file_path: Path, config: Optional[dict] = None) -> List[str]:
        """Extract text from every supported member of a ZIP archive.

        Directories, members with unsafe paths and members larger than 100MB
        are skipped. Processing stops once ``MAX_FILES`` members have been
        accepted or their combined declared size exceeds
        ``MAX_EXTRACT_SIZE``. A failure on one member is logged and does not
        abort the remaining members.

        Args:
            file_path: Location of the ZIP archive.
            config: Optional settings forwarded to each member's handler.

        Returns:
            One entry per successfully processed member, in archive order.

        Raises:
            ExtractionError: If the archive itself cannot be processed.
        """
        extracted_texts = []
        total_size = 0
        file_count = 0

        try:
            # Kept function-local as in the original, but resolved once per
            # call instead of once per archive member.
            from textxtract.core.registry import registry

            with zipfile.ZipFile(file_path, "r") as zip_file:
                for file_info in zip_file.infolist():
                    if file_info.is_dir():
                        continue

                    # Security checks
                    if file_count >= self.MAX_FILES:
                        logger.warning("Maximum file limit reached in ZIP archive")
                        break

                    # Check for path traversal
                    if self._is_unsafe_path(file_info.filename):
                        logger.warning("Skipping unsafe path: %s", file_info.filename)
                        continue

                    # Check file size
                    if file_info.file_size > 100 * 1024 * 1024:  # 100MB per file
                        logger.warning(
                            "Skipping large file: %s (%d bytes)",
                            file_info.filename,
                            file_info.file_size,
                        )
                        continue

                    total_size += file_info.file_size
                    if total_size > self.MAX_EXTRACT_SIZE:
                        logger.warning("Total extract size limit reached")
                        break

                    file_count += 1

                    try:
                        suffix = Path(file_info.filename).suffix.lower()

                        # Decide before reading so unsupported members are
                        # never decompressed into memory.
                        if not registry.is_supported(suffix):
                            logger.debug(
                                "Unsupported file type: %s", file_info.filename
                            )
                            continue

                        handler = registry.get_handler(suffix)

                        # Open by ZipInfo, not by name, so the bytes read
                        # belong to the entry whose size was checked above
                        # even if the archive repeats a file name.
                        with zip_file.open(file_info) as source_file:
                            file_bytes = source_file.read()

                        with tempfile.NamedTemporaryFile(
                            delete=False, suffix=suffix
                        ) as temp_file:
                            temp_file.write(file_bytes)
                            temp_path = Path(temp_file.name)
                        try:
                            text = handler.extract(temp_path, config)
                            extracted_texts.append(text)
                            logger.debug(
                                "Extracted text from %s", file_info.filename
                            )
                        except Exception as e:
                            logger.warning(
                                "Failed to extract text from %s: %s",
                                file_info.filename,
                                e,
                            )
                        finally:
                            temp_path.unlink(missing_ok=True)

                    except Exception as e:
                        logger.warning(
                            "Error processing file %s: %s", file_info.filename, e
                        )
                        continue

            logger.info(
                "Extracted text from %d files in ZIP archive", len(extracted_texts)
            )
            return extracted_texts

        except Exception as e:
            raise ExtractionError(f"ZIP extraction failed: {e}") from e

    def _is_unsafe_path(self, path: str) -> bool:
        """Check if a path contains unsafe elements.

        A path is unsafe when it is absolute (leading "/" or a drive letter
        such as "C:") or when any of its components is exactly "..".

        Args:
            path: Member name as stored in the archive.

        Returns:
            True if the path is unsafe, False otherwise.
        """
        # Normalize path separators
        normalized = path.replace("\\", "/")

        # Compare whole components: a substring test would also reject
        # harmless names such as "notes..txt".
        if normalized.startswith("/") or ".." in normalized.split("/"):
            return True

        # Check for absolute paths on Windows
        if len(normalized) > 1 and normalized[1] == ":":
            return True

        return False

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> List[str]:
        """Run ``extract`` in a worker thread via ``asyncio.to_thread``."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Attributes
MAX_EXTRACT_SIZE class-attribute instance-attribute
MAX_EXTRACT_SIZE = 1024 * 1024 * 1024
MAX_FILES class-attribute instance-attribute
MAX_FILES = 1000
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/zip.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> List[str]:
    """
    Extract text from every supported member of a ZIP archive.

    Members are visited in archive order. Directory entries, members whose
    name fails ``_is_unsafe_path``, and members whose recorded size exceeds
    100 MB are skipped. Iteration stops once ``MAX_FILES`` members have been
    accepted or the running total of recorded sizes exceeds
    ``MAX_EXTRACT_SIZE``. Each accepted member with a registered suffix is
    written to a temporary file and passed to the handler the registry
    returns for that suffix; per-member failures are logged and skipped.

    Args:
        file_path: Path of the ZIP archive.
        config: Optional configuration forwarded unchanged to each handler.

    Returns:
        The handler results, one per successfully processed member.

    Raises:
        ExtractionError: If anything outside the per-member handling fails
            (for example the archive cannot be opened).
    """
    max_member_size = 100 * 1024 * 1024  # 100MB per file
    extracted_texts = []
    total_size = 0
    file_count = 0

    try:
        # Function-local import kept from the original code; resolved once
        # instead of on every loop iteration.
        # NOTE(review): presumably local to avoid a circular import - confirm.
        from textxtract.core.registry import registry

        with zipfile.ZipFile(file_path, "r") as zip_file:
            for file_info in zip_file.infolist():
                if file_info.is_dir():
                    continue

                # Security checks
                if file_count >= self.MAX_FILES:
                    logger.warning("Maximum file limit reached in ZIP archive")
                    break

                # Check for path traversal
                if self._is_unsafe_path(file_info.filename):
                    logger.warning("Skipping unsafe path: %s", file_info.filename)
                    continue

                # Check file size
                if file_info.file_size > max_member_size:
                    logger.warning(
                        "Skipping large file: %s (%d bytes)",
                        file_info.filename,
                        file_info.file_size,
                    )
                    continue

                total_size += file_info.file_size
                if total_size > self.MAX_EXTRACT_SIZE:
                    logger.warning("Total extract size limit reached")
                    break

                file_count += 1

                try:
                    suffix = Path(file_info.filename).suffix.lower()

                    # Decide before reading so unsupported members are never
                    # decompressed.
                    if not registry.is_supported(suffix):
                        logger.debug(
                            "Unsupported file type: %s", file_info.filename
                        )
                        continue

                    # Open by ZipInfo, not by name: with duplicate member
                    # names a name lookup may return a different entry than
                    # the one whose size was just checked.
                    with zip_file.open(file_info) as source_file:
                        file_bytes = source_file.read()

                    handler = registry.get_handler(suffix)

                    # Bind the path before writing so the finally clause can
                    # remove the file even when the write itself fails.
                    temp_file = tempfile.NamedTemporaryFile(
                        delete=False, suffix=suffix
                    )
                    temp_path = Path(temp_file.name)
                    try:
                        with temp_file:
                            temp_file.write(file_bytes)
                        text = handler.extract(temp_path, config)
                        extracted_texts.append(text)
                        logger.debug("Extracted text from %s", file_info.filename)
                    except Exception as e:
                        logger.warning(
                            "Failed to extract text from %s: %s",
                            file_info.filename,
                            e,
                        )
                    finally:
                        temp_path.unlink(missing_ok=True)

                except Exception as e:
                    # Best effort: one bad member must not abort the archive.
                    logger.warning(
                        "Error processing file %s: %s", file_info.filename, e
                    )

        logger.info(
            "Extracted text from %d files in ZIP archive", len(extracted_texts)
        )
        return extracted_texts

    except Exception as e:
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise ExtractionError(f"ZIP extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/zip.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> List[str]:
    """
    Await the result of ``extract`` executed in a worker thread.

    Args:
        file_path: Forwarded unchanged to ``extract``.
        config: Forwarded unchanged to ``extract``.

    Returns:
        Whatever ``extract`` returns for the same arguments.
    """
    from asyncio import to_thread

    texts = await to_thread(self.extract, file_path, config)
    return texts

sync

Synchronous extraction logic package.

Modules:

Name Description
extractor

Synchronous text extraction logic with support for file paths and bytes.

Modules

extractor

Synchronous text extraction logic with support for file paths and bytes.

Classes:

Name Description
SyncTextExtractor

Synchronous text extractor with support for file paths and bytes.

Attributes:

Name Type Description
logger
Attributes
logger module-attribute
logger = getLogger('textxtract.sync')
Classes
SyncTextExtractor

Bases: TextExtractor

Synchronous text extractor with support for file paths and bytes.

Provides synchronous text extraction from various file types. Logs debug- and info-level messages for tracing and diagnostics. Supports the context manager protocol for proper cleanup.

Methods:

Name Description
__enter__

Context manager entry.

__exit__

Context manager exit.

__init__
extract

Extract text synchronously from file path or bytes.

Attributes:

Name Type Description
config
Source code in textxtract/sync/extractor.py
class SyncTextExtractor(TextExtractor):
    """
    Synchronous text extractor with support for file paths and bytes.

    Provides synchronous text extraction from various file types.
    Logs debug- and info-level messages for tracing and diagnostics.
    Supports the context manager protocol; the synchronous extractor holds
    no resources, so leaving the ``with`` block does nothing.
    """

    def __init__(self, config: Optional[ExtractorConfig] = None):
        """
        Create an extractor.

        Args:
            config: Configuration to use; a default ``ExtractorConfig()`` is
                created when none (or a falsy value) is given.
        """
        self.config = config or ExtractorConfig()
        logger.debug(
            "SyncTextExtractor initialized with config: %s", self.config.__dict__
        )

    def extract(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str] = None,
        config: Optional[dict] = None,
    ) -> str:
        """
        Extract text synchronously from file path or bytes.

        Args:
            source: File path (Path/str) or file bytes
            filename: Required if source is bytes, optional for file paths
            config: Optional configuration overrides. When omitted or empty,
                the instance configuration (``self.config.__dict__``) is used
                instead, both for the handler and for temp-file creation.

        Returns:
            str: Extracted text.

        Raises:
            ValueError: If filename is missing when source is bytes
            FileTypeNotSupportedError: If the file extension is not supported.
            ExtractionError: If extraction fails.
            InvalidFileError: If the file is invalid or corrupted.
        """
        # Get file info for logging
        file_info = get_file_info(source, filename)
        logger.debug("Processing file: %s", file_info)

        # Resolve the configuration once so the same settings drive both the
        # temp-file step and the handler call.
        effective_config = config or self.config.__dict__

        # Prepare file path (create temp file if needed)
        file_path, temp_path = self._prepare_file_path(
            source, filename, effective_config
        )

        try:
            # Validate file extension
            suffix = file_info.extension
            if not suffix:
                raise FileTypeNotSupportedError(
                    f"File has no extension: {file_info.filename}"
                )

            logger.debug("Detected file suffix: %s", suffix)

            # Get handler
            handler = registry.get_handler(suffix)
            handler_name = handler.__class__.__name__

            logger.info(
                "Using handler %s for file %s (size: %s MB, temp: %s)",
                handler_name,
                file_info.filename,
                file_info.size_mb,
                file_info.is_temp,
            )

            # Extract text
            try:
                result = handler.extract(file_path, effective_config)
            except Exception as e:
                logger.error(
                    "Extraction failed for file %s (handler: %s): %s",
                    file_info.filename,
                    handler_name,
                    e,
                )

                # Re-raise custom extraction errors
                if isinstance(e, ExtractionError):
                    raise
                # Wrap known invalid file errors
                if isinstance(e, (ValueError, OSError)):
                    raise InvalidFileError(
                        f"Invalid file: {file_info.filename} (handler: {handler_name}, error: {e})"
                    ) from e
                # Wrap as general extraction error
                raise ExtractionError(
                    f"Extraction failed for file {file_info.filename} using {handler_name}: {e}"
                ) from e

            logger.info(
                "Extraction successful for file %s (extracted %d characters)",
                file_info.filename,
                len(result),
            )
            return result

        finally:
            # Clean up temporary file if created
            if temp_path:
                safe_unlink(temp_path)
                logger.debug("Temporary file %s deleted", temp_path)

    def _prepare_file_path(
        self,
        source: Union[Path, str, bytes],
        filename: Optional[str],
        config: Optional[dict],
    ) -> tuple[Path, Optional[Path]]:
        """
        Prepare file path for extraction.

        Bytes input is written to a temporary file via ``create_temp_file``;
        path input is checked for existence and for being a regular file.

        Args:
            source: File path (Path/str) or file bytes.
            filename: Name to associate with bytes input; required for bytes.
            config: Optional settings; only ``max_file_size`` is read here.

        Returns:
            tuple: (file_path, temp_path_if_created)

        Raises:
            ValueError: If source is bytes and filename is missing.
            InvalidFileError: If the path does not exist or is not a file.
        """
        if isinstance(source, bytes):
            # Handle bytes input - create temporary file
            if not filename:
                raise ValueError("filename is required when source is bytes")

            # An empty or missing config means "no explicit limit"; never
            # hand the config object itself to create_temp_file.
            max_file_size = config.get("max_file_size") if config else None
            temp_path = create_temp_file(source, filename, max_file_size)
            logger.debug(
                "Temporary file created at %s for filename %s", temp_path, filename
            )
            return temp_path, temp_path
        else:
            # Handle file path input
            file_path = Path(source)
            if not file_path.exists():
                raise InvalidFileError(f"File not found: {file_path}")
            if not file_path.is_file():
                raise InvalidFileError(f"Path is not a file: {file_path}")

            logger.debug("Using existing file: %s", file_path)
            return file_path, None

    def __enter__(self):
        """Context manager entry; returns this extractor."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; does nothing and does not suppress exceptions."""
        pass  # No resources to clean up for sync extractor
Attributes
config instance-attribute
config = config or ExtractorConfig()
Functions
__enter__
__enter__()

Context manager entry.

Source code in textxtract/sync/extractor.py
def __enter__(self):
    """Context manager entry."""
    return self
__exit__
__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in textxtract/sync/extractor.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit."""
    pass  # No resources to clean up for sync extractor
__init__
__init__(config=None)
Source code in textxtract/sync/extractor.py
def __init__(self, config: Optional[ExtractorConfig] = None):
    """
    Store the extractor configuration and log it at debug level.

    Args:
        config: Configuration to use; a default ``ExtractorConfig()`` is
            created when none (or a falsy value) is given.
    """
    self.config = config if config else ExtractorConfig()
    settings = self.config.__dict__
    logger.debug("SyncTextExtractor initialized with config: %s", settings)
extract
extract(source, filename=None, config=None)

Extract text synchronously from file path or bytes.

Parameters:

Name Type Description Default
source Union[Path, str, bytes]

File path (Path/str) or file bytes

required
filename Optional[str]

Required if source is bytes, optional for file paths

None
config Optional[dict]

Optional configuration overrides

None

Returns:

Name Type Description
str str

Extracted text.

Raises:

Type Description
ValueError

If filename is missing when source is bytes

FileTypeNotSupportedError

If the file extension is not supported.

ExtractionError

If extraction fails.

InvalidFileError

If the file is invalid or corrupted.

Source code in textxtract/sync/extractor.py
def extract(
    self,
    source: Union[Path, str, bytes],
    filename: Optional[str] = None,
    config: Optional[dict] = None,
) -> str:
    """
    Extract text synchronously from file path or bytes.

    Args:
        source: File path (Path/str) or file bytes
        filename: Required if source is bytes, optional for file paths
        config: Optional configuration overrides. When omitted or empty,
            the instance configuration (``self.config.__dict__``) is used
            instead, both for the handler and for temp-file preparation.

    Returns:
        str: Extracted text.

    Raises:
        ValueError: If filename is missing when source is bytes
        FileTypeNotSupportedError: If the file extension is not supported.
        ExtractionError: If extraction fails.
        InvalidFileError: If the file is invalid or corrupted.
    """
    # Get file info for logging
    file_info = get_file_info(source, filename)
    logger.debug("Processing file: %s", file_info)

    # Resolve the configuration once so the same settings drive both the
    # temp-file step and the handler call (previously the instance config
    # reached only the handler).
    effective_config = config or self.config.__dict__

    # Prepare file path (create temp file if needed)
    file_path, temp_path = self._prepare_file_path(
        source, filename, effective_config
    )

    try:
        # Validate file extension
        suffix = file_info.extension
        if not suffix:
            raise FileTypeNotSupportedError(
                f"File has no extension: {file_info.filename}"
            )

        logger.debug("Detected file suffix: %s", suffix)

        # Get handler
        handler = registry.get_handler(suffix)
        handler_name = handler.__class__.__name__

        logger.info(
            "Using handler %s for file %s (size: %s MB, temp: %s)",
            handler_name,
            file_info.filename,
            file_info.size_mb,
            file_info.is_temp,
        )

        # Extract text
        try:
            result = handler.extract(file_path, effective_config)
        except Exception as e:
            logger.error(
                "Extraction failed for file %s (handler: %s): %s",
                file_info.filename,
                handler_name,
                e,
            )

            # Re-raise custom extraction errors
            if isinstance(e, ExtractionError):
                raise
            # Wrap known invalid file errors
            if isinstance(e, (ValueError, OSError)):
                raise InvalidFileError(
                    f"Invalid file: {file_info.filename} (handler: {handler_name}, error: {e})"
                ) from e
            # Wrap as general extraction error
            raise ExtractionError(
                f"Extraction failed for file {file_info.filename} using {handler_name}: {e}"
            ) from e

        logger.info(
            "Extraction successful for file %s (extracted %d characters)",
            file_info.filename,
            len(result),
        )
        return result

    finally:
        # Clean up temporary file if created
        if temp_path:
            safe_unlink(temp_path)
            logger.debug("Temporary file %s deleted", temp_path)
Functions