Initial commit
This commit is contained in:
parent
e2b24b6f7a
commit
bc7a6c3004
10
.gitignore
vendored
Normal file
10
.gitignore
vendored
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
# Environment variables
|
||||||
|
.env
|
||||||
|
|
||||||
|
# Python virtual environment
|
||||||
|
venv
|
||||||
|
|
||||||
|
.streamlit/
|
||||||
|
|
||||||
|
|
||||||
|
|
7
requirements.txt
Normal file
7
requirements.txt
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
requests==2.32.3
|
||||||
|
streamlit==1.40.1
|
||||||
|
python-dotenv==0.0.1
|
||||||
|
textblob==0.18.0.post0
|
||||||
|
chardet==5.2.0
|
||||||
|
GitPython==3.1.44
|
||||||
|
|
261
review.py
Normal file
261
review.py
Normal file
|
@ -0,0 +1,261 @@
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import zipfile
|
||||||
|
import tempfile
|
||||||
|
import requests
|
||||||
|
import streamlit as st
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
import ast
|
||||||
|
from textblob import TextBlob
|
||||||
|
import chardet
|
||||||
|
import git
|
||||||
|
|
||||||
|
# Load variables from a local .env file into the process environment before
# anything below reads them.
load_dotenv()

# Credential for the Groq API; the lookup yields None when the variable is unset.
API_KEY = os.environ.get("GROQ_API_KEY")

# Without a key no review can be requested, so report it in the UI and halt
# the Streamlit script here.
if API_KEY is None or API_KEY == "":
    st.error("API key is missing. Please make sure you have set the GROQ_API_KEY in the .env file.")
    st.stop()

# Chat-completions endpoint that query_groq_api() posts to.
API_URL = "https://api.groq.com/openai/v1/chat/completions"

# Placeholder value; nothing in this file reads it.
DOCUMENT_STORE = "path_to_document_store"
|
||||||
|
|
||||||
|
def query_groq_api(messages, retries=3, delay=2, timeout=30):
    """Send a chat-completion request to the Groq API, retrying on failure.

    Args:
        messages: Chat messages placed in the request payload unchanged
            (callers in this file pass a list of ``{"role", "content"}`` dicts).
        retries: Maximum number of attempts before giving up.
        delay: Seconds to sleep between failed attempts.
        timeout: Seconds to wait on the server for each attempt. Without an
            explicit timeout ``requests`` waits indefinitely, which would
            freeze the app on a stalled connection.

    Returns:
        The decoded JSON response, or ``None`` when every attempt failed
        (the last error is shown in the Streamlit UI) or ``retries`` is
        not positive.
    """
    headers = {"Authorization": f"Bearer {API_KEY}"}
    payload = {
        "model": "llama3-8b-8192",
        "messages": messages,
        "max_tokens": 1000,
        "temperature": 0.8,
    }

    for attempt in range(1, retries + 1):
        try:
            response = requests.post(
                API_URL, json=payload, headers=headers, timeout=timeout
            )
            # Turn HTTP 4xx/5xx responses into exceptions so they are retried too.
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            if attempt == retries:
                st.error(f"API request failed after {retries} attempts: {str(e)}")
                return None
            # Back off briefly before the next attempt.
            time.sleep(delay)

    return None
|
||||||
|
|
||||||
|
def retrieve_relevant_documents(code_snippet):
    """Return the reference documents supplied as context for a review.

    This is a fixed placeholder: ``code_snippet`` is accepted but never
    consulted, and the same three entries are returned on every call.
    """
    placeholder_docs = (
        "Document 1: Best practices for code",
        "Document 2: Code complexity analysis techniques",
        "Document 3: Common mistakes in coding",
    )
    # Build a new list on each call, exactly as a list literal would.
    return list(placeholder_docs)
|
||||||
|
|
||||||
|
def extract_zip_and_list_files(zip_file):
    """Unpack a ZIP archive into a new temporary directory and list its files.

    Args:
        zip_file: Anything ``zipfile.ZipFile`` can open for reading
            (a path or a file-like object).

    Returns:
        A list with the full path of every regular file found under the
        extraction directory. The directory is not removed by this function.
    """
    destination = tempfile.mkdtemp()

    with zipfile.ZipFile(zip_file, "r") as archive:
        archive.extractall(destination)

    # Flatten the directory walk into one list of absolute file paths.
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(destination)
        for name in names
    ]
|
||||||
|
|
||||||
|
def clone_git_repo(git_url):
    """Clone a Git repository into a fresh temporary directory.

    Args:
        git_url: Repository location handed to ``git.Repo.clone_from``.

    Returns:
        The path of the temporary directory containing the clone, or
        ``None`` when cloning failed (the error is shown in the Streamlit UI).
    """
    import shutil  # local import: only used to clean up after a failed clone

    temp_dir = tempfile.mkdtemp()
    try:
        git.Repo.clone_from(git_url, temp_dir)
    except Exception as e:
        # Deliberately broad: any failure is reported to the user rather than
        # crashing the page. The directory is useless without a clone, so
        # remove it instead of leaving one behind per failed attempt.
        shutil.rmtree(temp_dir, ignore_errors=True)
        st.error(f"Failed to clone the repository: {str(e)}")
        return None
    return temp_dir
|
||||||
|
|
||||||
|
def read_file_content(file_path):
    """Read a file as text, falling back to charset detection.

    The file is first decoded as UTF-8. If that fails, its raw bytes are
    passed to ``chardet.detect`` and the file is re-read with the detected
    encoding.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file's text, or a human-readable "Cannot read the file ..."
        message when no usable text encoding could be determined.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except UnicodeDecodeError:
        pass  # not valid UTF-8; try charset detection below

    unreadable = f"Cannot read the file '{file_path}' as text. It may be a binary file."

    with open(file_path, "rb") as f:
        raw_data = f.read()

    encoding = chardet.detect(raw_data)["encoding"]
    if not encoding:
        # No charset was detected. Passing None on to open() would not fail:
        # it silently selects the platform default encoding, which can
        # "succeed" and return garbage for binary data.
        return unreadable

    try:
        with open(file_path, "r", encoding=encoding) as f:
            return f.read()
    except (UnicodeDecodeError, LookupError, TypeError):
        # LookupError: the detected name is not a codec Python knows.
        return unreadable
|
||||||
|
|
||||||
|
def is_programming_file(file_path):
    """Tell whether a path looks like reviewable source code.

    The decision is made on the lower-cased base name, so the extension
    match is case-insensitive (``Main.PY`` is accepted just like
    ``main.py``) and agrees with the exclusion check, which was already
    done on the lower-cased name.

    Args:
        file_path: Path or bare file name to classify.

    Returns:
        ``True`` when the name ends with one of the known programming
        extensions and is not an explicitly excluded file, else ``False``.
    """
    # A tuple so it can be handed to str.endswith() in a single call.
    programming_extensions = (
        # General programming languages
        '.py', '.js', '.java', '.cpp', '.c', '.rb', '.php', '.go', '.ts',
        '.swift', '.kt', '.dart', '.scala',
        # MERN stack: React.js and TypeScript (frontend)
        '.jsx', '.tsx',
        # Web development
        '.vue', '.ejs', '.handlebars',
        # Mobile development
        '.objective-c',
        # Blockchain development
        '.sol', '.rs',
        # Other common extensions
        '.bash', '.zsh', '.xml',
    )

    # Files to exclude explicitly (e.g., configuration or high-payload files).
    # Entries are compared against the whole file name, not its extension.
    excluded_files = {
        'package.json', 'package-lock.json', 'yarn.lock', 'composer.lock',
        'go.sum', 'go.mod', '.html', '.css',
    }

    file_name = os.path.basename(file_path).lower()
    return file_name.endswith(programming_extensions) and file_name not in excluded_files
|
||||||
|
|
||||||
|
|
||||||
|
class ReviewFeedback:
    """Mutable record of what has been gathered while reviewing one file."""

    def __init__(self):
        # Start with no text, a full score, no complexity note and no mistakes.
        self.feedback, self.quality = "", 100
        self.time_complexity, self.mistakes = "", []

    def add_feedback(self, feedback):
        """Append ``feedback`` to the accumulated text, followed by a newline."""
        self.feedback = "".join((self.feedback, feedback, "\n"))

    def set_quality(self, quality_score):
        """Replace the stored quality value with ``quality_score``."""
        self.quality = quality_score

    def set_time_complexity(self, complexity_feedback):
        """Replace the stored time-complexity note with ``complexity_feedback``."""
        self.time_complexity = complexity_feedback

    def add_mistake(self, mistake):
        """Record one more entry at the end of the mistakes list."""
        self.mistakes.extend((mistake,))
|
||||||
|
|
||||||
|
class CodeReviewTool:
    """Review one piece of source code.

    Combines feedback requested from the Groq chat API with two local
    heuristics: a sentiment-based quality score and a nested-loop count.
    """

    def __init__(self, code):
        """Store ``code`` and start the clock used for "Review Time (s)"."""
        self.code = code
        self.review_feedback = ReviewFeedback()
        self.start_time = time.time()

    def review_code(self):
        """Run the full review.

        Returns:
            * the string ``"No code provided."`` when the code is empty or
              only whitespace;
            * a dict with the keys ``"Feedback"``, ``"Code Quality (%)"``,
              ``"Time Complexity"`` and ``"Review Time (s)"`` on success;
            * ``{"Mistakes": [...]}`` when the API returned nothing.
        """
        if not self.code.strip():
            return "No code provided."

        relevant_docs = retrieve_relevant_documents(self.code)
        # NOTE(review): the prompt says "Python" although is_programming_file()
        # admits other languages - confirm whether that is intended.
        messages = [
            {"role": "system", "content": f"Relevant documents for review: {', '.join(relevant_docs)}"},
            {"role": "user", "content": f"Please review the following Python code:\n{self.code}"}
        ]
        result = query_groq_api(messages)

        if not result:
            return {"Mistakes": self.review_feedback.mistakes}

        # "choices" may be absent *or* an empty list, and "message"/"content"
        # may be null; fall back to the placeholder text in every such case
        # instead of raising IndexError/AttributeError/TypeError.
        choices = result.get("choices") or [{}]
        message = choices[0].get("message") or {}
        review = message.get("content") or "No review generated."

        self.review_feedback.add_feedback(review)
        self.evaluate_time_complexity()
        self.infer_code_quality()

        return {
            "Feedback": self.review_feedback.feedback,
            "Code Quality (%)": self.review_feedback.quality,
            "Time Complexity": self.review_feedback.time_complexity,
            "Review Time (s)": time.time() - self.start_time,
        }

    def infer_code_quality(self):
        """Derive a 0-100 quality score from the sentiment of the feedback.

        The TextBlob polarity of every non-blank feedback line is averaged
        and the score is ``100 - 20 * (1 - average)``, clamped to 0..100:
        an average of 1 keeps the score at 100, an average of 0 gives 80.
        Blank lines are skipped so that they (and the trailing newline that
        ``add_feedback`` always appends) do not dilute the average.
        """
        lines = [
            line
            for line in self.review_feedback.feedback.split("\n")
            if line.strip()
        ]
        if lines:
            total = sum(TextBlob(line).sentiment.polarity for line in lines)
            sentiment_avg = total / len(lines)
        else:
            sentiment_avg = 0

        quality_score = 100 - 20 * (1 - sentiment_avg)
        self.review_feedback.set_quality(max(0, min(100, quality_score)))

    def evaluate_time_complexity(self):
        """Count nested loops in the code and store a short verdict.

        The code is parsed with Python's ``ast`` module. Every ``for`` /
        ``async for`` / ``while`` statement that sits inside at least one
        other loop counts as one nested loop. If parsing or traversal
        fails, the error is recorded as a mistake and the verdict says the
        analysis was not possible.
        """
        loop_types = (ast.For, ast.AsyncFor, ast.While)
        nested_loops = 0
        try:
            tree = ast.parse(self.code)

            def count_loops(node, loop_depth=0):
                # loop_depth is the number of enclosing *loops*, not the AST
                # depth: only loop nodes increase it, so `def`/`if`/`with`
                # wrappers never make a single loop look nested.
                nonlocal nested_loops
                if isinstance(node, loop_types):
                    if loop_depth >= 1:
                        nested_loops += 1
                    loop_depth += 1
                for child in ast.iter_child_nodes(node):
                    count_loops(child, loop_depth)

            count_loops(tree)

            if nested_loops > 0:
                self.review_feedback.set_time_complexity(f"Nested Loops Detected: {nested_loops}. Consider optimizing the loops.")
            else:
                self.review_feedback.set_time_complexity("No nested loops detected. Time complexity looks reasonable.")

        except Exception as e:
            # Best-effort analysis: any failure (e.g. a syntax error in the
            # reviewed code) is recorded rather than aborting the review.
            self.review_feedback.add_mistake(f"Error in time complexity analysis: {str(e)}")
            self.review_feedback.set_time_complexity("Unable to analyze time complexity.")
|
||||||
|
|
||||||
|
def _show_review(file_path):
    """Review one file and render the outcome on the Streamlit page.

    ``CodeReviewTool.review_code()`` does not always return the full
    result dict: it returns a plain message for empty files and a dict
    holding only ``"Mistakes"`` when the API call failed. Both cases are
    reported as warnings here instead of failing on a missing key.
    """
    file_content = read_file_content(file_path)
    st.subheader(f"Reviewing {os.path.basename(file_path)}")

    review_results = CodeReviewTool(file_content).review_code()

    st.write(f"**File: {file_path}**")

    if not isinstance(review_results, dict):
        # Plain-text outcome such as "No code provided."
        st.warning(str(review_results))
        return

    if "Feedback" not in review_results:
        st.warning("The review could not be completed for this file.")
        for mistake in review_results.get("Mistakes", []):
            st.write(f"- {mistake}")
        return

    st.write(f"**Code Quality:** {review_results['Code Quality (%)']}%")
    st.write(f"**Time Complexity Analysis:** {review_results['Time Complexity']}")
    st.write(f"**Review Feedback:**\n{review_results['Feedback']}")
    st.write(f"**Review Time:** {review_results['Review Time (s)']:.2f} seconds")


st.title("Code Review Tool with RAG")
st.markdown("Upload your project as a ZIP file or provide a Git repository URL for review:")

git_url = st.text_input("Enter Git repository URL (leave empty if uploading ZIP file):")
uploaded_file = st.file_uploader("Upload ZIP file", type=["zip"])

if st.button("Review Project"):
    if git_url:
        # A Git URL takes precedence over an uploaded ZIP file.
        repo_dir = clone_git_repo(git_url)
        if repo_dir:
            st.write("**Files Found in Git Repo:**")
            # Only include programming files
            all_files = [
                os.path.join(root, file)
                for root, _, files in os.walk(repo_dir)
                for file in files
                if is_programming_file(os.path.join(root, file))
            ]

            for file_path in all_files:
                _show_review(file_path)
        else:
            st.warning("Failed to clone the repository.")
    elif uploaded_file:
        python_files = extract_zip_and_list_files(uploaded_file)
        # Filter programming files
        programming_files = [file for file in python_files if is_programming_file(file)]

        if programming_files:
            st.write("**Programming Files Found in ZIP File:**")
            for file in programming_files:
                st.write(file)

            for file_path in programming_files:
                _show_review(file_path)
        else:
            st.warning("No programming files found in the uploaded ZIP file.")
    else:
        st.warning("Please provide a Git repository URL or upload a ZIP file.")
|
Loading…
Reference in a new issue