feat/dbIntigration #13
Binary file not shown.
|
@ -34,5 +34,6 @@ class VeraMindInference:
|
|||
|
||||
return {
|
||||
"result": "FAKE" if is_fake else "REAL",
|
||||
"confidence": float(confidence)
|
||||
"confidence": float(confidence),
|
||||
"is_fake": is_fake
|
||||
}
|
|
@ -1,36 +1,188 @@
|
|||
from collections import deque
|
||||
import customtkinter as ctk
|
||||
from views.mainScreen import MainFrame
|
||||
from models.data import TextData
|
||||
from Ai.interence import VeraMindInference
|
||||
from utils.database.database import FakeNewsChecker
|
||||
from models.provider import Provider
|
||||
from collections import Counter
|
||||
from Ai.llm import ArticleRater
|
||||
|
||||
BAD_WORDS = ["FAKE", "SATIRE", "Fake", "fake"]
|
||||
GOOD_WORDS = ["REAL", "real", "Real"]
|
||||
BAD_COLOR = "#ff8080"
|
||||
GOOD_COLOR = "#80ff8f"
|
||||
WORDS = BAD_WORDS + GOOD_WORDS
|
||||
|
||||
|
||||
class MainFrameController:
|
||||
"""
|
||||
Controller class for the main frame of the application.
|
||||
Handles user interactions, data processing, and database operations.
|
||||
"""
|
||||
|
||||
def __init__(self, frame: MainFrame) -> None:
    """
    Bind the controller to its view and create the services it uses.

    :param frame: The main frame whose widgets this controller drives
    """
    self.frame = frame
    # Inference model, selected by name.
    self.model_inference = VeraMindInference('VeraMind-Mini')
    self.db = FakeNewsChecker()
    # Fill the provider list immediately; this reads self.frame and
    # self.db, so it has to run after both attributes exist.
    self.update_provider_list()
    self.rater = ArticleRater()
|
||||
|
||||
def get_text_data(self) -> TextData:
    """
    Retrieve text data from the UI input fields.

    The URL entry is copied into a fresh TextData object, which is then
    asked to load its text from that URL. Only when that call returns a
    falsy value is the content of the input textbox used instead.

    :return: TextData object containing URL and text content
    """
    text_data = TextData()
    text_data.url = self.frame.entry_url.get()
    # Call text_from_url() exactly once; presumably it fills
    # text_data.text itself on success -- confirm in TextData.
    if not text_data.text_from_url():
        text_data.text = self.frame.input_textbox.get("0.0", "end")
    return text_data

# Backward-compatible alias for callers that still use the old name.
get_textdata = get_text_data
|
||||
|
||||
def press_check_button(self):
    """
    Run one check: read the input, classify it and display the result.

    The classification must happen before anything that reads
    ``text_data.result`` or ``text_data.confidence``, because a fresh
    TextData starts with ``confidence`` set to None.
    """
    text_data = self.get_text_data()
    print(text_data.text)
    self._predict(text_data)
    # Reuse the shared helper instead of repeating the
    # enable / clear / insert / disable sequence here.
    self._update_output(f"{text_data.get_output()}")
|
||||
|
||||
def prediction(self, text_data: TextData) -> TextData:
    """
    Stream the rater's response for an analysed text into the output box.

    Each chunk is appended to the output textbox as it arrives. Keywords
    are highlighted in batches of five chunks, keeping the last two
    chunks as overlap so a keyword split across a batch boundary is
    still seen in the next batch.

    :param text_data: TextData whose ``text``, ``result`` and
        ``confidence`` have already been set
    :return: The TextData object that was passed in
    """
    confidence_percent = round(text_data.confidence * 100, 2)
    response_stream = self.rater.get_response(text_data.text, text_data.result, confidence_percent)

    output_box = self.frame.output_textbox
    # The other writers in this class leave the box disabled, and a
    # disabled Tk text widget silently ignores insert().
    output_box.configure(state="normal")

    highlight_buffer = deque(maxlen=5)

    for chunk in response_stream:
        # Display the chunk immediately.
        output_box.insert("end", chunk)
        output_box.see("end")
        self.frame.update_idletasks()

        highlight_buffer.append(chunk)

        # Highlight once a full batch has been collected.
        if len(highlight_buffer) == highlight_buffer.maxlen:
            self.process_highlighting(highlight_buffer)
            # Trim here as well, so batching does not depend on the
            # callee mutating the buffer.
            while len(highlight_buffer) > 2:
                highlight_buffer.popleft()

    # Highlight whatever is left after the stream ends.
    if highlight_buffer:
        self.process_highlighting(highlight_buffer)

    output_box.configure(state="disabled")
    self.update_provider_list()
    return text_data
|
||||
|
||||
def process_highlighting(self, highlight_buffer):
    """
    Highlight keywords in the tail of the output box covered by the buffer.

    :param highlight_buffer: Mutable sequence (a deque in practice) of the
        most recently inserted chunks; it is trimmed IN PLACE to its last
        two entries so the caller keeps a two-chunk overlap
    """
    output_box = self.frame.output_textbox
    buffered_chars = sum(len(chunk) for chunk in highlight_buffer)
    # Tk's "end" is the position after the widget's implicit trailing
    # newline, so step back one extra character before counting.
    start_index = output_box.index(f"end-1c-{buffered_chars}c")
    end_index = output_box.index("end")
    self.highlight_words(start_index, end_index)

    # Keep an overlap of two chunks. Rebinding the parameter would only
    # change the local name, so mutate the caller's object instead.
    overlap = list(highlight_buffer)[-2:]
    highlight_buffer.clear()
    highlight_buffer.extend(overlap)
|
||||
|
||||
def highlight_words(self, start_index, end_index):
    """
    Colour every keyword occurrence inside a region of the output box.

    Matching is a plain, case-sensitive substring search over the
    spellings listed in WORDS. Words from BAD_WORDS get BAD_COLOR, all
    others get GOOD_COLOR.

    :param start_index: Tk text index where the region starts
    :param end_index: Tk text index where the region ends
    """
    output_box = self.frame.output_textbox
    content = output_box.get(start_index, end_index)

    for word in WORDS:
        pos = content.find(word)
        if pos == -1:
            continue

        # Spellings that differ only in case share one tag.
        tag_name = f"{word.lower()}_color"
        # Configure the tag once per word, not once per occurrence.
        color = BAD_COLOR if word in BAD_WORDS else GOOD_COLOR
        output_box.tag_config(tag_name, foreground=color)

        while pos != -1:
            # Offsets are relative to the start of the scanned region.
            word_start = f"{start_index}+{pos}c"
            word_end = f"{word_start}+{len(word)}c"
            output_box.tag_add(tag_name, word_start, word_end)
            pos = content.find(word, pos + len(word))
|
||||
|
||||
def _predict(self, text_data: TextData) -> TextData:
    """
    Classify the text with the VeraMind model and store the outcome.

    :param text_data: TextData object containing the text to analyze
    :return: The same TextData object, with ``confidence``, ``result``
        and ``is_fake_news`` filled in
    """
    prediction = self.model_inference.predict(text_data.text)

    # Copy the model output onto the data object field by field.
    text_data.confidence = prediction["confidence"]
    text_data.result = prediction["result"]
    print(f"Prediction: {text_data.result}")
    print(f"Confidence: {text_data.confidence}")
    text_data.is_fake_news = prediction["is_fake"]

    return text_data
|
||||
|
||||
def _add_to_db(self, text_data: TextData) -> None:
    """
    Store one analysed entry in the database.

    :param text_data: TextData object containing the analyzed information
    """
    # "anbieter" is the database's column name for the provider.
    record = {
        "url": text_data.url,
        "anbieter": text_data.get_provider(),
        "is_fake_news": text_data.is_fake_news,
    }
    self.db.insert_data(**record)
|
||||
|
||||
def _fetch_db_data(self):
    """
    Reload ``self.text_data_list`` from the database.

    Rows are read positionally: index 0 is the id, 1 the URL, 2 the
    provider and 3 the fake-news flag. The list is reset to empty first,
    so it stays empty when the database returns nothing.
    """
    self.text_data_list = []
    rows = self.db.fetch_data()
    if not rows:
        return

    for row in rows:
        print(f"ID: {row[0]}, URL: {row[1]}, Anbieter: {row[2]}, Fake News: {'Ja' if row[3] else 'Nein'}")
        entry = TextData(url=row[1], provider=row[2], is_fake_news=row[3])
        self.text_data_list.append(entry)
|
||||
|
||||
def sort_provider(self, text_data_list):
    """
    Group entries by provider and rank the providers by entry count.

    Entries whose ``provider`` is falsy are ignored. Providers with the
    same number of entries keep the order in which they first appear.

    :param text_data_list: Iterable of TextData objects
    :return: List of Provider objects, most frequent provider first
    """
    # Group the TextData objects by provider in a single pass, so the
    # input only has to be iterable once.
    provider_groups = {}
    for text_data in text_data_list:
        if not text_data.provider:
            continue
        provider_groups.setdefault(text_data.provider, []).append(text_data)

    # A provider's count is simply the size of its group.
    ranked = sorted(
        provider_groups.items(),
        key=lambda item: len(item[1]),
        reverse=True,
    )
    return [Provider(name, len(entries), entries) for name, entries in ranked]
|
||||
|
||||
def update_provider_list(self):
    """
    Rebuild the provider list in the view from the database content.

    Reloads the stored entries, removes every widget currently inside
    ``frame.provider_container`` and adds one row per provider showing
    its name and the share of its entries flagged as fake news.
    """
    self._fetch_db_data()

    container = self.frame.provider_container
    # Remove the rows left over from the previous refresh.
    for widget in container.winfo_children():
        widget.destroy()

    # Providers come back sorted by number of entries, largest first.
    for provider in self.sort_provider(self.text_data_list):
        provider_frame = ctk.CTkFrame(container)
        provider_frame.pack(fill="x", padx=5, pady=2)

        name_label = ctk.CTkLabel(provider_frame, text=provider.title)
        name_label.pack(side="left", padx=5)

        # One decimal place keeps values such as 1/3 readable.
        percent_text = f"{provider.get_fake_percentage():.1f}%"
        percent_label = ctk.CTkLabel(provider_frame, text=percent_text)
        percent_label.pack(side="right", padx=5)
|
||||
|
||||
def _update_output(self, output: str) -> None:
    """
    Replace the content of the output text box.

    :param output: String containing the output to display
    """
    textbox = self.frame.output_textbox
    # The box is kept disabled between updates; enable it only for the
    # duration of the write.
    textbox.configure(state="normal")
    textbox.delete("0.0", "end")
    textbox.insert("0.0", output)
    textbox.configure(state="disabled")
|
||||
|
||||
|
|
@ -1,10 +1,15 @@
|
|||
from urllib.parse import urlparse
|
||||
from typing import Optional
|
||||
|
||||
from utils.webTextExtractor import WebTextExtractor
|
||||
|
||||
class TextData:
|
||||
def __init__(self, url: str = "") -> None:
|
||||
def __init__(self, url: str = "",text: str = "",result: str = "", is_fake_news: bool = False, provider: str = "") -> None:
|
||||
self.url = url
|
||||
self.text = ""
|
||||
self.result = ""
|
||||
self.text = text
|
||||
self.result = result
|
||||
self.is_fake_news = is_fake_news
|
||||
self.provider = provider
|
||||
self.confidence = None
|
||||
self._extractor = None
|
||||
|
||||
|
@ -32,3 +37,33 @@ class TextData:
|
|||
output = f"Prediction: {self.result}" + f" Confidence: {self.confidence:.4f}"
|
||||
print(output)
|
||||
return output
|
||||
|
||||
def get_provider(self)-> str:
|
||||
self.extract_provider()
|
||||
return self.provider
|
||||
|
||||
def extract_provider(self):
|
||||
"""
|
||||
Extract the domain (anbieter) from a given URL.
|
||||
|
||||
:param url: The URL to process
|
||||
:return: The extracted domain or None if the URL is invalid
|
||||
"""
|
||||
if not self._is_valid_url(self.url):
|
||||
self.provider = "None"
|
||||
parsed_url = urlparse(self.url)
|
||||
domain_parts = parsed_url.netloc.split('.')
|
||||
self.provider = f"{domain_parts[-2]}.{domain_parts[-1]}" if len(domain_parts) >= 2 else "None"
|
||||
|
||||
def _is_valid_url(self, url: str) -> bool:
|
||||
"""
|
||||
Check if a given URL is valid.
|
||||
|
||||
:param url: The URL to validate
|
||||
:return: True if the URL is valid, False otherwise
|
||||
"""
|
||||
try:
|
||||
result = urlparse(url)
|
||||
return all([result.scheme, result.netloc])
|
||||
except ValueError:
|
||||
return False
|
|
@ -0,0 +1,24 @@
|
|||
class Provider():
    """A news provider together with the entries recorded for it."""

    def __init__(self, title: str, count: int, text_data_list) -> None:
        """
        :param title: Name of the provider
        :param count: Number of entries recorded for the provider
        :param text_data_list: Iterable of entries; each entry must expose
            an ``is_fake_news`` attribute
        """
        self.title = title
        self.count = count
        self.text_data_list = text_data_list

    def get_fake_percentage(self) -> float:
        """
        Return the share of this provider's entries flagged as fake news.

        :return: Percentage in the range 0-100, or 0.0 when the provider
            has no entries
        """
        # One pass over the entries, so any iterable works.
        flags = [bool(text_data.is_fake_news) for text_data in self.text_data_list]
        if not flags:
            return 0.0
        return (sum(flags) / len(flags)) * 100
|
||||
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
import sqlite3
|
||||
import duckdb
|
||||
|
||||
class FakeNewsChecker:
|
||||
def __init__(self, db_name='fake_news_checker.db'):
|
||||
|
@ -6,47 +6,48 @@ class FakeNewsChecker:
|
|||
self.create_table()
|
||||
|
||||
def create_connection(self):
    """
    Open a new DuckDB connection to the database file ``self.db_name``.

    :return: An open connection; the caller is responsible for closing it
    """
    return duckdb.connect(self.db_name)
|
||||
|
||||
def create_table(self):
    """
    Create the ``url_info`` table if it does not exist yet.

    The ``id`` column has no auto-increment; ids are assigned by the
    insert code.
    """
    conn = self.create_connection()
    try:
        # No explicit commit: no transaction is opened here, so the
        # statement is presumably auto-committed -- confirm against the
        # DuckDB client documentation.
        conn.execute('''
            CREATE TABLE IF NOT EXISTS url_info (
                id INTEGER PRIMARY KEY,
                url VARCHAR NOT NULL,
                anbieter VARCHAR NOT NULL,
                is_fake_news BOOLEAN NOT NULL
            )
        ''')
    finally:
        # Release the connection even when the statement fails.
        conn.close()
|
||||
|
||||
def get_next_id(self):
    """
    Return the next free id for ``url_info``.

    :return: Highest existing id plus one, or 1 when the table is empty
    """
    conn = self.create_connection()
    try:
        row = conn.execute('SELECT COALESCE(MAX(id), 0) + 1 FROM url_info').fetchone()
    finally:
        # Release the connection even when the query fails.
        conn.close()
    return row[0]
|
||||
|
||||
def insert_data(self, url, anbieter, is_fake_news):
    """
    Insert one row into ``url_info``.

    :param url: URL of the checked article
    :param anbieter: Provider the URL belongs to
    :param is_fake_news: Truthy when the article was classified as fake
        news; stored as a boolean
    """
    conn = self.create_connection()
    try:
        # Look up the next id on the same connection that performs the
        # insert, instead of opening a second connection for it.
        next_id = conn.execute('SELECT COALESCE(MAX(id), 0) + 1 FROM url_info').fetchone()[0]
        conn.execute('''
            INSERT INTO url_info (id, url, anbieter, is_fake_news)
            VALUES (?, ?, ?, ?)
        ''', [next_id, url, anbieter, bool(is_fake_news)])
    finally:
        # Release the connection even when a statement fails.
        conn.close()
|
||||
|
||||
def fetch_data(self):
    """
    Return every row of ``url_info``.

    :return: List of row tuples in table column order
        (id, url, anbieter, is_fake_news)
    """
    conn = self.create_connection()
    try:
        return conn.execute('SELECT * FROM url_info').fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()
|
||||
|
||||
# Beispielnutzung der Klasse
|
||||
if __name__ == '__main__':
|
||||
checker = FakeNewsChecker()
|
||||
|
||||
# Daten hinzufügen
|
||||
checker.insert_data('https://example.com/news/123', 'Example News', 0)
|
||||
checker.insert_data('https://fakenews.com/article/456', 'Fake News', 1)
|
||||
checker.insert_data('https://example.com/news/123', 'Example News', False)
|
||||
checker.insert_data('https://fakenews.com/article/456', 'Fake News', True)
|
||||
|
||||
# Daten abrufen
|
||||
data = checker.fetch_data()
|
||||
|
|
|
@ -38,6 +38,10 @@ class MainFrame(ctk.CTkFrame):
|
|||
self.header = ctk.CTkLabel(self.scrollview, text="Leaderboard", font=("Arial", 24, "bold"))
|
||||
self.header.pack(pady=10, padx=10, anchor="w")
|
||||
|
||||
# Container für Provider-Einträge
|
||||
self.provider_container = ctk.CTkFrame(self.scrollview)
|
||||
self.provider_container.pack(fill="both", expand=True)
|
||||
|
||||
def set_controller(self, controller):
|
||||
self.controller = controller
|
||||
|
||||
|
|
Loading…
Reference in New Issue