"""Streamlit app that reviews source files from a ZIP upload or a Git repo.

Each programming file is sent to the Groq chat-completions API for a written
review; a sentiment-based quality score and a nested-loop count are derived
locally and shown alongside the model's feedback.
"""

import ast
import os
import shutil
import tempfile
import time
import zipfile

import chardet
import git
import requests
import streamlit as st
from dotenv import load_dotenv
from textblob import TextBlob

load_dotenv()

API_KEY = os.getenv("GROQ_API_KEY")
if not API_KEY:
    st.error("API key is missing. Please make sure you have set the GROQ_API_KEY in the .env file.")
    st.stop()

API_URL = "https://api.groq.com/openai/v1/chat/completions"
DOCUMENT_STORE = "path_to_document_store"

# Extensions accepted by is_programming_file (lower-case; matching is
# case-insensitive). Kept as a tuple so it can be passed to str.endswith.
PROGRAMMING_EXTENSIONS = (
    # General programming languages
    '.py', '.js', '.java', '.cpp', '.c', '.rb', '.php', '.go', '.ts',
    '.swift', '.kt', '.dart', '.scala',
    # React / TypeScript front ends
    '.jsx', '.tsx',
    # Web development
    '.vue', '.ejs', '.handlebars',
    # Mobile development
    '.objective-c',
    # Blockchain development
    '.sol', '.rs',
    # Other common extensions
    '.bash', '.zsh', '.xml',
)

# File names rejected even if their extension is accepted (configuration or
# high-payload files).
EXCLUDED_FILE_NAMES = frozenset([
    'package.json', 'package-lock.json', 'yarn.lock', 'composer.lock',
    'go.sum', 'go.mod', '.html', '.css',
])


def query_groq_api(messages, retries=3, delay=2, timeout=30):
    """POST a chat-completion request to the Groq API, retrying on failure.

    Args:
        messages: List of chat message dicts sent as the ``messages`` field.
        retries: Maximum number of attempts.
        delay: Seconds to sleep between failed attempts.
        timeout: Seconds to wait for the server before treating the attempt
            as failed; without it a stalled connection would block forever.

    Returns:
        The decoded JSON response, or ``None`` if every attempt failed (an
        error is shown in the Streamlit UI in that case).
    """
    headers = {"Authorization": f"Bearer {API_KEY}"}
    payload = {
        "model": "llama3-8b-8192",
        "messages": messages,
        "max_tokens": 1000,
        "temperature": 0.8,
    }

    for attempt in range(1, retries + 1):
        try:
            response = requests.post(
                API_URL, json=payload, headers=headers, timeout=timeout
            )
            response.raise_for_status()
            return response.json()
        # ValueError covers a body that is not valid JSON on requests
        # versions whose JSON error is not a RequestException subclass.
        except (requests.exceptions.RequestException, ValueError) as e:
            if attempt == retries:
                st.error(f"API request failed after {retries} attempts: {str(e)}")
                return None
            time.sleep(delay)
    # Reached only when retries <= 0 (no attempt was made).
    return None


def retrieve_relevant_documents(code_snippet):
    """Return the context documents passed to the reviewer.

    This is a stub: ``code_snippet`` is not inspected and the same three
    placeholder titles are returned for every call.
    """
    return [
        "Document 1: Best practices for code",
        "Document 2: Code complexity analysis techniques",
        "Document 3: Common mistakes in coding",
    ]


def extract_zip_and_list_files(zip_file):
    """Extract ``zip_file`` into a fresh temp directory and list its files.

    Args:
        zip_file: Path or file-like object accepted by ``zipfile.ZipFile``.

    Returns:
        List of paths of every extracted file. The temp directory is left in
        place because the returned paths point into it.

    Raises:
        zipfile.BadZipFile: If the input is not a valid ZIP archive.
    """
    temp_dir = tempfile.mkdtemp()
    with zipfile.ZipFile(zip_file, "r") as archive:
        archive.extractall(temp_dir)
    return [
        os.path.join(root, name)
        for root, _, names in os.walk(temp_dir)
        for name in names
    ]


def clone_git_repo(git_url):
    """Clone ``git_url`` into a fresh temp directory.

    Returns:
        The directory path on success, or ``None`` on failure (an error is
        shown in the Streamlit UI and the temp directory is removed).
    """
    temp_dir = tempfile.mkdtemp()
    try:
        git.Repo.clone_from(git_url, temp_dir)
    except Exception as e:
        # Do not leave an orphaned directory behind for every failed clone.
        shutil.rmtree(temp_dir, ignore_errors=True)
        st.error(f"Failed to clone the repository: {str(e)}")
        return None
    return temp_dir


def read_file_content(file_path):
    """Read ``file_path`` as text, falling back to a detected encoding.

    UTF-8 is tried first; if that fails the encoding is guessed with
    chardet. If the file still cannot be decoded, a human-readable message
    is returned instead of raising.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except UnicodeDecodeError:
        pass

    unreadable = f"Cannot read the file '{file_path}' as text. It may be a binary file."

    with open(file_path, "rb") as f:
        raw_data = f.read()
    encoding = chardet.detect(raw_data)['encoding']
    if not encoding:
        # open(..., encoding=None) would silently use the locale default
        # rather than fail, so treat "no guess" as unreadable explicitly.
        return unreadable

    try:
        with open(file_path, "r", encoding=encoding) as f:
            return f.read()
    # LookupError: the detected name may not be a codec Python knows.
    except (UnicodeDecodeError, LookupError, TypeError):
        return unreadable


def is_programming_file(file_path):
    """Return True if ``file_path`` looks like reviewable source code.

    The file name is compared case-insensitively against
    PROGRAMMING_EXTENSIONS and must not be listed in EXCLUDED_FILE_NAMES.
    """
    file_name = os.path.basename(file_path).lower()
    return (
        file_name.endswith(PROGRAMMING_EXTENSIONS)
        and file_name not in EXCLUDED_FILE_NAMES
    )


class ReviewFeedback:
    """Mutable container for the results of one file review."""

    def __init__(self):
        self.feedback = ""          # Newline-terminated review text chunks.
        self.quality = 100          # Quality score, 0-100.
        self.time_complexity = ""   # Summary line from the loop analysis.
        self.mistakes = []          # Error messages collected during review.

    def add_feedback(self, feedback):
        """Append ``feedback`` followed by a newline."""
        self.feedback += feedback + "\n"

    def set_quality(self, quality_score):
        """Replace the quality score."""
        self.quality = quality_score

    def set_time_complexity(self, complexity_feedback):
        """Replace the time-complexity summary."""
        self.time_complexity = complexity_feedback

    def add_mistake(self, mistake):
        """Record an error message."""
        self.mistakes.append(mistake)


class CodeReviewTool:
    """Run an API-backed review plus local heuristics on one code string."""

    def __init__(self, code):
        self.code = code
        self.review_feedback = ReviewFeedback()
        # Review time is measured from construction, not from review_code().
        self.start_time = time.time()

    def review_code(self):
        """Review ``self.code``.

        Returns:
            * the string ``"No code provided."`` if the code is blank;
            * on success, a dict with keys ``"Feedback"``,
              ``"Code Quality (%)"``, ``"Time Complexity"`` and
              ``"Review Time (s)"``;
            * if the API call failed, a dict with the single key
              ``"Mistakes"``.
        """
        if not self.code.strip():
            return "No code provided."

        relevant_docs = retrieve_relevant_documents(self.code)
        messages = [
            {"role": "system", "content": f"Relevant documents for review: {', '.join(relevant_docs)}"},
            {"role": "user", "content": f"Please review the following Python code:\n{self.code}"},
        ]

        result = query_groq_api(messages)
        if not result:
            return {"Mistakes": self.review_feedback.mistakes}

        # Guard against an empty "choices" list and a null "content" field,
        # either of which would otherwise raise here.
        choices = result.get("choices") or [{}]
        review = choices[0].get("message", {}).get("content") or "No review generated."
        self.review_feedback.add_feedback(review)

        self.evaluate_time_complexity()
        self.infer_code_quality()

        return {
            "Feedback": self.review_feedback.feedback,
            "Code Quality (%)": self.review_feedback.quality,
            "Time Complexity": self.review_feedback.time_complexity,
            "Review Time (s)": time.time() - self.start_time,
        }

    def infer_code_quality(self):
        """Derive a 0-100 quality score from the sentiment of the feedback.

        The score is ``100 - 20 * (1 - mean_polarity)`` where the mean is
        taken over the newline-separated lines of the feedback, then clamped
        to [0, 100].
        """
        feedback_text = self.review_feedback.feedback
        if feedback_text:
            lines = feedback_text.split("\n")
            total = sum(TextBlob(line).sentiment.polarity for line in lines)
            sentiment_avg = total / len(lines)
        else:
            sentiment_avg = 0

        quality_score = 100 - 20 * (1 - sentiment_avg)
        self.review_feedback.set_quality(max(0, min(100, quality_score)))

    @staticmethod
    def _count_nested_loops(tree):
        """Count loops in ``tree`` that sit inside at least one other loop."""
        loop_types = (ast.For, ast.While, ast.AsyncFor)
        scope_types = (ast.FunctionDef, ast.AsyncFunctionDef)

        nested = 0
        # Explicit stack instead of recursion so deeply nested code cannot
        # hit the interpreter's recursion limit.
        pending = [(tree, 0)]
        while pending:
            node, loop_depth = pending.pop()
            if isinstance(node, loop_types):
                if loop_depth >= 1:
                    nested += 1
                loop_depth += 1
            elif isinstance(node, scope_types):
                # A function defined inside a loop starts its own nesting:
                # its body does not run as part of the enclosing loop.
                loop_depth = 0
            for child in ast.iter_child_nodes(node):
                pending.append((child, loop_depth))
        return nested

    def evaluate_time_complexity(self):
        """Set the time-complexity summary from a nested-loop count.

        The code is parsed as Python. If parsing or analysis fails (for
        example because the file is not Python), the failure is recorded as
        a mistake and the summary says the analysis was not possible.
        """
        try:
            nested_loops = self._count_nested_loops(ast.parse(self.code))
        except Exception as e:
            # Best-effort heuristic: it must never abort the review itself.
            self.review_feedback.add_mistake(f"Error in time complexity analysis: {str(e)}")
            self.review_feedback.set_time_complexity("Unable to analyze time complexity.")
            return

        if nested_loops > 0:
            self.review_feedback.set_time_complexity(
                f"Nested Loops Detected: {nested_loops}. Consider optimizing the loops."
            )
        else:
            self.review_feedback.set_time_complexity(
                "No nested loops detected. Time complexity looks reasonable."
            )


def _find_programming_files(root_dir):
    """Return programming files under ``root_dir``, skipping ``.git``."""
    found = []
    for root, dirs, names in os.walk(root_dir):
        # Prune in place so os.walk does not descend into Git metadata.
        dirs[:] = [d for d in dirs if d != ".git"]
        for name in names:
            path = os.path.join(root, name)
            if is_programming_file(path):
                found.append(path)
    return found


def _render_review(file_path):
    """Review one file and write the outcome to the Streamlit page."""
    file_content = read_file_content(file_path)
    st.subheader(f"Reviewing {os.path.basename(file_path)}")

    review_results = CodeReviewTool(file_content).review_code()

    st.write(f"**File: {file_path}**")

    # review_code returns a plain string for blank input.
    if isinstance(review_results, str):
        st.warning(review_results)
        return

    # ... and a {"Mistakes": [...]} dict when the API call failed.
    if "Feedback" not in review_results:
        st.warning("The review could not be completed for this file.")
        for mistake in review_results.get("Mistakes") or []:
            st.write(f"- {mistake}")
        return

    st.write(f"**Code Quality:** {review_results['Code Quality (%)']}%")
    st.write(f"**Time Complexity Analysis:** {review_results['Time Complexity']}")
    st.write(f"**Review Feedback:**\n{review_results['Feedback']}")
    st.write(f"**Review Time:** {review_results['Review Time (s)']:.2f} seconds")


st.title("Code Review Tool with RAG")
st.markdown("Upload your project as a ZIP file or provide a Git repository URL for review:")

git_url = st.text_input("Enter Git repository URL (leave empty if uploading ZIP file):")
uploaded_file = st.file_uploader("Upload ZIP file", type=["zip"])

if st.button("Review Project"):
    if git_url:
        repo_dir = clone_git_repo(git_url)
        if repo_dir:
            all_files = _find_programming_files(repo_dir)
            if all_files:
                st.write("**Files Found in Git Repo:**")
                for file in all_files:
                    st.write(file)
                for file_path in all_files:
                    _render_review(file_path)
            else:
                st.warning("No programming files found in the Git repository.")
        else:
            st.warning("Failed to clone the repository.")

    elif uploaded_file:
        try:
            python_files = extract_zip_and_list_files(uploaded_file)
        except zipfile.BadZipFile:
            st.error("The uploaded file is not a valid ZIP archive.")
            python_files = None

        if python_files is not None:
            programming_files = [file for file in python_files if is_programming_file(file)]
            if programming_files:
                st.write("**Programming Files Found in ZIP File:**")
                for file in programming_files:
                    st.write(file)
                for file_path in programming_files:
                    _render_review(file_path)
            else:
                st.warning("No programming files found in the uploaded ZIP file.")

    else:
        st.warning("Please provide a Git repository URL or upload a ZIP file.")