SwiftDocsAI: Revolutionizing Documentation Generation with AWS Bedrock and Claude

In the fast-paced world of software development, maintaining up-to-date and comprehensive documentation is often a challenging task. Enter SwiftDocsAI, an innovative open-source project that leverages the power of AWS Bedrock and Claude AI to automate the process of generating high-quality documentation for your code repositories. In this article, we’ll explore the key features, architecture, and implementation details of SwiftDocsAI, demonstrating how it can transform your documentation workflow.

Introduction

SwiftDocsAI is a Python-based tool designed to analyze code repositories, process their contents, and generate relevant, context-aware documentation using advanced AI models. By harnessing the capabilities of AWS Bedrock and the Claude AI model, SwiftDocsAI offers a sophisticated solution to the perennial challenge of keeping documentation in sync with rapidly evolving codebases.

Key Features

  1. Flexible File Processing: Supports various file types and allows customization of included/excluded files and folders.
  2. Intelligent Chunking: Breaks down large codebases into manageable chunks for efficient processing.
  3. Parallel and Sequential Processing: Processes small codebases sequentially and switches to parallel, batched processing once the total content exceeds a size threshold.
  4. AWS Bedrock Integration: Leverages cloud-based AI models for scalable and powerful text generation.
  5. Robust Retry Mechanism: Retries failed model calls with exponential backoff and random jitter to handle API failures and rate limiting.

System Architecture

SwiftDocsAI follows a modular architecture to ensure flexibility and scalability:

  1. Input Module: Reads files from the specified directory, applying user-defined filters.
  2. Chunking Module: Splits files into processable chunks based on word and character limits.
  3. Processing Module: Sends chunks to Claude models for processing, utilizing multithreading for larger datasets.
  4. Output Module: Consolidates results into cohesive technical documents in Markdown format.
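
To make the Input Module's filtering concrete, here is a minimal sketch. It is an illustration rather than the project's code: `iter_source_files` is a hypothetical helper, and it assumes excluded folders should match whole folder names, so excluding `docs` does not also skip a folder called `mydocs`.

```python
import os
from typing import Iterable, Iterator


def iter_source_files(
    directory: str,
    file_extensions: Iterable[str] = (),
    exclude_folders: Iterable[str] = (),
    exclude_files: Iterable[str] = (),
) -> Iterator[str]:
    """Yield paths under `directory` that pass the include/exclude filters."""
    suffixes = tuple(file_extensions)
    skip_dirs = set(exclude_folders)
    skip_files = set(exclude_files)

    for root, dirs, files in os.walk(directory):
        # Prune excluded folders in place so os.walk never descends into them.
        dirs[:] = sorted(d for d in dirs if d not in skip_dirs)
        for name in sorted(files):
            if name in skip_files:
                continue
            # An empty suffix tuple means "accept every file type".
            if suffixes and not name.endswith(suffixes):
                continue
            yield os.path.join(root, name)
```

Pruning `dirs` in place also avoids walking large trees such as `node_modules` at all, which matters more as the repository grows.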

How It Works

SwiftDocsAI operates through a multi-step process:

  1. File Reading and Chunking: The tool scans the specified directory, reading files based on provided extensions and exclusion rules. It then chunks the content into smaller, processable pieces.
  2. Processing Type Determination: Based on the total content size, SwiftDocsAI decides whether to use sequential or parallel processing for optimal performance.
  3. AI Model Invocation: The chunked content is sent to the Claude AI model via AWS Bedrock for processing. The model generates documentation based on the provided context and instructions.
  4. Result Consolidation: For larger codebases processed in parallel, the tool combines the generated content and sends it back through the model, repeating until a single cohesive document remains.
  5. Output Generation: The final documentation is saved to a markdown file, ready for use in your repository.
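
The decision in step 2 comes down to comparing two totals against two budgets. The sketch below shows that arithmetic under stated assumptions: the constant names follow the article's code, but the values are placeholders, since the real limits are not given here, and `choose_processing_type` is a hypothetical helper.

```python
# Placeholder limits; the real values are not stated in the article.
DEFAULT_MAX_CHARS = 100_000
DEFAULT_MAX_WORDS = 20_000
INSTRUCTION_CHARS = 2_000
INSTRUCTION_WORDS = 400
CHUNK_LIMIT = 5


def choose_processing_type(total_chars: int, total_words: int) -> str:
    """Return "sequential" for small inputs and "parallel" otherwise."""
    # Each chunk's budget is the overall limit minus the room reserved for instructions.
    char_budget = (DEFAULT_MAX_CHARS - INSTRUCTION_CHARS) * CHUNK_LIMIT
    word_budget = (DEFAULT_MAX_WORDS - INSTRUCTION_WORDS) * CHUNK_LIMIT
    if total_chars <= char_budget and total_words <= word_budget:
        return "sequential"
    return "parallel"
```

With these placeholder values the budgets work out to 490,000 characters and 98,000 words, and exceeding either one switches the run to parallel mode.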

[File System] -> [File Processing Module] -> [Chunking Module]
                                                    |
                                                    v
[AWS Bedrock] <-> [AI Integration Module] <-> [Consolidation Module]
                                                    |
                                                    v
                                           [Output Generation Module]
                                                    |
                                                    v
                                           [Generated Documentation]
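
The loop between the AI Integration and Consolidation modules can be pictured as repeated rounds of "process every piece, then merge neighbours". The following is a rough sketch of that idea, not the project's implementation: `group_by_budget` and `consolidate` are hypothetical names, `summarize` stands in for a model call, and the `max_rounds` guard is an added assumption to stop a run that never converges.

```python
from typing import Callable, List


def group_by_budget(texts: List[str], max_chars: int) -> List[str]:
    """Greedily join adjacent texts while the joined size stays within max_chars."""
    groups: List[str] = []
    current = ""
    for text in texts:
        candidate = f"{current}\n{text}" if current else text
        if current and len(candidate) > max_chars:
            groups.append(current)
            current = text
        else:
            current = candidate
    if current:
        groups.append(current)
    return groups


def consolidate(
    chunks: List[str],
    summarize: Callable[[str], str],
    max_chars: int,
    max_rounds: int = 20,
) -> str:
    """Run process-and-merge rounds until a single document remains."""
    if not chunks:
        raise ValueError("nothing to consolidate")
    pending = group_by_budget(chunks, max_chars)
    for _ in range(max_rounds):
        results = [summarize(text) for text in pending]  # one model call per piece
        pending = group_by_budget(results, max_chars)
        if len(pending) == 1:
            return summarize(pending[0])  # final consolidation pass
    raise RuntimeError("results did not converge to a single document")
```

Each round only helps if the model's output is shorter than its input, so a guard like `max_rounds` is a cheap way to fail loudly instead of looping forever.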

Implementation Details

Let’s dive into some of the key components of SwiftDocsAI:

Environment Setup

SwiftDocsAI uses environment variables for AWS credentials:

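from dotenv import load_dotenv

load_dotenv()

This reads the AWS access key and secret key from a local .env file into the process environment, where boto3 picks them up, so the credentials stay out of the source code. Keep the .env file itself out of version control.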
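
If the .env file is the only source of credentials, it can help to fail fast when a key is missing rather than wait for the first Bedrock call to be rejected. Here is a small sketch of such a check; `missing_aws_vars` is a hypothetical helper, not part of SwiftDocsAI, and it relies only on the standard variable names that boto3 reads from the environment.

```python
import os
from typing import List, Mapping, Optional

REQUIRED_AWS_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_vars(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names of required AWS variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]
```

You would call it right after `load_dotenv()` and stop with a clear message if the list is non-empty. Note that boto3 can also find credentials elsewhere, for example in a shared credentials file or an IAM role, so this check only makes sense when you rely on environment variables.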

File Processing

The read_files_with_chunking function is responsible for reading and chunking the files:

def read_files_with_chunking(
    directory, file_extensions=None, exclude_folders=None, exclude_files=None
):
    """
    Reads all files in a directory, applies exclusions, and chunks content.

    Args:
        directory (str): The directory to read files from.
        file_extensions (list, optional): List of file extensions to include. Defaults to None.
        exclude_folders (list, optional): List of folders to exclude. Defaults to None.
        exclude_files (list, optional): List of specific files to exclude. Defaults to None.

    Returns:
        tuple: A tuple containing a list of chunks and the processing type.
    """
    logging.info(f"Starting to read files from directory: {directory}")
    exclude_folders = set(exclude_folders or [])
    exclude_files = set(exclude_files or [])
    file_extensions = file_extensions or []

    file_paths = [
        os.path.join(root, file)
        for root, dirs, files in os.walk(directory)
        # Compare whole path components so excluding "docs" does not also skip "mydocs"
        if not exclude_folders.intersection(
            os.path.relpath(root, directory).split(os.sep)
        )
        for file in files
        if (not file_extensions or file.endswith(tuple(file_extensions)))
        and file not in exclude_files
    ]

    logging.info(f"Total files found: {len(file_paths)}")
    all_chunks = []
    total_chars = 0
    total_words = 0

    for file_path in file_paths:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read()
            total_chars += len(content)
            total_words += len(content.split())

    PROCESSING_TYPE = (
        "sequential"
        if total_chars <= (DEFAULT_MAX_CHARS - INSTRUCTION_CHARS) * CHUNK_LIMIT
        and total_words <= (DEFAULT_MAX_WORDS - INSTRUCTION_WORDS) * CHUNK_LIMIT
        else "parallel"
    )

    logging.info(f"Processing mode determined: {PROCESSING_TYPE}")
    if PROCESSING_TYPE == "sequential":
        logging.info("Processing files sequentially...")
        for file_path in file_paths:
            all_chunks.extend(process_file(file_path))
    else:
        logging.info("Processing files in parallel...")
        with ThreadPoolExecutor() as executor:
            results = executor.map(process_file, file_paths)
            for chunks in results:
                all_chunks.extend(chunks)

    logging.info(f"Total chunks created: {len(all_chunks)}")

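    # Combine chunks to reduce the number of requests
    combined_chunks = combine_chunks(all_chunks)
    # Create the log directory first so the dump cannot fail on a fresh checkout
    os.makedirs("logs", exist_ok=True)
    with open("logs/combined_chunks.json", "w", encoding="utf-8") as f:
        json.dump(combined_chunks, f)
    return combined_chunks, PROCESSING_TYPE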

This function walks through the directory, applies exclusion rules, and chunks the content based on character and word limits.
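
The per-file chunking itself happens in `process_file`, which is not shown in this article. As a rough idea of what chunking under both a word and a character limit can look like, here is a sketch; `chunk_lines` is a hypothetical function, and it assumes chunks should break on line boundaries so that code indentation survives.

```python
from typing import List


def chunk_lines(text: str, max_words: int, max_chars: int) -> List[str]:
    """Split text on line boundaries into chunks within both limits."""
    chunks: List[str] = []
    current: List[str] = []
    words = chars = 0
    for line in text.splitlines(keepends=True):
        line_words = len(line.split())
        # Start a new chunk when adding this line would break either limit.
        if current and (words + line_words > max_words or chars + len(line) > max_chars):
            chunks.append("".join(current))
            current, words, chars = [], 0, 0
        current.append(line)
        words += line_words
        chars += len(line)
    if current:
        chunks.append("".join(current))
    return chunks
```

Joining the chunks back together reproduces the original text exactly. One caveat of this sketch: a single line longer than the limits still becomes its own oversized chunk, which a real implementation would need to handle.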

AI Model Invocation

The ask_claude_batch function handles the interaction with the Claude AI model:

def ask_claude_batch(
    context_chunks,
    PROCESSING_TYPE,
    model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
):
    """
    Sends chunks to the Claude model for processing and consolidates the responses.

    Args:
        context_chunks (list): List of context chunks to be processed.
        PROCESSING_TYPE (str): The processing type (sequential or parallel).
        model_id (str, optional): The model ID to use. Defaults to "anthropic.claude-3-5-sonnet-20240620-v1:0".

    Returns:
        str: The final consolidated response.
    """
    client = boto3.client(
        service_name="bedrock-runtime",
        region_name="us-east-1",
        config=Config(read_timeout=100000),
    )

    def invoke_model(metadata, chunk):
        """
        Invokes the Claude model for a single chunk.

        Args:
            metadata (str): Metadata for the chunk.
            chunk (str): The chunk content.

        Returns:
            str: The response text from the model.
        """
        prompt = f"""
        {INSTRUCTION_PROMPT}

        ### Context:
        {metadata}
        {chunk}
        """
        request_payload = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 8192,
            "temperature": 0.7,
            "messages": [
                {"role": "user", "content": [{"type": "text", "text": prompt}]}
            ],
        }
        response = client.invoke_model(
            modelId=model_id, body=json.dumps(request_payload)
        )
        return json.loads(response["body"].read())["content"][0]["text"]

    # Configuration for batching
    BATCH_SIZE = 8  # Number of requests per batch
    TIME_WINDOW = 60  # Time window in seconds for the batch

    # Function to invoke the model with retry logic
    def invoke_model_with_retry(metadata, chunk, retries=10, backoff_factor=2):
        """
        Invokes the model with retry logic in case of failures.

        Args:
            metadata (str): Metadata for the chunk.
            chunk (str): The chunk content.
            retries (int, optional): Number of retry attempts. Defaults to 10.
            backoff_factor (int, optional): Backoff factor for retry delay. Defaults to 2.

        Returns:
            str: The response text from the model.
        """
        delay = 30  # Initial delay in seconds
        for attempt in range(retries):
            try:
                return invoke_model(metadata, chunk)
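            except Exception as exc:
                if attempt < retries - 1:
                    wait_time = delay * (backoff_factor**attempt) + random.uniform(
                        5, 10
                    )
                    # Every exception is retried, so log the actual error type
                    # instead of assuming it was throttling.
                    logging.warning(
                        f"{type(exc).__name__}: Retrying in {wait_time:.2f} seconds..."
                    )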
                    time.sleep(wait_time)
                else:
                    logging.info("Maximum retry attempts reached.")
                    raise

    # Function to process chunks in batches
    def process_in_batches(context_chunks, batch_size, time_window):
        """
        Processes chunks in batches.

        Args:
            context_chunks (list): List of context chunks to be processed.
            batch_size (int): Number of requests per batch.
            time_window (int): Time window in seconds for the batch.

        Returns:
            list: List of results from the model.
        """
        results = []
        batch_start = 0
        while batch_start < len(context_chunks):
            batch_end = min(batch_start + batch_size, len(context_chunks))
            batch = context_chunks[batch_start:batch_end]

            logging.info(
                f"Processing batch {batch_start // batch_size + 1} (chunks {batch_start + 1}-{batch_end})..."
            )
            with ThreadPoolExecutor() as executor:
                batch_results = list(
                    executor.map(lambda x: invoke_model_with_retry(*x), batch)
                )
            results.extend(batch_results)

            # Pause for the full time window between batches to stay under the rate limit
            if batch_end < len(context_chunks):  # Skip delay for the last batch
                time.sleep(time_window)

            batch_start = batch_end

        return results

    # Main logic for processing chunks
    if PROCESSING_TYPE == "parallel":
        logging.info("Processing chunks with Claude in parallel using batching...")

        # Process chunks in batches
        results = process_in_batches(context_chunks, BATCH_SIZE, TIME_WINDOW)

        # Combine results to reduce the number of follow-up requests
        combined_results = combine_results(results)

        # Keep re-processing until a single combined result remains
        while len(combined_results) > 1:
            results = process_in_batches(combined_results, BATCH_SIZE, TIME_WINDOW)
            combined_results = combine_results(results)

        # Consolidate results into a single document
        final_results = invoke_model_with_retry(
            combined_results[0][0], combined_results[0][1]
        )

        return final_results

    else:
        # Process chunks sequentially and consolidate results
        logging.info("Processing chunks with Claude sequentially...")
        consolidated_context = ""
        final_response = ""
        for metadata, chunk in context_chunks:
            response_text = invoke_model_with_retry(
                "### Previous Context:\n"
                + consolidated_context
                + "\n\n### Current Context:\n"
                + str(metadata),
                str(chunk),
            )
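            # Each response becomes the "previous context" for the next chunk,
            # so the last response is the full consolidated document.
            consolidated_context = response_text

        final_response = consolidated_context
        return final_response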

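This function handles both modes. Sequential processing carries each response forward as context for the next chunk, while parallel processing sends chunks in rate-limited batches and retries failed calls with exponential backoff.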
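
Two details of the retry loop are worth a closer look: it retries on every exception, and its delay doubles without a ceiling. The sketch below shows one possible variation, offered as an assumption rather than the project's behavior: `call_with_retry` is a hypothetical helper that retries only the error types you name and caps the delay with a made-up `max_delay` parameter.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 30.0, factor: float = 2.0) -> float:
    """Delay before retry number `attempt` (0-based), without jitter."""
    return base * factor**attempt


def call_with_retry(
    fn: Callable[[], T],
    retryable: Tuple[Type[Exception], ...],
    retries: int = 10,
    base: float = 30.0,
    factor: float = 2.0,
    max_delay: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying only `retryable` errors with capped backoff and jitter."""
    for attempt in range(retries):
        try:
            return fn()
        except retryable:
            if attempt == retries - 1:
                raise  # out of attempts: surface the last error
            delay = min(backoff_delay(attempt, base, factor), max_delay)
            sleep(delay + random.uniform(5, 10))  # jitter spreads out retries
    raise ValueError("retries must be at least 1")
```

With the article's defaults of a 30-second initial delay and a factor of 2, the nine waits before the tenth attempt are 30, 60, 120 and so on up to 7,680 seconds, a little over four hours in total before jitter. A cap keeps a single stuck chunk from stalling the run that long, and passing `sleep` as a parameter makes the logic easy to test without waiting.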

Main Execution Flow

The main function orchestrates the entire process:

def main(
    directory,
    file_extensions=None,
    exclude_folders=None,
    exclude_files=None,
    output_file="README.md",
    model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
):
    """
    Main function to process all files and save the output.

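    Args:
        directory (str): The directory to read files from.
        file_extensions (list, optional): List of file extensions to include. Defaults to None.
        exclude_folders (list, optional): List of folders to exclude. Defaults to None.
        exclude_files (list, optional): List of specific files to exclude. Defaults to None.
        output_file (str, optional): The output file name. Defaults to "README.md".
        model_id (str, optional): The Bedrock model ID to use. Defaults to "anthropic.claude-3-5-sonnet-20240620-v1:0".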
    """
    logging.info("Starting main process...")
    chunks, PROCESSING_TYPE = read_files_with_chunking(
        directory, file_extensions, exclude_folders, exclude_files
    )

    logging.info("Sending chunks for processing...")
    responses = ask_claude_batch(chunks, PROCESSING_TYPE, model_id)

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(responses)

    logging.info(f"Processing complete. Output saved to: {output_file}")

It ties together all the components, from file reading to AI processing and output generation.

Usage and Customization

To use SwiftDocsAI, you can customize several parameters:

  • File extensions to include
  • Folders and files to exclude
  • Output file name
  • AI model selection

Here’s an example of how to run SwiftDocsAI:

if __name__ == "__main__":
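    DIRECTORY = "/Code/"
    # Entries are matched as file-name suffixes, so "Dockerfile" matches that file name.
    FILE_EXTENSIONS = [".ts", ".js", ".py", ".ksh", ".yaml", ".md", "Dockerfile", ".yml", ".txt", ".env", ".sh", ".html"]
    EXCLUDE_FOLDERS = [".git", ".venv", "node_modules", "logs", "docs", "outputs"]
    # Exclusions match exact file names: ".env" skips that file but not "prod.env",
    # and ".md" skips only a file literally named ".md".
    EXCLUDE_FILES = ["README.md", "LICENSE", "CONTRIBUTING.md", "CODE_OF_CONDUCT.md", ".DS_Store", ".env", ".md", "__init__.py"]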
    OUTPUT_FILE = "README.md"
    MODEL_ID = "anthropic.claude-3-5-sonnet-20240620-v1:0"

    main(DIRECTORY, FILE_EXTENSIONS, EXCLUDE_FOLDERS, EXCLUDE_FILES, OUTPUT_FILE, MODEL_ID)
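
If you would rather not edit constants for every run, the same parameters could be exposed on the command line. The sketch below is only an illustration: the flag names are invented here and are not part of SwiftDocsAI.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse hypothetical command-line options mirroring main()'s parameters."""
    parser = argparse.ArgumentParser(description="Generate docs for a code directory.")
    parser.add_argument("directory", help="directory to read files from")
    parser.add_argument("--ext", action="append", default=[],
                        help="file extension to include; repeatable")
    parser.add_argument("--exclude-folder", action="append", default=[],
                        help="folder name to skip; repeatable")
    parser.add_argument("--exclude-file", action="append", default=[],
                        help="exact file name to skip; repeatable")
    parser.add_argument("--output", default="README.md", help="output file name")
    parser.add_argument("--model-id",
                        default="anthropic.claude-3-5-sonnet-20240620-v1:0",
                        help="Bedrock model ID")
    return parser.parse_args(argv)
```

The parsed values map directly onto the call shown above, for example `main(args.directory, args.ext, args.exclude_folder, args.exclude_file, args.output, args.model_id)`.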

Real-World Applications

  1. Technical Documentation: Generate comprehensive, Markdown-ready documentation for software projects.
  2. Large-Scale Text Analysis: Efficiently split and analyze large datasets for research or business insights.
  3. Enterprise Solutions: Automate repetitive documentation tasks for enterprise clients, saving time and resources.

Conclusion

SwiftDocsAI demonstrates how AI can automate a routine but important part of software development. By combining AWS Bedrock and Claude with chunking, batching, and retry logic, it helps keep documentation in step with the code. Whether you’re working on a small project or a large-scale application, SwiftDocsAI can cut the time and effort documentation takes, leaving developers free to focus on what they do best: writing great code.

As AI continues to evolve, tools like SwiftDocsAI pave the way for more intelligent, automated, and efficient software development practices. By embracing such innovations, developers can streamline their workflows and produce higher quality, better-documented software.

Ready to transform your documentation workflow? Check out the SwiftDocsAI repository on GitHub and start exploring its potential today!