Skip to content

Zip Module

ZIP file handler for text extraction.

Classes:

Name Description
ZIPHandler

Handler for extracting text from ZIP archives with security checks.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger('textxtract.handlers.zip')

Classes

ZIPHandler

Bases: FileTypeHandler

Handler for extracting text from ZIP archives with security checks.

Methods:

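Name Description
extract Extract text from every supported file in a ZIP archive, applying file-count, per-file size, total-size and unsafe-path checks.
extract_async Run extract in a worker thread via asyncio.to_thread and await the result.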

Attributes:

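Name Type Description
MAX_EXTRACT_SIZE int Maximum total uncompressed size processed per archive (1024 * 1024 * 1024 bytes, i.e. 1GB).
MAX_FILES int Maximum number of files processed per archive (1000).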
Source code in textxtract/handlers/zip.py
class ZIPHandler(FileTypeHandler):
    """Handler for extracting text from ZIP archives with security checks.

    Archive members are never unpacked to their stored paths: each supported
    member is copied to a temporary file and handed to the handler that the
    registry returns for the member's suffix.
    """

    MAX_EXTRACT_SIZE = 1024 * 1024 * 1024  # 1GB total
    MAX_FILES = 1000  # Maximum files to process
    MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB per file

    def extract(self, file_path: Path, config: Optional[dict] = None) -> List[str]:
        """Extract text from every supported file inside a ZIP archive.

        Directories, unsafe paths and members larger than ``MAX_FILE_SIZE``
        are skipped. Processing stops once ``MAX_FILES`` members have been
        accepted or the accumulated declared size exceeds
        ``MAX_EXTRACT_SIZE``. A failure on a single member is logged and
        does not abort the archive.

        Args:
            file_path: Path of the ZIP archive to read.
            config: Optional configuration passed unchanged to each
                per-member handler.

        Returns:
            The values returned by the per-member handlers, in archive order.

        Raises:
            ExtractionError: If the archive cannot be opened or any error
                escapes the per-member handling. The original exception is
                attached as ``__cause__``.
        """
        extracted_texts = []
        total_size = 0
        file_count = 0

        try:
            # Kept as a function-level import (presumably to avoid an import
            # cycle with the registry - confirm), but resolved once per call
            # rather than once per archive member.
            from textxtract.core.registry import registry

            with zipfile.ZipFile(file_path, "r") as zip_file:
                for file_info in zip_file.infolist():
                    if file_info.is_dir():
                        continue

                    # Security checks
                    if file_count >= self.MAX_FILES:
                        logger.warning("Maximum file limit reached in ZIP archive")
                        break

                    # Check for path traversal
                    if self._is_unsafe_path(file_info.filename):
                        logger.warning("Skipping unsafe path: %s", file_info.filename)
                        continue

                    # Check file size
                    if file_info.file_size > self.MAX_FILE_SIZE:
                        logger.warning(
                            "Skipping large file: %s (%d bytes)",
                            file_info.filename,
                            file_info.file_size,
                        )
                        continue

                    total_size += file_info.file_size
                    if total_size > self.MAX_EXTRACT_SIZE:
                        logger.warning("Total extract size limit reached")
                        break

                    file_count += 1

                    try:
                        self._extract_member(
                            zip_file, file_info, registry, config, extracted_texts
                        )
                    except Exception as e:
                        # Best effort: one bad member must not abort the archive.
                        logger.warning(
                            "Error processing file %s: %s", file_info.filename, e
                        )

            logger.info(
                "Extracted text from %d files in ZIP archive", len(extracted_texts)
            )
            return extracted_texts

        except Exception as e:
            raise ExtractionError(f"ZIP extraction failed: {e}") from e

    def _extract_member(
        self, zip_file, file_info, registry, config, extracted_texts
    ) -> None:
        """Extract text from one archive member and append it to *extracted_texts*.

        Unsupported suffixes are logged and skipped before any data is read.
        A failure inside the per-member handler is logged; any other error
        propagates to the caller.
        """
        suffix = Path(file_info.filename).suffix.lower()
        if not registry.is_supported(suffix):
            logger.debug("Unsupported file type: %s", file_info.filename)
            return

        handler = registry.get_handler(suffix)

        # Open by ZipInfo, not by name: with duplicate member names a lookup
        # by name can return a different entry than the one whose size was
        # just validated.
        with zip_file.open(file_info) as source_file:
            file_bytes = source_file.read()

        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        temp_path = Path(temp_file.name)
        try:
            # The write sits inside the cleanup scope so a failed write
            # cannot leave the delete=False file behind.
            with temp_file:
                temp_file.write(file_bytes)
            try:
                text = handler.extract(temp_path, config)
            except Exception as e:
                logger.warning(
                    "Failed to extract text from %s: %s",
                    file_info.filename,
                    e,
                )
            else:
                extracted_texts.append(text)
                logger.debug("Extracted text from %s", file_info.filename)
        finally:
            temp_path.unlink(missing_ok=True)

    def _is_unsafe_path(self, path: str) -> bool:
        """Check if a path contains unsafe elements.

        A path is unsafe when it is absolute (leading ``/`` or a ``X:`` drive
        prefix) or when any of its components is exactly ``..``. Names that
        merely contain two dots, such as ``report..final.txt``, are accepted.
        """
        # Normalize path separators
        normalized = path.replace("\\", "/")

        # Absolute POSIX-style path
        if normalized.startswith("/"):
            return True

        # Check for absolute paths on Windows
        if len(normalized) > 1 and normalized[1] == ":":
            return True

        # Path traversal: only a whole ".." component climbs a directory.
        return ".." in normalized.split("/")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> List[str]:
        """Run :meth:`extract` in a worker thread and return its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)

Attributes

MAX_EXTRACT_SIZE class-attribute instance-attribute
MAX_EXTRACT_SIZE = 1024 * 1024 * 1024
MAX_FILES class-attribute instance-attribute
MAX_FILES = 1000

Functions

extract
extract(file_path, config=None)
Source code in textxtract/handlers/zip.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> List[str]:
    """Extract text from every supported file inside a ZIP archive.

    Directories, unsafe paths and members larger than 100MB are skipped.
    Processing stops once ``MAX_FILES`` members have been accepted or the
    accumulated declared size exceeds ``MAX_EXTRACT_SIZE``. A failure on a
    single member is logged and does not abort the archive.

    Args:
        file_path: Path of the ZIP archive to read.
        config: Optional configuration passed unchanged to each per-member
            handler.

    Returns:
        The values returned by the per-member handlers, in archive order.

    Raises:
        ExtractionError: If the archive cannot be opened or any error escapes
            the per-member handling. The original exception is attached as
            ``__cause__``.
    """
    per_file_limit = 100 * 1024 * 1024  # 100MB per file
    extracted_texts = []
    total_size = 0
    file_count = 0

    try:
        # Kept as a function-level import (presumably to avoid an import
        # cycle with the registry - confirm), but resolved once per call
        # rather than once per archive member.
        from textxtract.core.registry import registry

        with zipfile.ZipFile(file_path, "r") as zip_file:
            for file_info in zip_file.infolist():
                if file_info.is_dir():
                    continue

                # Security checks
                if file_count >= self.MAX_FILES:
                    logger.warning("Maximum file limit reached in ZIP archive")
                    break

                # Check for path traversal
                if self._is_unsafe_path(file_info.filename):
                    logger.warning("Skipping unsafe path: %s", file_info.filename)
                    continue

                # Check file size
                if file_info.file_size > per_file_limit:
                    logger.warning(
                        "Skipping large file: %s (%d bytes)",
                        file_info.filename,
                        file_info.file_size,
                    )
                    continue

                total_size += file_info.file_size
                if total_size > self.MAX_EXTRACT_SIZE:
                    logger.warning("Total extract size limit reached")
                    break

                file_count += 1

                try:
                    suffix = Path(file_info.filename).suffix.lower()

                    # Decide on support before reading, so unsupported
                    # members are never decompressed.
                    if not registry.is_supported(suffix):
                        logger.debug(
                            "Unsupported file type: %s", file_info.filename
                        )
                        continue

                    handler = registry.get_handler(suffix)

                    # Open by ZipInfo, not by name: with duplicate member
                    # names a lookup by name can return a different entry
                    # than the one whose size was just validated.
                    with zip_file.open(file_info) as source_file:
                        file_bytes = source_file.read()

                    temp_file = tempfile.NamedTemporaryFile(
                        delete=False, suffix=suffix
                    )
                    temp_path = Path(temp_file.name)
                    try:
                        # The write sits inside the cleanup scope so a failed
                        # write cannot leave the delete=False file behind.
                        with temp_file:
                            temp_file.write(file_bytes)
                        try:
                            text = handler.extract(temp_path, config)
                        except Exception as e:
                            logger.warning(
                                "Failed to extract text from %s: %s",
                                file_info.filename,
                                e,
                            )
                        else:
                            extracted_texts.append(text)
                            logger.debug(
                                "Extracted text from %s", file_info.filename
                            )
                    finally:
                        temp_path.unlink(missing_ok=True)

                except Exception as e:
                    # Best effort: one bad member must not abort the archive.
                    logger.warning(
                        "Error processing file %s: %s", file_info.filename, e
                    )
                    continue

        logger.info(
            "Extracted text from %d files in ZIP archive", len(extracted_texts)
        )
        return extracted_texts

    except Exception as e:
        raise ExtractionError(f"ZIP extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/zip.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> List[str]:
    """Run ``self.extract`` in a worker thread and return its result.

    Args:
        file_path: Path forwarded unchanged to ``self.extract``.
        config: Optional configuration forwarded unchanged to
            ``self.extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    pending = to_thread(self.extract, file_path, config)
    return await pending