feat/dbIntigration #13

Merged
Fabel merged 6 commits from feat/dbIntigration into develop 2024-10-10 07:40:25 +00:00
7 changed files with 260 additions and 43 deletions

Binary file not shown.

View File

@ -34,5 +34,6 @@ class VeraMindInference:
return { return {
"result": "FAKE" if is_fake else "REAL", "result": "FAKE" if is_fake else "REAL",
"confidence": float(confidence) "confidence": float(confidence),
"is_fake": is_fake
} }

View File

@ -1,36 +1,188 @@
from collections import deque
import customtkinter as ctk
from views.mainScreen import MainFrame from views.mainScreen import MainFrame
from models.data import TextData from models.data import TextData
from Ai.interence import VeraMindInference from Ai.interence import VeraMindInference
from utils.database.database import FakeNewsChecker
from models.provider import Provider
from collections import Counter
from Ai.llm import ArticleRater
BAD_WORDS = ["FAKE", "SATIRE", "Fake", "fake"]
GOOD_WORDS = ["REAL", "real", "Real"]
BAD_COLOR = "#ff8080"
GOOD_COLOR = "#80ff8f"
WORDS = BAD_WORDS + GOOD_WORDS
class MainFrameController: class MainFrameController:
"""
Controller class for the main frame of the application.
Handles user interactions, data processing, and database operations.
"""
def __init__(self, frame: MainFrame) -> None: def __init__(self, frame: MainFrame) -> None:
"""
Initialize the controller with the main frame and required components.
:param frame: The main frame of the application
"""
self.frame = frame self.frame = frame
self.model_inference = VeraMindInference('VeraMind-Mini') self.model_inference = VeraMindInference('VeraMind-Mini')
self.db = FakeNewsChecker()
self.update_provider_list()
self.rater = ArticleRater()
def get_text_data(self) -> TextData:
"""
Retrieve text data from the UI input fields.
def get_textdata(self) -> TextData: :return: TextData object containing URL and text content
"""
text_data = TextData() text_data = TextData()
text_data.url = self.frame.entry_url.get() text_data.url = self.frame.entry_url.get()
if text_data.text_from_url(): if not text_data.text_from_url():
text_data.text = self.frame.input_textbox.get("0.0", "end") text_data.text = self.frame.input_textbox.get("0.0", "end")
return text_data return text_data
def press_check_button(self): def press_check_button(self):
text_data = self.get_textdata() text_data = self.get_textdata()
print(f"text:{text_data.text}") print(text_data.text)
self.prediction(text_data) self._predict(text_data)
self.frame.output_textbox.configure(state="normal") self.frame.output_textbox.configure(state="normal")
self.frame.output_textbox.delete("0.0", "end") self.frame.output_textbox.delete("0.0", "end")
self.frame.output_textbox.insert("0.0",f"{text_data.get_output()}")
self.frame.output_textbox.configure(state="disabled")
def prediction(self, text_data:TextData) -> TextData: response_stream = self.rater.get_response(text_data.text, text_data.result, float(f"{text_data.confidence * 100:.2f}"))
highlight_buffer = deque(maxlen=5)
for chunk in response_stream:
# Display the chunk immediately
self.frame.output_textbox.insert("end", chunk)
self.frame.output_textbox.see("end")
self.frame.update_idletasks()
# Add to highlight buffer
highlight_buffer.append(chunk)
# Process highlighting when buffer is full
if len(highlight_buffer) == 5:
self.process_highlighting(highlight_buffer)
# Process any remaining chunks in the buffer
if highlight_buffer:
self.process_highlighting(highlight_buffer)
self.frame.output_textbox.configure(state="disabled")
self.update_provider_list()
def process_highlighting(self, highlight_buffer):
    """Highlight keywords in the text covered by the buffered chunks.

    The buffered chunks are the most recently inserted pieces of output, so
    the region to scan is the last ``sum(len(chunk))`` characters of the
    output textbox. After highlighting, the buffer is trimmed *in place* to
    its last two chunks so that a keyword split across a batch boundary is
    seen again in the next pass.

    :param highlight_buffer: deque of recently inserted text chunks; it is
        mutated so the caller observes the trimmed buffer
    """
    textbox = self.frame.output_textbox
    buffered_chars = sum(len(chunk) for chunk in highlight_buffer)
    # Tk's "end" lies *after* the widget's implicit trailing newline, so the
    # real end of the content is "end-1c" and the start must step back one
    # extra character to cover the first buffered character.
    start_index = textbox.index(f"end-{buffered_chars + 1}c")
    end_index = textbox.index("end-1c")
    self.highlight_words(start_index, end_index)
    # Keep an overlap of 2 chunks. Rebinding the parameter to a new deque
    # would not be visible to the caller, so drop the old chunks in place.
    while len(highlight_buffer) > 2:
        highlight_buffer.popleft()
def highlight_words(self, start_index, end_index):
    """Colour every keyword occurrence between two textbox indices.

    Each entry of ``WORDS`` is searched case-sensitively as a plain
    substring. Matches are tagged ``"<word.lower()>_color"``; the tag is
    coloured ``BAD_COLOR`` for entries of ``BAD_WORDS`` and ``GOOD_COLOR``
    for entries of ``GOOD_WORDS``.

    :param start_index: textbox index where the scanned region starts
    :param end_index: textbox index where the scanned region ends
    """
    textbox = self.frame.output_textbox
    content = textbox.get(start_index, end_index)
    for word in WORDS:
        pos = content.find(word)
        if pos == -1:
            continue
        # Tag name and colour depend only on the word, so configure the tag
        # once per word instead of once per occurrence.
        tag_name = f"{word.lower()}_color"
        if word in BAD_WORDS:
            textbox.tag_config(tag_name, foreground=BAD_COLOR)
        elif word in GOOD_WORDS:
            textbox.tag_config(tag_name, foreground=GOOD_COLOR)
        while pos != -1:
            # Offsets are relative to start_index because `content` starts there.
            word_start = f"{start_index}+{pos}c"
            word_end = f"{word_start}+{len(word)}c"
            textbox.tag_add(tag_name, word_start, word_end)
            pos = content.find(word, pos + len(word))
def _predict(self, text_data: TextData) -> TextData:
"""
Make a prediction using the VeraMind model.
:param text_data: TextData object containing the text to analyze
:return: Updated TextData object with prediction results
"""
result = self.model_inference.predict(text_data.text) result = self.model_inference.predict(text_data.text)
text_data.confidence = result["confidence"] text_data.confidence = result["confidence"]
text_data.result = result["result"] text_data.result = result["result"]
print(f"Prediction: {text_data.result}") text_data.is_fake_news = result["is_fake"]
print(f"Confidence: {text_data.confidence}")
return text_data return text_data
def _add_to_db(self, text_data: TextData) -> None:
    """
    Store one analysed article in the database.

    :param text_data: TextData object whose URL, provider and fake-news
        flag are written as a new row
    """
    record = {
        "url": text_data.url,
        "anbieter": text_data.get_provider(),
        "is_fake_news": text_data.is_fake_news,
    }
    self.db.insert_data(**record)
def _fetch_db_data(self):
    """
    Reload ``self.text_data_list`` from the database.

    The list is reset first; every fetched row is printed and converted
    into a TextData object (row[1] = URL, row[2] = provider,
    row[3] = fake-news flag).
    """
    self.text_data_list = loaded = []
    data = self.db.fetch_data()
    if not data:
        return
    for row in data:
        print(f"ID: {row[0]}, URL: {row[1]}, Anbieter: {row[2]}, Fake News: {'Ja' if row[3] else 'Nein'}")
        loaded.append(TextData(url=row[1], provider=row[2], is_fake_news=row[3]))
def sort_provider(self, text_data_list):
    """
    Group entries by provider and rank the providers by entry count.

    Entries with an empty provider are ignored. Providers with the same
    number of entries keep the order in which they first appear.

    :param text_data_list: list of TextData objects to group
    :return: list of Provider objects, most frequent provider first
    """
    # Collect the entries of each provider in first-seen order.
    grouped = {}
    for entry in text_data_list:
        if not entry.provider:
            continue
        grouped.setdefault(entry.provider, []).append(entry)
    # The group size is the provider's frequency; the sort is stable.
    ranked = sorted(grouped.items(), key=lambda item: len(item[1]), reverse=True)
    return [Provider(name, len(members), members) for name, members in ranked]
def update_provider_list(self):
    """
    Rebuild the provider entries shown in the provider container.

    Reloads the stored data, removes every existing child widget and adds
    one row per provider showing its title and its fake percentage.
    """
    self._fetch_db_data()
    container = self.frame.provider_container
    # Remove the entries that are currently displayed.
    for stale_widget in container.winfo_children():
        stale_widget.destroy()
    # Add one row per provider, in ranked order.
    for provider in self.sort_provider(self.text_data_list):
        row_frame = ctk.CTkFrame(container)
        row_frame.pack(fill="x", padx=5, pady=2)
        ctk.CTkLabel(row_frame, text=provider.title).pack(side="left", padx=5)
        percentage_text = str(provider.get_fake_percentage())+"%"
        ctk.CTkLabel(row_frame, text=percentage_text).pack(side="right", padx=5)
def _update_output(self, output: str) -> None:
    """
    Replace the content of the output text box.

    The box is made editable, cleared, filled with the new text and set
    back to the disabled state.

    :param output: String to display
    """
    box = self.frame.output_textbox
    box.configure(state="normal")
    box.delete("0.0", "end")
    box.insert("0.0", output)
    box.configure(state="disabled")

View File

@ -1,10 +1,15 @@
from urllib.parse import urlparse
from typing import Optional
from utils.webTextExtractor import WebTextExtractor from utils.webTextExtractor import WebTextExtractor
class TextData: class TextData:
def __init__(self, url: str = "") -> None: def __init__(self, url: str = "",text: str = "",result: str = "", is_fake_news: bool = False, provider: str = "") -> None:
self.url = url self.url = url
self.text = "" self.text = text
self.result = "" self.result = result
self.is_fake_news = is_fake_news
self.provider = provider
self.confidence = None self.confidence = None
self._extractor = None self._extractor = None
@ -32,3 +37,33 @@ class TextData:
output = f"Prediction: {self.result}" + f" Confidence: {self.confidence:.4f}" output = f"Prediction: {self.result}" + f" Confidence: {self.confidence:.4f}"
print(output) print(output)
return output return output
def get_provider(self) -> str:
    """
    Return the provider derived from ``self.url``.

    The provider is re-extracted on every call, so any value previously
    stored in ``self.provider`` is overwritten.

    :return: The last two labels of the URL's host name, or the string
        "None" if no provider can be derived
    """
    self.extract_provider()
    return self.provider


def extract_provider(self) -> None:
    """
    Derive the provider (anbieter) from ``self.url`` and store it in
    ``self.provider``.

    The provider is the last two dot-separated labels of the host name
    (e.g. "www.example.com" -> "example.com"). The string "None" is stored
    when the URL is invalid or the host has fewer than two labels.
    NOTE(review): multi-part suffixes such as "co.uk" are not recognised,
    so "bbc.co.uk" yields "co.uk".
    """
    if not self._is_valid_url(self.url):
        self.provider = "None"
        # Stop here; otherwise the parsing below overwrites the result.
        return
    # `hostname` excludes user info and port and is lower-cased, unlike
    # `netloc`; a trailing root dot is not part of the domain name.
    host = (urlparse(self.url).hostname or "").rstrip(".")
    domain_parts = host.split(".")
    if len(domain_parts) >= 2:
        self.provider = f"{domain_parts[-2]}.{domain_parts[-1]}"
    else:
        self.provider = "None"


def _is_valid_url(self, url: str) -> bool:
    """
    Check if a given URL is valid.

    A URL counts as valid when it parses and has both a scheme and a
    network location.

    :param url: The URL to validate
    :return: True if the URL is valid, False otherwise
    """
    try:
        result = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs, e.g. an unbalanced
        # IPv6 bracket.
        return False
    return bool(result.scheme and result.netloc)

24
src/models/provider.py Normal file
View File

@ -0,0 +1,24 @@
class Provider:
    """A news provider together with the analysed articles attributed to it."""

    def __init__(self, title: str, count: int, text_data_list) -> None:
        """
        Store the provider's data.

        :param title: Name of the provider
        :param count: Number of entries counted for this provider
        :param text_data_list: Entries of this provider; each one must
            expose an ``is_fake_news`` attribute
        """
        self.title = title
        self.count = count
        self.text_data_list = text_data_list

    def get_fake_percentage(self) -> float:
        """
        Return the share of entries flagged as fake news.

        :return: Percentage in the range 0-100; 0.0 when there are no entries
        """
        entries = list(self.text_data_list)
        if not entries:
            # Avoid dividing by zero for a provider without entries.
            return 0.0
        fake_total = sum(1 for entry in entries if entry.is_fake_news)
        return (fake_total / len(entries)) * 100

View File

@ -1,4 +1,4 @@
import sqlite3 import duckdb
class FakeNewsChecker: class FakeNewsChecker:
def __init__(self, db_name='fake_news_checker.db'): def __init__(self, db_name='fake_news_checker.db'):
@ -6,47 +6,48 @@ class FakeNewsChecker:
self.create_table() self.create_table()
def create_connection(self): def create_connection(self):
return sqlite3.connect(self.db_name) return duckdb.connect(self.db_name)
def create_table(self): def create_table(self):
conn = self.create_connection() conn = self.create_connection()
cursor = conn.cursor() conn.execute('''
cursor.execute('''
CREATE TABLE IF NOT EXISTS url_info ( CREATE TABLE IF NOT EXISTS url_info (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY,
url TEXT NOT NULL, url VARCHAR NOT NULL,
anbieter TEXT NOT NULL, anbieter VARCHAR NOT NULL,
is_fake_news BOOLEAN NOT NULL is_fake_news BOOLEAN NOT NULL
) )
''') ''')
conn.commit()
conn.close() conn.close()
def get_next_id(self):
    """
    Return the next free id for the ``url_info`` table.

    The id is computed as ``MAX(id) + 1``, or 1 for an empty table.
    NOTE(review): the value is read on its own connection before the
    insert happens, so two concurrent writers could obtain the same id.

    :return: The id to use for the next inserted row
    """
    conn = self.create_connection()
    try:
        result = conn.execute('SELECT COALESCE(MAX(id), 0) + 1 FROM url_info').fetchone()
    finally:
        # Close the connection even when the query fails.
        conn.close()
    return result[0]
def insert_data(self, url, anbieter, is_fake_news): def insert_data(self, url, anbieter, is_fake_news):
conn = self.create_connection() conn = self.create_connection()
cursor = conn.cursor() next_id = self.get_next_id()
cursor.execute(''' conn.execute('''
INSERT INTO url_info (url, anbieter, is_fake_news) INSERT INTO url_info (id, url, anbieter, is_fake_news)
VALUES (?, ?, ?) VALUES (?, ?, ?, ?)
''', (url, anbieter, is_fake_news)) ''', [next_id, url, anbieter, bool(is_fake_news)])
conn.commit()
conn.close() conn.close()
def fetch_data(self): def fetch_data(self):
conn = self.create_connection() conn = self.create_connection()
cursor = conn.cursor() result = conn.execute('SELECT * FROM url_info').fetchall()
cursor.execute('SELECT * FROM url_info')
rows = cursor.fetchall()
conn.close() conn.close()
return rows return result
# Beispielnutzung der Klasse # Beispielnutzung der Klasse
if __name__ == '__main__': if __name__ == '__main__':
checker = FakeNewsChecker() checker = FakeNewsChecker()
# Daten hinzufügen # Daten hinzufügen
checker.insert_data('https://example.com/news/123', 'Example News', 0) checker.insert_data('https://example.com/news/123', 'Example News', False)
checker.insert_data('https://fakenews.com/article/456', 'Fake News', 1) checker.insert_data('https://fakenews.com/article/456', 'Fake News', True)
# Daten abrufen # Daten abrufen
data = checker.fetch_data() data = checker.fetch_data()

View File

@ -38,6 +38,10 @@ class MainFrame(ctk.CTkFrame):
self.header = ctk.CTkLabel(self.scrollview, text="Leaderboard", font=("Arial", 24, "bold")) self.header = ctk.CTkLabel(self.scrollview, text="Leaderboard", font=("Arial", 24, "bold"))
self.header.pack(pady=10, padx=10, anchor="w") self.header.pack(pady=10, padx=10, anchor="w")
# Container für Provider-Einträge
self.provider_container = ctk.CTkFrame(self.scrollview)
self.provider_container.pack(fill="both", expand=True)
def set_controller(self, controller): def set_controller(self, controller):
self.controller = controller self.controller = controller