PC & IT SUPPORT MADE EASY FORUM
Would you like to react to this message? Create an account in a few clicks or log in to continue.

CrewAI Ollama StoryTime AI Python Example

Go down

CrewAI Ollama StoryTime AI Python Example Empty CrewAI Ollama StoryTime AI Python Example

Post by jamied_uk 9th August 2024, 15:27

create_story.sh


Code:
#!/bin/bash
# (c) J~Net 2024
#
#
#
echo "Starting Storytime AI By J~Net 2024"

python -m venv venv
source venv/bin/activate


python questV14.py




questV14.py

Code:
# (c) J~Net 2024
# https://jnet.forumotion.com/t2025-crewai-ollama-storytime-ai-python-example#3120

import sys
import os
import time
import logging
import json
import importlib
import subprocess
import tempfile
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from crewai import Agent, Task
import pkg_resources
import nltk

nltk.download('vader_lexicon')

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(filename='script.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

def sanitize_folder_name(name):
    """Sanitize folder name by removing spaces and limiting length."""
    sanitized_name=name.replace(' ', '_')[:15]  # Replace spaces and truncate to 15 characters
    return sanitized_name

def determine_project_folder(prompt):
    """
    Determine the project folder based on the prompt text.
    """
    folder_name=prompt.replace(' ', '_').strip('_')  # Remove leading/trailing underscores
    
    # Get the directory where the script is running
    script_dir=os.path.dirname(os.path.abspath(__file__))
    project_folder=os.path.join(script_dir, folder_name)

    # Create the directory if it doesn't exist
    os.makedirs(project_folder, exist_ok=True)
    
    # Print the full project folder path
    print(f"\033[92mProject folder: {project_folder}\033[0m")
    
    return project_folder


def read_prompt_file(file_path):
    """Read prompt from a file and return its content."""
    with open(file_path, 'r') as file:
        return file.read().strip()

def setup_and_execute(filename=None):
    """Sets up the environment and executes tasks based on the filename or interactive input."""
    start_time=time.time()

    if filename:
        project_folder=determine_project_folder(filename)
        print(f"Project folder: {project_folder}")

        files_dir, output_dir, tools_dir=setup_directories(project_folder)
        content=read_file(files_dir, filename)

        general_agent=create_agent(
            role='Requirements Manager',
            goal="""Handle any user input commands including reading a file
                    and creating files for apps based on user input. For file reading,
                    read the contents and display them. For file creation,
                    generate a filename based on user input or a pattern and create
                    the file with the specified content. Respond as if
                    you were an expert developer providing file creation services
                    on demand.""",
            backstory="""You are an expert developer with a strong background
                        in software engineering. You provide high quality,
                        thorough, and efficient file creation services based on
                        user requirements."""
        )

        if content:
            is_story='story' in filename.lower()
            result=execute_task_chain(content, general_agent, files_dir=files_dir, output_dir=output_dir, is_story=is_story)

            generated_filename=generate_filename(files_dir, "generated_file.txt")
            create_file(files_dir, generated_filename, result)
            print(f"Generated file: {generated_filename}")

    end_time=time.time()
    elapsed_time=end_time - start_time
    elapsed_hours, rem=divmod(elapsed_time, 3600)
    elapsed_minutes, elapsed_seconds=divmod(rem, 60)
    print(f"Execution time: {int(elapsed_hours)}h {int(elapsed_minutes)}m {int(elapsed_seconds)}s")

def handle_interactive_mode():
    """Handles interactive user input mode."""
    while True:
        prompt=input("Enter your command (type 'exit' to quit): ").strip()
        if prompt.lower() == 'exit':
            break

        project_folder=determine_project_folder(prompt)
        print(f"Project folder: {project_folder}")

        files_dir, output_dir, tools_dir=setup_directories(project_folder)
        memory_data=read_memory_file(files_dir)
        existing_steps, latest_output=read_existing_steps(files_dir)
        start_step=len(existing_steps)

        general_agent=create_agent(
            role='Requirements Manager',
            goal="""Handle any user input commands including reading a file
                    and creating files for apps based on user input. For file reading,
                    read the contents and display them. For file creation,
                    generate a filename based on user input or a pattern and create
                    the file with the specified content. Respond as if
                    you were an expert developer providing file creation services
                    on demand.""",
            backstory="""You are an expert developer with a strong background
                        in software engineering. You provide high quality,
                        thorough, and efficient file creation services based on
                        user requirements."""
        )

        if prompt.lower().startswith('read '):
            filename=prompt[len('read '):].strip()
            if filename:
                try:
                    start_time=time.time()
                    content=read_file(files_dir, filename)

                    general_agent.goal=content.strip()
                    create_file(files_dir, 'goal.txt', general_agent.goal)

                    is_story='story' in filename.lower()
                    result=execute_task_chain(content, general_agent, files_dir=files_dir, output_dir=output_dir, is_story=is_story)

                    generated_filename=generate_filename(files_dir, "generated_file.txt")
                    create_file(files_dir, generated_filename, result)
                    print(f"Generated file: {generated_filename}")

                    end_time=time.time()
                    elapsed_time=end_time - start_time
                    elapsed_hours, rem=divmod(elapsed_time, 3600)
                    elapsed_minutes, elapsed_seconds=divmod(rem, 60)
                    print(f"Execution time: {int(elapsed_hours)}h {int(elapsed_minutes)}m {int(elapsed_seconds)}s")
                except Exception as e:
                    logging.error(f"Error occurred while reading file: {e}", exc_info=True)
                    print(f"Error occurred: {e}")
            else:
                print("Filename is empty or invalid.")
        elif prompt.lower().startswith('create file '):
            parts=prompt[len('create file '):].split(' with content ', 1)
            if len(parts) == 2:
                filename, content=parts
                filename=generate_filename(files_dir, filename.strip())
                result=create_file(files_dir, filename, content.strip())
                print(result)
                append_output(output_dir, result)
            else:
                print("Invalid command format. Use: create file <filename> with content <content>")
        else:
            task_description=prompt
            try:
                task=Task(description=task_description.strip(), agent=general_agent)
                result=task.execute()

                if "Agent stopped due to iteration limit or time limit" in result:
                    print("Agent stopped due to iteration or time limit. Stopping execution.")
                    break

                print(result)
                append_output(output_dir, result)

                generated_filename=generate_filename(files_dir, "generated_file.txt")
                create_file(files_dir, generated_filename, result)
                print(f"Generated file: {generated_filename}")

            except Exception as e:
                logging.error(f"Error occurred during task execution: {e}", exc_info=True)
                print(f"Error occurred: {e}")

from functions import *

if __name__ == "__main__":
    if len(sys.argv) > 1:
        setup_and_execute(filename=sys.argv[1])
    else:
        handle_interactive_mode()




functions.py

Code:
# (c) J~Net 2024
# https://jnet.forumotion.com/t2025-crewai-ollama-storytime-ai-python-example#3120

import os
import json
import logging
import subprocess
import importlib.util
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from crewai import Agent, Task
import pkg_resources
from nltk.sentiment import SentimentIntensityAnalyzer
import warnings

warnings.filterwarnings("ignore", category=UserWarning, module='pydantic')

# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

# Configure the LLM model
llm=ChatOpenAI(
    model="crewai-dolphin-phi",
    base_url="http://localhost:11434/v1"
)

def sanitize_filename(name, max_length=15):
    """Sanitize and truncate filenames."""
    sanitized_name=name.replace(' ', '_').strip('_')[:max_length]
    invalid_chars='<>:\"/\\|?*'
    sanitized_name=''.join(c for c in sanitized_name if c not in invalid_chars)
    return sanitized_name

def setup_directories(project_folder):
    """Sets up directories for files, output, and tools."""
    files_dir=os.path.join(project_folder, 'files')
    output_dir=os.path.join(project_folder, 'output')
    # Directory where the script is located
    script_dir=os.path.dirname(os.path.abspath(__file__))
    tools_dir=os.path.join(script_dir, 'tools')

    os.makedirs(files_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(tools_dir, exist_ok=True)

    logging.info(f"Directories set up: files_dir='{files_dir}', output_dir='{output_dir}', tools_dir='{tools_dir}'")
    return files_dir, output_dir, tools_dir

def determine_project_folder(prompt):
    """
    Determine the project folder based on the prompt text.
    """
    folder_name=sanitize_filename(prompt, max_length=15)  # Adjust length as needed
    
    # Get the directory where the script is running
    script_dir=os.path.dirname(os.path.abspath(__file__))
    project_folder=os.path.join(script_dir, folder_name)

    # Create the directory if it doesn't exist
    os.makedirs(project_folder, exist_ok=True)
    
    # Print the project folder path (truncated to 15 characters)
    print(f"\033[92mProject folder: {folder_name[:15]}\033[0m")
    
    return project_folder

def get_last_processed_page(project_folder):
    """
    Get the last processed page number from a progress file.
    """
    progress_file = os.path.join(project_folder, 'progress.json')
    if os.path.exists(progress_file):
        with open(progress_file, 'r') as file:
            progress = json.load(file)
            return progress.get('last_page', 0)
    return 0

def save_progress(project_folder, page_number):
    """
    Save the progress to a file to track the last processed page number.
    """
    progress_file = os.path.join(project_folder, 'progress.json')
    with open(progress_file, 'w') as file:
        json.dump({'last_page': page_number}, file)


def process_story(story_text, prompt, page_length=200):
    """
    Processes the story by splitting it into pages and writing each page to a file.
    """
    project_folder=determine_project_folder(prompt)
    pages=split_story_into_pages(story_text, page_length)
    start_page=get_last_processed_page(project_folder) + 1

    for i, page in enumerate(pages[start_page - 1:200], start=start_page):
        write_page_to_file(i, page, project_folder)
        save_progress(project_folder, i)

def split_story_into_pages(story_text, page_length):
    """
    Splits the story into pages of a given length.
    """
    return [story_text[i:i + page_length] for i in range(0, len(story_text), page_length)]

def write_page_to_file(page_number, page_text, project_folder):
    """
    Writes a single page of text to a file named 'page{page_number}.txt'.
    """
    file_path=os.path.join(project_folder, f"page{page_number}.txt")
    try:
        with open(file_path, 'w') as file:
            file.write(page_text)
        logging.info(f"Page {page_number} written to {file_path}")
    except Exception as e:
        logging.error(f"Failed to write page {page_number} to file: {e}")

def save_story_to_file(filename, content, prompt):
    """
    Save generated story content to a file inside the determined project folder.
    """
    project_folder=determine_project_folder(prompt)
    filename=sanitize_filename(filename)
    file_path=os.path.join(project_folder, filename)
    try:
        with open(file_path, 'w') as file:
            file.write(content)
        print(f"\033[92mContent saved to {file_path}\033[0m")
    except Exception as e:
        print(f"\033[91mFailed to save content to {file_path}: {e}\033[0m")

def generate_story_in_chain_mode(story_text, prompt):
    """
    Generate story pages in a continuous chain until completion, automatically saving each page.
    """
    project_folder=determine_project_folder(prompt)
    page_number=get_last_processed_page(project_folder) + 1
    is_story_complete=False
    
    while not is_story_complete:
        try:
            # Generate the next page of the story
            story_page_response=llm.invoke(f"Generate the next page of the story starting with this prompt:\n{story_text}")
            story_page_text=story_page_response.content if hasattr(story_page_response, 'content') else str(story_page_response)
            
            # Print each page's generated text to the console
            print(f"\033[92mGenerated Page {page_number}:\033[0m")  # Green text for each generated page
            print(story_page_text)
            
            # Save the generated page
            write_page_to_file(page_number, story_page_text, project_folder)
            save_progress(project_folder, page_number)
            
            # Check if the story is complete
            if "THE END" in story_page_text.upper():
                is_story_complete=True
                print("\033[92mStory generation complete!\033[0m")  # Green text for completion message
            else:
                # If story continues, update for next page
                page_number += 1
                story_text=story_page_text  # Continue generating based on previous page
            
        except Exception as e:
            logging.error(f"Error during story generation: {e}", exc_info=True)
            print(f"An error occurred during story generation: {e}")
            is_story_complete=True  # Break the loop on failure

def handle_interactive_mode():
    """
    Handle interactive mode for user commands, ensuring progressive story generation.
    """
    print("\033[92mStarting Storytime AI By J~Net 2024\033[0m")  # Green text for starting message
    
    script_dir=os.path.dirname(os.path.abspath(__file__))  # Directory where the script is located
    print(f"\033[92mScript directory: {script_dir}\033[0m")

    while True:
        try:
            user_input=input("Enter your command (type 'exit' to quit): ").strip()
            if user_input.lower() == 'exit':
                break
            elif user_input.startswith("read "):
                filename=sanitize_filename(user_input[5:].strip())

                # Read the story file from the same directory as the script
                story_file_path=os.path.join(script_dir, filename)
                print(f"\033[93mLooking for file: {story_file_path}\033[0m")  # Yellow text for file search

                if os.path.exists(story_file_path):
                    print(f"\033[92mFile '{filename}' found. Reading the file...\033[0m")

                    # Read the file content
                    with open(story_file_path, 'r') as file:
                        story_text=file.read()

                    # Print the content to console
                    print("\033[92mFile content:\033[0m")
                    print(story_text)

                    # Start by generating the first page of the story
                    current_page=get_last_processed_page(determine_project_folder(story_text)) + 1
                    story_length=200  # Assume we want 200 pages
                    
                    while current_page <= story_length:
                        print(f"\033[92mGenerating page {current_page} of the story...\033[0m")
                        page_prompt=f"Based on the plot, please generate page {current_page} of the story."
                        story_response=llm.invoke(f"{story_text}\n\n{page_prompt}")
                        
                        # Ensure response is treated correctly
                        if hasattr(story_response, 'content'):
                            story_response_text=story_response.content
                        else:
                            story_response_text=str(story_response)

                        print(f"\033[92mGenerated content for page {current_page}:\033[0m")
                        print(story_response_text)

                        # Save the generated story incrementally
                        save_story_to_file(f"page{current_page}.txt", story_response_text, story_text)
                        print(f"\033[92mPage {current_page} saved successfully.\033[0m")

                        # Update the current page counter
                        current_page += 1

                else:
                    print(f"\033[91mFile '{filename}' not found in '{script_dir}'.\033[0m")
            else:
                print("\033[93mUnknown command.\033[0m")
        except Exception as e:
            logging.error(f"Error during interactive mode: {e}", exc_info=True)
            print(f"\033[91mAn error occurred: {e}\033[0m")

def list_available_tools(tools_dir):
    """
    List all available tool files in the specified directory.
    """
    try:
        tools=[f[:-3] for f in os.listdir(tools_dir) if f.endswith('.py')]
        logging.debug(f"Available tools: {tools}")
        return tools
    except Exception as e:
        logging.error(f"Error listing available tools: {e}", exc_info=True)
        return []

def delete_tool(tools_dir, tool_name):
    """
    Delete a Python tool file from the specified directory.
    """
    tool_filename=os.path.join(tools_dir, f"{tool_name}.py")
    if os.path.exists(tool_filename):
        try:
            os.remove(tool_filename)
            logging.info(f"Tool '{tool_name}' deleted.")
            return f"Tool '{tool_name}' deleted successfully."
        except Exception as e:
            logging.error(f"Error occurred while deleting the tool: {e}", exc_info=True)
            return f"An error occurred while deleting the tool: {e}"
    else:
        return f"Tool '{tool_name}' does not exist."

def update_tool(tools_dir, tool_name, new_code):
    """
    Update the code of an existing Python tool.
    """
    tool_filename=os.path.join(tools_dir, f"{tool_name}.py")
    if os.path.exists(tool_filename):
        try:
            with open(tool_filename, 'w') as tool_file:
                tool_file.write(new_code)
            logging.info(f"Tool '{tool_name}' updated.")
            return f"Tool '{tool_name}' updated successfully."
        except Exception as e:
            logging.error(f"Error occurred while updating the tool: {e}", exc_info=True)
            return f"An error occurred while updating the tool: {e}"
    else:
        return f"Tool '{tool_name}' does not exist."

def execute_tool_function(tools_dir, tool_name, function_name, *args):
    """
    Execute a function from a Python tool.
    """
    tool=load_tool(tools_dir, tool_name)
    if hasattr(tool, function_name):
        func=getattr(tool, function_name)
        try:
            result=func(*args)
            logging.debug(f"Executed function '{function_name}' from tool '{tool_name}' with result: {result}")
            return result
        except Exception as e:
            logging.error(f"Error occurred while executing function '{function_name}' from tool '{tool_name}': {e}", exc_info=True)
            return f"An error occurred while executing the function: {e}"
    else:
        return f"Function '{function_name}' not found in tool '{tool_name}'."

def analyze_sentiment(text):
    """
    Analyze the sentiment of a given text using VADER.
    """
    sia=SentimentIntensityAnalyzer()
    sentiment_scores=sia.polarity_scores(text)
    return sentiment_scores

def move_file(src_dir, dest_dir, filename):
    """
    Move a file from one directory to another.
    """
    src_path=os.path.join(src_dir, filename)
    dest_path=os.path.join(dest_dir, filename)
    if os.path.exists(src_path):
        try:
            os.rename(src_path, dest_path)
            logging.debug(f"File '{filename}' moved from '{src_dir}' to '{dest_dir}'.")
            return f"File '{filename}' moved successfully."
        except Exception as e:
            logging.error(f"Error occurred while moving file '{filename}': {e}", exc_info=True)
            return f"An error occurred while moving the file: {e}"
    else:
        return f"File '{filename}' does not exist in the source directory."

def copy_file(src_dir, dest_dir, filename):
    """
    Copy a file from one directory to another.
    """
    src_path=os.path.join(src_dir, filename)
    dest_path=os.path.join(dest_dir, filename)
    if os.path.exists(src_path):
        try:
            with open(src_path, 'rb') as src_file:
                with open(dest_path, 'wb') as dest_file:
                    dest_file.write(src_file.read())
            logging.debug(f"File '{filename}' copied from '{src_dir}' to '{dest_dir}'.")
            return f"File '{filename}' copied successfully."
        except Exception as e:
            logging.error(f"Error occurred while copying file '{filename}': {e}", exc_info=True)
            return f"An error occurred while copying the file: {e}"
    else:
        return f"File '{filename}' does not exist in the source directory."

def search_text_in_files(files_dir, search_text):
    """
    Search for a text string within all files in the specified directory.
    """
    results={}
    for filename in os.listdir(files_dir):
        file_path=os.path.join(files_dir, filename)
        if os.path.isfile(file_path):
            try:
                with open(file_path, 'r') as file:
                    lines=file.readlines()
                    matching_lines=[line.strip() for line in lines if search_text.lower() in line.lower()]
                    if matching_lines:
                        results[filename]=matching_lines
            except Exception as e:
                logging.error(f"Error occurred while searching in file '{filename}': {e}", exc_info=True)
    return results

def list_files_in_directory(directory):
    """
    List all files in the specified directory.
    """
    try:
        return [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
    except Exception as e:
        logging.error(f"Error occurred while listing files in directory '{directory}': {e}", exc_info=True)
        return []

def execute_shell_command(command):
    """
    Execute a shell command and return its output.
    """
    try:
        result=subprocess.check_output(command, shell=True, text=True)
        logging.debug(f"Executed command: '{command}' with output: {result}")
        return result
    except subprocess.CalledProcessError as e:
        logging.error(f"Command '{command}' failed with return code {e.returncode} and output: {e.output}", exc_info=True)
        return f"Command failed with return code {e.returncode}."

def load_tool(tools_dir, tool_name):
    """
    Load a Python tool module dynamically.
    """
    tool_path=os.path.join(tools_dir, f"{tool_name}.py")
    spec=importlib.util.spec_from_file_location(tool_name, tool_path)
    tool=importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(tool)
        logging.debug(f"Loaded tool '{tool_name}' from '{tool_path}'.")
        return tool
    except Exception as e:
        logging.error(f"Error occurred while loading tool '{tool_name}': {e}", exc_info=True)
        return None



requirements.txt

Code:
python-dotenv
langchain_openai
crewai
setuptools
nltk


Code:
pip install -r requirements.txt


Last edited by jamied_uk on 23rd August 2024, 21:41; edited 3 times in total
jamied_uk
jamied_uk
Admin

Posts : 3016
Join date : 2010-05-09
Age : 41
Location : UK

https://jnet.sytes.net

Back to top Go down

Back to top

- Similar topics

 
Permissions in this forum:
You cannot reply to topics in this forum