Skip to content

Handlers Module

Overview

File format handlers for different document types.

File type-specific handlers package.

Modules:

Name Description
csv

CSV file handler for text extraction.

doc

DOC file handler for text extraction.

docx

DOCX file handler for text extraction.

html

HTML file handler for text extraction.

json

JSON file handler for text extraction.

md

Markdown (.md) file handler for text extraction.

pdf

PDF file handler for text extraction.

rtf

RTF file handler for text extraction.

txt

TXT file handler for text extraction.

xml

XML file handler for text extraction.

zip

ZIP file handler for text extraction.

Modules

csv

CSV file handler for text extraction.

Classes:

Name Description
CSVHandler

Handler for extracting text from CSV files.

Classes

CSVHandler

Bases: FileTypeHandler

Handler for extracting text from CSV files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/csv.py
class CSVHandler(FileTypeHandler):
    """Turn a CSV file into plain text, one output line per record."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Read *file_path* as CSV and return its rows as text.

        Fields within a row are joined with ", " and rows are joined with
        newlines. ``config["encoding"]`` selects the file encoding and
        defaults to UTF-8. Any failure is re-raised as ExtractionError.
        """
        try:
            options = config or {}
            with open(
                file_path,
                "r",
                encoding=options.get("encoding", "utf-8"),
                newline="",  # let the csv module handle line endings itself
            ) as handle:
                lines = []
                for record in csv.reader(handle):
                    lines.append(", ".join(record))
                return "\n".join(lines)
        except Exception as exc:
            raise ExtractionError(f"CSV extraction failed: {exc}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        pending = asyncio.to_thread(self.extract, file_path, config)
        return await pending
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/csv.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Read *file_path* as CSV and return its rows as text.

    Fields within a row are joined with ", " and rows are joined with
    newlines. ``config["encoding"]`` selects the file encoding and defaults
    to UTF-8. Any failure is re-raised as ExtractionError.
    """
    try:
        options = config or {}
        with open(
            file_path,
            "r",
            encoding=options.get("encoding", "utf-8"),
            newline="",  # let the csv module handle line endings itself
        ) as handle:
            lines = []
            for record in csv.reader(handle):
                lines.append(", ".join(record))
            return "\n".join(lines)
    except Exception as exc:
        raise ExtractionError(f"CSV extraction failed: {exc}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/csv.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

doc

DOC file handler for text extraction.

Classes:

Name Description
DOCHandler

Handler for extracting text from DOC files with fallback options.

Classes

DOCHandler

Bases: FileTypeHandler

Handler for extracting text from DOC files with fallback options.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/doc.py
class DOCHandler(FileTypeHandler):
    """Handler for extracting text from DOC files with fallback options.

    The ``antiword`` command-line tool is tried first. When the executable
    is missing, extraction falls back to python-docx and finally to a scan
    of the raw bytes for runs of printable ASCII.
    """

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the text of the DOC file at *file_path*.

        Args:
            file_path: Location of the DOC file.
            config: Optional settings; only forwarded to the fallback path.

        Raises:
            ExtractionError: If antiword fails, or if antiword is missing
                and every fallback fails as well.
        """
        try:
            return self._extract_with_antiword(file_path)
        except FileNotFoundError:
            # Raised by subprocess when the antiword executable is missing.
            return self._extract_with_fallback(file_path, config)
        except ExtractionError:
            # Already in the package's error type; pass through unchanged.
            raise
        except Exception as e:
            raise ExtractionError(f"DOC extraction failed: {e}") from e

    def _extract_with_antiword(self, file_path: Path) -> str:
        """Extract text using the antiword command.

        Raises:
            FileNotFoundError: If the antiword executable is not found.
            ExtractionError: On timeout, non-zero exit status, or empty
                output.
        """
        import subprocess

        try:
            result = subprocess.run(
                ["antiword", str(file_path)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
                timeout=30,  # do not hang forever on a broken document
            )
        except subprocess.TimeoutExpired as e:
            raise ExtractionError("antiword extraction timed out") from e
        except subprocess.CalledProcessError as e:
            # errors="replace" keeps a non-decodable stderr from masking the
            # real failure with a UnicodeDecodeError.
            error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
            raise ExtractionError(f"antiword extraction failed: {error_msg}") from e

        content = result.stdout.decode("utf-8").strip()
        if not content:
            raise ExtractionError("antiword returned empty content")
        return content

    def _extract_with_fallback(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Fallback extraction methods when antiword is not available.

        Tries python-docx first, then scans the raw bytes for runs of at
        least four printable ASCII characters. ``config`` is accepted for
        signature compatibility but is not used.

        Raises:
            ExtractionError: If neither method yields any text. The last
                error swallowed along the way, if any, is attached as the
                cause.
        """
        import re

        last_error = None

        # Try python-docx (works for some DOC files)
        try:
            from docx import Document

            doc = Document(file_path)
            text = "\n".join(paragraph.text for paragraph in doc.paragraphs)
            if text.strip():
                return text
        except Exception as e:
            # Best effort: remember the failure and try the next method.
            last_error = e

        # Try reading as binary and looking for text patterns
        try:
            with open(file_path, "rb") as f:
                content = f.read()

            # Maximal runs of printable ASCII (0x20-0x7E) longer than three
            # bytes, found in a single pass instead of a per-byte loop.
            runs = re.findall(rb"[\x20-\x7e]{4,}", content)
            result = " ".join(run.decode("ascii") for run in runs)
            if result.strip():
                return f"[Extracted using fallback method - may contain formatting artifacts]\n{result}"
        except Exception as e:
            last_error = e

        # If all methods fail
        error = ExtractionError(
            "DOC extraction failed. Please install 'antiword' command for better DOC support: "
            "sudo apt-get install antiword (Ubuntu/Debian) or brew install antiword (macOS)"
        )
        if last_error is not None:
            raise error from last_error
        raise error

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/doc.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the text of a DOC file, preferring antiword.

    A FileNotFoundError from the antiword path triggers the fallback
    extractor. An ExtractionError passes through unchanged; any other
    exception is wrapped in ExtractionError.
    """
    try:
        return self._extract_with_antiword(file_path)
    except FileNotFoundError:
        # antiword is unavailable, so use the alternative methods.
        return self._extract_with_fallback(file_path, config)
    except ExtractionError:
        raise
    except Exception as exc:
        raise ExtractionError(f"DOC extraction failed: {exc}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/doc.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

docx

DOCX file handler for text extraction.

Classes:

Name Description
DOCXHandler

Handler for extracting text from DOCX files.

Classes

DOCXHandler

Bases: FileTypeHandler

Handler for extracting text from DOCX files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/docx.py
class DOCXHandler(FileTypeHandler):
    """Extract plain text from DOCX files using python-docx."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the paragraphs of the document joined by newlines.

        ``config`` is accepted for interface compatibility and is not used.
        Any failure, including a missing python-docx package, is re-raised
        as ExtractionError.
        """
        try:
            from docx import Document

            document = Document(file_path)
            lines = [para.text for para in document.paragraphs]
            return "\n".join(lines)
        except Exception as exc:
            raise ExtractionError(f"DOCX extraction failed: {exc}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        pending = asyncio.to_thread(self.extract, file_path, config)
        return await pending
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/docx.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the paragraphs of a DOCX document joined by newlines.

    ``config`` is accepted for interface compatibility and is not used.
    Any failure, including a missing python-docx package, is re-raised as
    ExtractionError.
    """
    try:
        from docx import Document

        document = Document(file_path)
        lines = [para.text for para in document.paragraphs]
        return "\n".join(lines)
    except Exception as exc:
        raise ExtractionError(f"DOCX extraction failed: {exc}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/docx.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

html

HTML file handler for text extraction.

Classes:

Name Description
HTMLHandler

Handler for extracting text from HTML files.

Classes

HTMLHandler

Bases: FileTypeHandler

Handler for extracting text from HTML files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/html.py
class HTMLHandler(FileTypeHandler):
    """Extract the visible text of HTML files using BeautifulSoup."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Parse *file_path* as HTML and return its text content.

        ``config["encoding"]`` selects the file encoding (UTF-8 by default).
        Every failure, including the missing-dependency error raised below,
        ends up wrapped in ExtractionError by the outer handler.
        """
        try:
            try:
                from bs4 import BeautifulSoup
            except ImportError:
                raise ExtractionError(
                    "beautifulsoup4 package is not installed. Install with 'pip install text-extractor[html]'"
                )
            options = config or {}
            markup = file_path.read_text(encoding=options.get("encoding", "utf-8"))
            return BeautifulSoup(markup, "html.parser").get_text()
        except Exception as exc:
            raise ExtractionError(f"HTML extraction failed: {exc}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        pending = asyncio.to_thread(self.extract, file_path, config)
        return await pending
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/html.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Parse *file_path* as HTML and return its text content.

    ``config["encoding"]`` selects the file encoding (UTF-8 by default).
    Every failure, including the missing-dependency error raised below, ends
    up wrapped in ExtractionError by the outer handler.
    """
    try:
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            raise ExtractionError(
                "beautifulsoup4 package is not installed. Install with 'pip install text-extractor[html]'"
            )
        options = config or {}
        markup = file_path.read_text(encoding=options.get("encoding", "utf-8"))
        return BeautifulSoup(markup, "html.parser").get_text()
    except Exception as exc:
        raise ExtractionError(f"HTML extraction failed: {exc}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/html.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

json

JSON file handler for text extraction.

Classes:

Name Description
JSONHandler

Handler for extracting text from JSON files.

Classes

JSONHandler

Bases: FileTypeHandler

Handler for extracting text from JSON files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/json.py
class JSONHandler(FileTypeHandler):
    """Extract text from JSON files by re-serialising them in readable form."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Load *file_path* as JSON and return it pretty-printed.

        The output uses a two-space indent and keeps non-ASCII characters
        as-is. ``config["encoding"]`` selects the file encoding (UTF-8 by
        default). Any failure is re-raised as ExtractionError.
        """
        try:
            options = config or {}
            with open(
                file_path, "r", encoding=options.get("encoding", "utf-8")
            ) as handle:
                parsed = json.load(handle)
            return json.dumps(parsed, indent=2, ensure_ascii=False)
        except Exception as exc:
            raise ExtractionError(f"JSON extraction failed: {exc}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        pending = asyncio.to_thread(self.extract, file_path, config)
        return await pending
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/json.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Load *file_path* as JSON and return it pretty-printed.

    The output uses a two-space indent and keeps non-ASCII characters as-is.
    ``config["encoding"]`` selects the file encoding (UTF-8 by default). Any
    failure is re-raised as ExtractionError.
    """
    try:
        options = config or {}
        with open(
            file_path, "r", encoding=options.get("encoding", "utf-8")
        ) as handle:
            parsed = json.load(handle)
        return json.dumps(parsed, indent=2, ensure_ascii=False)
    except Exception as exc:
        raise ExtractionError(f"JSON extraction failed: {exc}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/json.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

md

Markdown (.md) file handler for text extraction.

Classes:

Name Description
MDHandler

Handler for extracting text from Markdown files.

Classes

MDHandler

Bases: FileTypeHandler

Handler for extracting text from Markdown files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/md.py
class MDHandler(FileTypeHandler):
    """Extract plain text from Markdown files."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Render *file_path* from Markdown to HTML and return its text.

        ``config["encoding"]`` selects the file encoding (UTF-8 by default).
        When BeautifulSoup cannot be imported, the raw Markdown source is
        returned instead. Every other failure, including a missing
        ``markdown`` package, ends up wrapped in ExtractionError.
        """
        try:
            try:
                import markdown
            except ImportError:
                raise ExtractionError(
                    "markdown package is not installed. Install with 'pip install text-extractor[md]'"
                )
            options = config or {}
            source = file_path.read_text(encoding=options.get("encoding", "utf-8"))
            rendered = markdown.markdown(source)
            # Strip the HTML tags when bs4 is present; otherwise hand back
            # the unrendered source as a best effort.
            try:
                from bs4 import BeautifulSoup

                plain = BeautifulSoup(rendered, "html.parser").get_text()
            except ImportError:
                plain = source
            return plain
        except Exception as exc:
            raise ExtractionError(f"MD extraction failed: {exc}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        pending = asyncio.to_thread(self.extract, file_path, config)
        return await pending
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/md.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Render *file_path* from Markdown to HTML and return its text.

    ``config["encoding"]`` selects the file encoding (UTF-8 by default).
    When BeautifulSoup cannot be imported, the raw Markdown source is
    returned instead. Every other failure, including a missing ``markdown``
    package, ends up wrapped in ExtractionError.
    """
    try:
        try:
            import markdown
        except ImportError:
            raise ExtractionError(
                "markdown package is not installed. Install with 'pip install text-extractor[md]'"
            )
        options = config or {}
        source = file_path.read_text(encoding=options.get("encoding", "utf-8"))
        rendered = markdown.markdown(source)
        # Strip the HTML tags when bs4 is present; otherwise hand back the
        # unrendered source as a best effort.
        try:
            from bs4 import BeautifulSoup

            plain = BeautifulSoup(rendered, "html.parser").get_text()
        except ImportError:
            plain = source
        return plain
    except Exception as exc:
        raise ExtractionError(f"MD extraction failed: {exc}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/md.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

pdf

PDF file handler for text extraction.

Classes:

Name Description
PDFHandler

Handler for extracting text from PDF files with improved error handling.

Classes

PDFHandler

Bases: FileTypeHandler

Handler for extracting text from PDF files with improved error handling.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/pdf.py
class PDFHandler(FileTypeHandler):
    """Handler for extracting text from PDF files with improved error handling."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the text of every page of the PDF, joined by newlines.

        Pages without extractable text are represented by a bracketed
        placeholder that states whether the page contains images.

        Args:
            file_path: Location of the PDF file.
            config: Accepted for interface compatibility; not used here.

        Raises:
            ExtractionError: If PyMuPDF is not installed or an unexpected
                error occurs.
            InvalidFileError: If the file is empty or corrupted, or if no
                page yields any text.
        """
        # The import is resolved before the main try block. ``fitz`` is a
        # function-local name, so referring to ``fitz.FileDataError`` in an
        # except clause after a failed import would raise UnboundLocalError
        # and hide the installation hint.
        # NOTE(review): the hint names the distribution 'text-extractor'
        # while the code lives in 'textxtract' - confirm the install name.
        try:
            import fitz  # PyMuPDF
        except ImportError as e:
            raise ExtractionError(
                "PyMuPDF package is not installed. Install with 'pip install text-extractor[pdf]'"
            ) from e

        try:
            extracted_text = []
            empty_pages = 0

            doc = fitz.open(file_path)
            try:
                for page_num, page in enumerate(doc, start=1):
                    page_text = page.get_text("text").strip()
                    if page_text:
                        extracted_text.append(page_text)
                        continue

                    empty_pages += 1
                    # The structured ("dict") output lists the page's blocks;
                    # type 1 is what this code treats as an image block.
                    layout = page.get_text("dict")
                    has_images = bool(layout) and any(
                        block.get("type") == 1 for block in layout.get("blocks", [])
                    )
                    if has_images:
                        extracted_text.append(
                            f"[Page {page_num}: Contains images but no extractable text]"
                        )
                    else:
                        extracted_text.append(f"[Page {page_num}: Empty page]")
            finally:
                # Release the document even when a page fails to process.
                doc.close()

            # Only raise an error if ALL pages are empty, i.e. every entry
            # collected above is a placeholder.
            if empty_pages == len(extracted_text):
                raise InvalidFileError(
                    f"PDF contains {empty_pages} empty pages with no extractable text. "
                    "This may be a scanned PDF that requires OCR."
                )

            return "\n".join(extracted_text)

        # EmptyFileError is listed first so that it stays reachable when it
        # is a subclass of FileDataError.
        except fitz.EmptyFileError as e:
            raise InvalidFileError("PDF file is empty") from e
        except fitz.FileDataError as e:
            raise InvalidFileError(f"Invalid or corrupted PDF file: {e}") from e
        except (ExtractionError, InvalidFileError):
            raise
        except Exception as e:
            raise ExtractionError(f"PDF extraction failed: {e}") from e

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/pdf.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the text of every page of the PDF, joined by newlines.

    Pages without extractable text are represented by a bracketed
    placeholder that states whether the page contains images.

    Args:
        file_path: Location of the PDF file.
        config: Accepted for interface compatibility; not used here.

    Raises:
        ExtractionError: If PyMuPDF is not installed or an unexpected error
            occurs.
        InvalidFileError: If the file is empty or corrupted, or if no page
            yields any text.
    """
    # The import is resolved before the main try block. ``fitz`` is a
    # function-local name, so referring to ``fitz.FileDataError`` in an
    # except clause after a failed import would raise UnboundLocalError and
    # hide the installation hint.
    try:
        import fitz  # PyMuPDF
    except ImportError as e:
        raise ExtractionError(
            "PyMuPDF package is not installed. Install with 'pip install text-extractor[pdf]'"
        ) from e

    try:
        extracted_text = []
        empty_pages = 0

        doc = fitz.open(file_path)
        try:
            for page_num, page in enumerate(doc, start=1):
                page_text = page.get_text("text").strip()
                if page_text:
                    extracted_text.append(page_text)
                    continue

                empty_pages += 1
                # The structured ("dict") output lists the page's blocks;
                # type 1 is what this code treats as an image block.
                layout = page.get_text("dict")
                has_images = bool(layout) and any(
                    block.get("type") == 1 for block in layout.get("blocks", [])
                )
                if has_images:
                    extracted_text.append(
                        f"[Page {page_num}: Contains images but no extractable text]"
                    )
                else:
                    extracted_text.append(f"[Page {page_num}: Empty page]")
        finally:
            # Release the document even when a page fails to process.
            doc.close()

        # Only raise an error if ALL pages are empty, i.e. every entry
        # collected above is a placeholder.
        if empty_pages == len(extracted_text):
            raise InvalidFileError(
                f"PDF contains {empty_pages} empty pages with no extractable text. "
                "This may be a scanned PDF that requires OCR."
            )

        return "\n".join(extracted_text)

    # EmptyFileError is listed first so that it stays reachable when it is a
    # subclass of FileDataError.
    except fitz.EmptyFileError as e:
        raise InvalidFileError("PDF file is empty") from e
    except fitz.FileDataError as e:
        raise InvalidFileError(f"Invalid or corrupted PDF file: {e}") from e
    except (ExtractionError, InvalidFileError):
        raise
    except Exception as e:
        raise ExtractionError(f"PDF extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/pdf.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

rtf

RTF file handler for text extraction.

Classes:

Name Description
RTFHandler

Handler for extracting text from RTF files.

Classes

RTFHandler

Bases: FileTypeHandler

Handler for extracting text from RTF files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/rtf.py
class RTFHandler(FileTypeHandler):
    """Extract plain text from RTF files using striprtf."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Read *file_path* and return it converted from RTF to text.

        ``config["encoding"]`` selects the file encoding (UTF-8 by default).
        Every failure, including the missing-dependency error raised below,
        ends up wrapped in ExtractionError by the outer handler.
        """
        try:
            try:
                from striprtf.striprtf import rtf_to_text
            except ImportError:
                raise ExtractionError(
                    "striprtf package is not installed. Install with 'pip install text-extractor[rtf]'"
                )

            options = config or {}
            with open(
                file_path, "r", encoding=options.get("encoding", "utf-8")
            ) as handle:
                raw = handle.read()
            return rtf_to_text(raw)
        except Exception as exc:
            raise ExtractionError(f"RTF extraction failed: {exc}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        pending = asyncio.to_thread(self.extract, file_path, config)
        return await pending
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/rtf.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Read *file_path* and return it converted from RTF to text.

    ``config["encoding"]`` selects the file encoding (UTF-8 by default).
    Every failure, including the missing-dependency error raised below, ends
    up wrapped in ExtractionError by the outer handler.
    """
    try:
        try:
            from striprtf.striprtf import rtf_to_text
        except ImportError:
            raise ExtractionError(
                "striprtf package is not installed. Install with 'pip install text-extractor[rtf]'"
            )

        options = config or {}
        with open(
            file_path, "r", encoding=options.get("encoding", "utf-8")
        ) as handle:
            raw = handle.read()
        return rtf_to_text(raw)
    except Exception as exc:
        raise ExtractionError(f"RTF extraction failed: {exc}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/rtf.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

txt

TXT file handler for text extraction.

Classes:

Name Description
TXTHandler

Handler for extracting text from TXT files.

Classes

TXTHandler

Bases: FileTypeHandler

Handler for extracting text from TXT files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/txt.py
class TXTHandler(FileTypeHandler):
    """Return the contents of plain-text files."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Read *file_path* and return its decoded contents.

        ``config["encoding"]`` selects the encoding (UTF-8 by default).
        A failure while reading is re-raised as ExtractionError.
        """
        options = config or {}
        encoding = options.get("encoding", "utf-8")
        try:
            content = file_path.read_text(encoding=encoding)
        except Exception as exc:
            raise ExtractionError(f"TXT extraction failed: {exc}")
        else:
            return content

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        pending = asyncio.to_thread(self.extract, file_path, config)
        return await pending
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/txt.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Read *file_path* and return its decoded contents.

    ``config["encoding"]`` selects the encoding (UTF-8 by default). A
    failure while reading is re-raised as ExtractionError.
    """
    options = config or {}
    encoding = options.get("encoding", "utf-8")
    try:
        content = file_path.read_text(encoding=encoding)
    except Exception as exc:
        raise ExtractionError(f"TXT extraction failed: {exc}")
    else:
        return content
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/txt.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

xml

XML file handler for text extraction.

Classes:

Name Description
XMLHandler

Handler for extracting text from XML files.

Classes

XMLHandler

Bases: FileTypeHandler

Handler for extracting text from XML files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/xml.py
class XMLHandler(FileTypeHandler):
    """Extract the text nodes of XML files using lxml."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Parse *file_path* as XML and return its text nodes.

        All text nodes matched by ``//text()`` are joined with single
        spaces. ``config["encoding"]`` selects the file encoding (UTF-8 by
        default). Every failure, including the missing-dependency error
        raised below, ends up wrapped in ExtractionError.
        """
        try:
            try:
                from lxml import etree
            except ImportError:
                raise ExtractionError(
                    "lxml package is not installed. Install with 'pip install text-extractor[xml]'"
                )
            options = config or {}
            with open(
                file_path, "r", encoding=options.get("encoding", "utf-8")
            ) as handle:
                document = etree.parse(handle)
                text_nodes = document.xpath("//text()")
                return " ".join(text_nodes)
        except Exception as exc:
            raise ExtractionError(f"XML extraction failed: {exc}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        pending = asyncio.to_thread(self.extract, file_path, config)
        return await pending
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/xml.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Parse *file_path* as XML and return its text nodes.

    All text nodes matched by ``//text()`` are joined with single spaces.
    ``config["encoding"]`` selects the file encoding (UTF-8 by default).
    Every failure, including the missing-dependency error raised below, ends
    up wrapped in ExtractionError.
    """
    try:
        try:
            from lxml import etree
        except ImportError:
            raise ExtractionError(
                "lxml package is not installed. Install with 'pip install text-extractor[xml]'"
            )
        options = config or {}
        with open(
            file_path, "r", encoding=options.get("encoding", "utf-8")
        ) as handle:
            document = etree.parse(handle)
            text_nodes = document.xpath("//text()")
            return " ".join(text_nodes)
    except Exception as exc:
        raise ExtractionError(f"XML extraction failed: {exc}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/xml.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Run ``self.extract`` in a worker thread and await its result."""
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending

zip

ZIP file handler for text extraction.

Classes:

Name Description
ZIPHandler

Handler for extracting text from ZIP archives with security checks.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger('textxtract.handlers.zip')

Classes

ZIPHandler

Bases: FileTypeHandler

Handler for extracting text from ZIP archives with security checks.

Methods:

Name Description
extract
extract_async

Attributes:

Name Type Description
MAX_EXTRACT_SIZE
MAX_FILES
Source code in textxtract/handlers/zip.py
class ZIPHandler(FileTypeHandler):
    """Handler for extracting text from ZIP archives with security checks.

    Each archive member with a supported suffix is written to a temporary
    file and passed to the handler registered for that suffix. Unlike the
    single-file handlers, ``extract`` returns a list with one entry per
    successfully extracted member.
    """

    # Upper bound on the summed ``file_size`` of the members processed from
    # one archive.
    MAX_EXTRACT_SIZE = 1024 * 1024 * 1024  # 1GB total
    # Upper bound on the number of members processed from one archive.
    MAX_FILES = 1000  # Maximum files to process

    def extract(self, file_path: Path, config: Optional[dict] = None) -> List[str]:
        """Extract text from every supported member of the archive.

        Args:
            file_path: Location of the ZIP archive.
            config: Optional settings, passed unchanged to each member's
                handler.

        Returns:
            The values returned by the member handlers, in archive order.
            Members that are skipped or fail to extract contribute nothing.

        Raises:
            ExtractionError: If the archive itself cannot be opened or
                iterated. Failures of individual members are only logged.
        """
        extracted_texts = []
        total_size = 0  # running sum of file_size over accepted members
        file_count = 0  # accepted members, whether or not they are supported

        try:
            with zipfile.ZipFile(file_path, "r") as zip_file:
                for file_info in zip_file.infolist():
                    if file_info.is_dir():
                        continue

                    # Security checks
                    if file_count >= self.MAX_FILES:
                        logger.warning("Maximum file limit reached in ZIP archive")
                        break

                    # Check for path traversal
                    if self._is_unsafe_path(file_info.filename):
                        logger.warning("Skipping unsafe path: %s", file_info.filename)
                        continue

                    # Check file size (the size recorded in the archive entry)
                    if file_info.file_size > 100 * 1024 * 1024:  # 100MB per file
                        logger.warning(
                            "Skipping large file: %s (%d bytes)",
                            file_info.filename,
                            file_info.file_size,
                        )
                        continue

                    # The member that pushes the total over the limit is not
                    # processed, and neither is anything after it.
                    total_size += file_info.file_size
                    if total_size > self.MAX_EXTRACT_SIZE:
                        logger.warning("Total extract size limit reached")
                        break

                    # Counted before the suffix check below, so unsupported
                    # members also count toward MAX_FILES.
                    file_count += 1

                    try:
                        with zip_file.open(file_info.filename) as source_file:
                            file_bytes = source_file.read()
                            suffix = Path(file_info.filename).suffix.lower()

                            # Use registry to get handler
                            # NOTE(review): imported here rather than at module
                            # level, presumably to avoid a circular import -
                            # confirm.
                            from textxtract.core.registry import registry

                            if registry.is_supported(suffix):
                                handler = registry.get_handler(suffix)
                                # delete=False: the file is closed at the end
                                # of the with block and then handed to the
                                # handler by path; it is removed in the
                                # finally clause below.
                                with tempfile.NamedTemporaryFile(
                                    delete=False, suffix=suffix
                                ) as temp_file:
                                    temp_file.write(file_bytes)
                                    temp_path = Path(temp_file.name)
                                try:
                                    text = handler.extract(temp_path, config)
                                    extracted_texts.append(text)
                                    logger.debug(
                                        "Extracted text from %s", file_info.filename
                                    )
                                except Exception as e:
                                    # A failing member is logged and skipped;
                                    # the rest of the archive is still read.
                                    logger.warning(
                                        "Failed to extract text from %s: %s",
                                        file_info.filename,
                                        e,
                                    )
                                finally:
                                    temp_path.unlink(missing_ok=True)
                            else:
                                logger.debug(
                                    "Unsupported file type: %s", file_info.filename
                                )

                    except Exception as e:
                        # Covers failures while reading the member or creating
                        # the temporary file.
                        logger.warning(
                            "Error processing file %s: %s", file_info.filename, e
                        )
                        continue

            logger.info(
                "Extracted text from %d files in ZIP archive", len(extracted_texts)
            )
            return extracted_texts

        except Exception as e:
            raise ExtractionError(f"ZIP extraction failed: {e}")

    def _is_unsafe_path(self, path: str) -> bool:
        """Check if a path contains unsafe elements.

        Returns True when the name contains "..", starts with "/" after
        backslashes are normalised, or has ":" as its second character.
        The ".." test is a plain substring match, so a name such as
        "a..b.txt" is rejected as well.
        """
        # Normalize path separators
        normalized = path.replace("\\", "/")

        # Check for path traversal attempts
        if ".." in normalized or normalized.startswith("/"):
            return True

        # Check for absolute paths on Windows (drive letter followed by ":")
        if len(normalized) > 1 and normalized[1] == ":":
            return True

        return False

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> List[str]:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Attributes
MAX_EXTRACT_SIZE class-attribute instance-attribute
MAX_EXTRACT_SIZE = 1024 * 1024 * 1024
MAX_FILES class-attribute instance-attribute
MAX_FILES = 1000
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/zip.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> List[str]:
    """Extract text from every supported member of a ZIP archive.

    Each non-directory member is checked against the archive limits
    (``MAX_FILES``, ``MAX_EXTRACT_SIZE``, a 100MB per-member cap and
    ``_is_unsafe_path``). Members whose suffix is supported by the
    registry are copied to a temporary file and passed to the matching
    handler. Failures on individual members are logged and skipped.

    Args:
        file_path: Path of the ZIP archive to read.
        config: Optional configuration passed through to each handler.

    Returns:
        The texts returned by the handlers, in archive order.

    Raises:
        ExtractionError: If the archive cannot be processed at all
            (for example, it cannot be opened as a ZIP file).
    """
    extracted_texts = []
    total_size = 0
    file_count = 0

    try:
        # Lazy import kept local, but done once instead of once per member.
        from textxtract.core.registry import registry

        with zipfile.ZipFile(file_path, "r") as zip_file:
            for file_info in zip_file.infolist():
                if file_info.is_dir():
                    continue

                # Security checks
                if file_count >= self.MAX_FILES:
                    logger.warning("Maximum file limit reached in ZIP archive")
                    break

                # Check for path traversal
                if self._is_unsafe_path(file_info.filename):
                    logger.warning("Skipping unsafe path: %s", file_info.filename)
                    continue

                # Check file size
                if file_info.file_size > 100 * 1024 * 1024:  # 100MB per file
                    logger.warning(
                        "Skipping large file: %s (%d bytes)",
                        file_info.filename,
                        file_info.file_size,
                    )
                    continue

                total_size += file_info.file_size
                if total_size > self.MAX_EXTRACT_SIZE:
                    logger.warning("Total extract size limit reached")
                    break

                file_count += 1

                try:
                    suffix = Path(file_info.filename).suffix.lower()

                    # Decide before reading, so unsupported members are
                    # never decompressed into memory.
                    if not registry.is_supported(suffix):
                        logger.debug(
                            "Unsupported file type: %s", file_info.filename
                        )
                        continue

                    handler = registry.get_handler(suffix)

                    # Open by ZipInfo so that, when names are duplicated,
                    # the member being iterated is the one that is read.
                    with zip_file.open(file_info) as source_file:
                        file_bytes = source_file.read()

                    temp_file = tempfile.NamedTemporaryFile(
                        delete=False, suffix=suffix
                    )
                    temp_path = Path(temp_file.name)
                    try:
                        # Close the file before the handler reopens it.
                        with temp_file:
                            temp_file.write(file_bytes)
                        try:
                            # NOTE(review): a handler that returns a list (as
                            # this one does) is appended as-is - confirm
                            # whether nested results should be flattened.
                            text = handler.extract(temp_path, config)
                            extracted_texts.append(text)
                            logger.debug(
                                "Extracted text from %s", file_info.filename
                            )
                        except Exception as e:
                            logger.warning(
                                "Failed to extract text from %s: %s",
                                file_info.filename,
                                e,
                            )
                    finally:
                        # Runs even if the write fails, so no temp file leaks.
                        temp_path.unlink(missing_ok=True)

                except Exception as e:
                    logger.warning(
                        "Error processing file %s: %s", file_info.filename, e
                    )
                    continue

        logger.info(
            "Extracted text from %d files in ZIP archive", len(extracted_texts)
        )
        return extracted_texts

    except Exception as e:
        raise ExtractionError(f"ZIP extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/zip.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> List[str]:
    """Asynchronous counterpart of ``extract``.

    The synchronous ``extract`` is run in a separate thread via
    ``asyncio.to_thread`` and its result is returned unchanged.

    Args:
        file_path: Path of the ZIP archive, forwarded to ``extract``.
        config: Optional configuration, forwarded to ``extract``.

    Returns:
        The list produced by ``extract``.
    """
    import asyncio

    pending = asyncio.to_thread(self.extract, file_path, config)
    return await pending