mpenn/scripts/Converter.py

333 lines
12 KiB
Python

import customtkinter as Ctk
from tkinter import filedialog
import scripts.get_sys_info as system_code
import cv2
import os
import threading
from pathlib import Path
EPSILON = 1e-9 # choose an appropriate epsilon value
class SharedCounter:
def __init__(self):
self._value = 0
self.lock = threading.Lock()
@property
def value(self):
with self.lock:
return self._value
@value.setter
def value(self, new_value):
with self.lock:
self._value = new_value
class WorkerThread(threading.Thread):
def __init__(self, thread_id, thread_count, last_frame, delay, shared_counter, is_running, video_path, output_path, img_name):
super(WorkerThread, self).__init__()
self.thread_id = thread_id
self.thread_count = thread_count
self.last_frame = last_frame
self.delay = delay
self.shared_counter = shared_counter
self.is_running = is_running
self.video_path = video_path
self.output_path = output_path
self.img_name = img_name
if len(self.img_name) == 0:
self.img_name = "Img_seq"
def run(self):
video = cv2.VideoCapture(self.video_path)
if not video.isOpened():
return
for i in range(self.thread_id, self.last_frame, self.thread_count):
video.set(cv2.CAP_PROP_POS_FRAMES, i) # Seek to the correct frame.
ret, frame = video.read()
# Check if the frame was read successfully
if not ret:
break
# Save the frame as an image
cv2.imwrite(f'{self.output_path}/{self.img_name}_{i+1}.png', frame)
if not self.is_running.value:
break
with self.shared_counter.lock: # Safely increment the shared counter
self.shared_counter._value += 1
# Release the video file
video.release()
class Converter(Ctk.CTkFrame):
def __init__(self,master, **kwargs):
super().__init__(master, **kwargs)
# get the threads counts:
# create objects for the classes
self.shared_counter = SharedCounter()
self.is_running = SharedCounter()
#system_code.load_json_file()
self.thread_count = system_code.used_threads
self.continue_offset = 0
self.my_font = Ctk.CTkFont(family="Berlin Sans FB", size=22)
self.font_entry = Ctk.CTkFont(family="Berlin Sans FB", size=18)
stop_btn_txt = ("Stop Convert", "Continue")
self.test_var = 0
self.input_path = None
self.output_path = None
self.img_name = None
self.total_frames = None
self.step_size = None
# the layout
# input layout
self.input_entry = Ctk.CTkEntry(self, placeholder_text="input path", width=350, font=self.my_font)
self.input_btn = Ctk.CTkButton(self, text="Browse Input", width=100, command=self.get_video_path, font=self.my_font)
# output layout
self.output_entry = Ctk.CTkEntry(self, placeholder_text="output path", width=350, font=self.my_font)
self.output_btn = Ctk.CTkButton(self, text="Browse Output", width=100, command=self.get_folder_path, font=self.my_font)
# button row + img name selection
self.img_naming = Ctk.CTkEntry(self, placeholder_text="Img_seq", width=150, font=self.my_font)
# start Button
self.start_btn = Ctk.CTkButton(self, text="Start Convert", width=100, command=self.start_threads, font=self.my_font)
# stop Button
self.stop_btn = Ctk.CTkButton(self, text=stop_btn_txt[0], width=50, command=self.stop_continue_threads, font=self.my_font)
self.stop_btn.configure(text='Stop', state=Ctk.DISABLED)
self.show_progress()
self.progress_bar.set(0)
self.align()
def enable_keybinding(self):
self.master.bind("<Return>", self.on_a_press, add="+")
def disable_keybinding(self):
self.master.unbind("<Return>")
def on_a_press(self, event):
# Check if the frame is visible by querying its manager info
if self.winfo_manager():
self.start_threads()
def get_frame_rate_and_last_frame(self, video_path):
"""
Determines the frame rate of a video file and returns the index of the last frame using OpenCV.
:param video_path: The path to the video file.
:return: A tuple containing the frame rate and the index of the last frame of the video.
"""
# Open the video file using OpenCV's VideoCapture class
video = cv2.VideoCapture(video_path)
# Check if the video was opened successfully
if not video.isOpened():
raise ValueError(f"Failed to open video file at path: {video_path}")
# Get the total number of frames and the frame rate of the video
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
frame_rate = video.get(cv2.CAP_PROP_FPS)
# Check if the total number of frames is positive
if total_frames <= 0:
raise ValueError(f"Invalid total number of frames: {total_frames}")
# Check if the frame rate is positive
if frame_rate <= 0:
raise ValueError(f"Invalid frame rate: {frame_rate}")
# Index of the last frame
last_frame_index = total_frames - 1
# Release the video file
video.release()
return last_frame_index
def show_progress(self):
# Progress
# progressbar
self.progress_bar = Ctk.CTkProgressBar(self,orientation="horizontal", width=500, height=30)
# progressinfo
self.progress_info = Ctk.CTkLabel(self, text="", width=1, font=self.my_font)
# button row
self.progress_bar.place(
relx=0.5,
rely=0.6,
anchor="center",
relwidth=0.8,
relheight=0.03
) # Position the converter button to the left
self.progress_info.place(
relx=0.5,
rely=0.7,
anchor="center",
relwidth=0.35,
relheight=0.06
) # Position the converter button to the left
def update_progress(self, total_frames, last_counter_value):
current_counter_value = self.shared_counter.value
if current_counter_value == total_frames:
# Disable the stop button and display "Finished! :)"
self.progress_bar.set(1)
self.stop_btn.configure(state="disabled")
self.start_btn.configure(state="enabled")
self.input_btn.configure(state="enabled")
self.output_btn.configure(state="enabled")
self.progress_info.configure(text="Finished! :)")
self.is_running.value = False
self.stop_threads()
elif current_counter_value > last_counter_value:
jump = current_counter_value / self.total_frames
self.progress_bar.set(jump)
self.progress_info.configure(text=f"{int(self.progress_bar.get()*100)} %")
self.after(1, lambda: self.update_progress(total_frames, current_counter_value))
else:
# If the counter value did not change, schedule the next update
self.after(1, lambda: self.update_progress(total_frames, current_counter_value))
def get_video_path(self):
video_path = filedialog.askopenfilename(filetypes=[('Video files', '*.mp4 *.avi *.mov')])
self.input_entry.delete(0, Ctk.END) # Delete any existing text in the Entry widget
self.input_entry.insert(0, video_path) # Insert the new text
def get_folder_path(self):
file_path = filedialog.askdirectory()
self.output_entry.delete(0, Ctk.END) # Delete any existing text in the Entry widget
self.output_entry.insert(0, file_path) # Insert the new text
def start_threads(self):
self.input_path = self.input_entry.get()
self.output_path = self.output_entry.get()
input_extension = Path(self.input_path).suffix.lower()
if input_extension not in ['.mp4', '.avi', '.mkv', '.mov']:
self.progress_info.configure(text="Source Video could not be found")
return
if not os.path.exists(self.output_path):
self.progress_info.configure(text="Output Folder could not be found!")
return
self.input_btn.configure(state="disabled")
self.output_btn.configure(state="disabled")
self.start_btn.configure(state="disabled")
self.img_name = self.img_naming.get()
self.total_frames = self.get_frame_rate_and_last_frame(self.input_path)
self.step_size = 1 / self.total_frames * 100 /2
self.progress_bar.configure(determinate_speed=self.step_size)
self.progress_bar.set(0)
self.is_running.value = True
self.continue_offset = 0 # Reset the continue offset to start from the beginning
self.shared_counter.value = 0 # Reset the counter
self.threads = []
for i in range(0,self.thread_count):
thread = WorkerThread(i, self.thread_count, self.total_frames, 100, self.shared_counter, self.is_running, self.input_path, self.output_path, self.img_name)
self.threads.append(thread)
thread.start()
self.update_progress(self.total_frames, 0)
self.start_btn.configure(state=Ctk.DISABLED)
self.stop_btn.configure(text='Stop', state=Ctk.NORMAL)
def stop_threads(self):
self.is_running.value = False
for thread in self.threads:
thread.join()
self.continue_offset = self.shared_counter.value # Save the current counter value for continue
if self.shared_counter.value != self.total_frames:
self.stop_btn.configure(text='Continue', state=Ctk.NORMAL)
# Close all OpenCV windows
cv2.destroyAllWindows()
def stop_continue_threads(self):
self.start_btn.configure(state=Ctk.NORMAL)
self.input_btn.configure(state=Ctk.NORMAL)
self.output_btn.configure(state=Ctk.NORMAL)
if self.is_running.value:
self.stop_threads()
else:
self.continue_threads()
def continue_threads(self):
self.is_running.value = True
self.threads = []
for i in range(self.thread_count):
thread = WorkerThread(i, self.thread_count, self.total_frames, 100, self.shared_counter, self.is_running, self.input_path, self.output_path, self.img_name)
self.threads.append(thread)
thread.start()
self.after(100, self.update_progress(self.total_frames, 0))
self.stop_btn.config(text='Stop', state="normal")
def calculate_step_size(self, frame_count):
"""This function calculates the the step size for the
progressbar. The Progressbar does 0.02 steps normally."""
percent = frame_count / 100 # find out how many frames are 1%
percent = 1 / percent # find out how many % one frame is
percent = percent * 0.5 # half that, because tkinter makes 0.02 with each step
return percent
def align(self):
# output placing
self.input_entry.place(
relx=0.10,
rely=0.3,
anchor="w",
relwidth=0.5,
relheight=0.06
) # Position the converter button to the left
self.input_btn.place(
relx=0.90,
rely=0.3,
anchor="e",
relwidth=0.2,
relheight=0.06
) # Position the converter button to the left
# output placing
self.output_entry.place(
relx=0.10,
rely=0.4,
anchor="w",
relwidth=0.5,
relheight=0.06
) # Position the converter button to the left
self.output_btn.place(
relx=0.90,
rely=0.4,
anchor="e",
relwidth=0.2,
relheight=0.06
) # Position the converter button to the left
self.img_naming.place(
relx=0.1,
rely=0.5,
anchor="w",
relwidth=0.2,
relheight=0.06
) # Position the converter button to the left
# button row
self.start_btn.place(
relx=0.325,
rely=0.5,
anchor="w",
relwidth=0.15,
relheight=0.06
) # Position the converter button to the left
self.stop_btn.place(
relx=0.6,
rely=0.5,
anchor="e",
relwidth=0.1,
relheight=0.06
) # Position the converter button to the left