|
|
|
|
import os
|
|
|
|
|
import cv2
|
|
|
|
|
import hashlib
|
|
|
|
|
import requests
|
|
|
|
|
import imagehash
|
|
|
|
|
import numpy as np
|
|
|
|
|
from PIL import Image
|
|
|
|
|
from moviepy.editor import VideoFileClip
|
|
|
|
|
|
|
|
|
|
# Browser-like User-Agent so media hosts don't reject requests as bot traffic.
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"}

# SECURITY: proxy credentials are hardcoded here — move them to environment
# variables or a config file and rotate the exposed password.
# NOTE(review): `proxies` is not referenced by any function in this file;
# verify whether callers elsewhere still use it before removing.
proxies={"http": "http://yehyuxsl-rotate:4tl5bvrwkz5e@p.webshare.io:80/","https": "http://yehyuxsl-rotate:4tl5bvrwkz5e@p.webshare.io:80/"}
|
|
|
|
|
|
|
|
|
|
def get_file_extension(url):
    """Probe *url* with a HEAD request and map its Content-Type to an extension.

    Returns '.jpg' for image/* content, '.mp4' for video/* content, or
    None when the URL is unreachable or the content type is unrecognised.
    """
    try:
        # HEAD does not follow redirects by default, and media URLs
        # frequently redirect to a CDN — enable redirects explicitly.
        # Reuse the module's browser-like headers (the streaming download
        # below already sends them) and bound the wait with a timeout so
        # a dead host cannot hang the caller.
        response = requests.head(url, headers=headers, allow_redirects=True, timeout=15)
    except requests.RequestException as e:
        print(f"Failed to access media {url}: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to access media {url}")
        return None

    content_type = response.headers.get('Content-Type', '')
    if 'image' in content_type:
        return '.jpg'
    elif 'video' in content_type:
        return '.mp4'
    else:
        print(f"Unknown content type for media {url}")
        return None
|
|
|
|
|
|
|
|
|
|
def generate_phash(image_path):
    """Return the perceptual hash (phash) of an image as a hex string.

    Returns False when the image cannot be opened or hashed (callers
    treat the result as falsy in that case).
    """
    try:
        # Context manager releases the underlying file handle even when
        # hashing fails — the original left the file open.
        with Image.open(image_path) as image:
            return str(imagehash.phash(image))
    except Exception as e:
        print(f"Error generating phash for {image_path}: {e}")
        return False
|
|
|
|
|
|
|
|
|
|
def clean_empty_folders(path):
|
|
|
|
|
for root, dirs, fs in os.walk(path):
|
|
|
|
|
for d in dirs:
|
|
|
|
|
clean_empty_folders(os.path.join(root, d))
|
|
|
|
|
if not os.listdir(root):
|
|
|
|
|
os.rmdir(root)
|
|
|
|
|
|
|
|
|
|
def get_files(directory):
|
|
|
|
|
files = []
|
|
|
|
|
for root, dirs, filenames in os.walk(directory):
|
|
|
|
|
for filename in filenames:
|
|
|
|
|
if filename.startswith('.'):
|
|
|
|
|
continue
|
|
|
|
|
files.append(os.path.join(root, filename))
|
|
|
|
|
return files
|
|
|
|
|
|
|
|
|
|
def compare_images(image_path1, image_path2):
    """Decide whether two images show the same content via SIFT matching.

    Returns True when enough good keypoint matches agree on a single
    homography between the two images; False otherwise (including when
    either image fails to load or yields no usable descriptors).
    """
    # Load the images in grayscale
    img1 = cv2.imread(image_path1, cv2.IMREAD_GRAYSCALE)
    img2 = cv2.imread(image_path2, cv2.IMREAD_GRAYSCALE)

    if img1 is None or img2 is None:
        print("Error loading images!")
        return False

    # Initialize SIFT detector
    sift = cv2.SIFT_create()

    # Find keypoints and descriptors with SIFT
    kp1, des1 = sift.detectAndCompute(img1, None)
    kp2, des2 = sift.detectAndCompute(img2, None)

    # A blank/featureless image produces no descriptors.
    if des1 is None or des2 is None:
        return False

    # FLANN parameters (algorithm=1 selects the KD-tree index)
    index_params = dict(algorithm=1, trees=5)
    search_params = dict(checks=50)

    # FLANN based matcher
    flann = cv2.FlannBasedMatcher(index_params, search_params)

    # Matching descriptor vectors using KNN algorithm
    matches = flann.knnMatch(des1, des2, k=2)

    # Apply Lowe's ratio test. knnMatch can return fewer than 2
    # neighbours per query when the other image has very few
    # descriptors; the original's `for m, n in matches` raised
    # ValueError in that case, so guard the unpacking.
    good = []
    for pair in matches:
        if len(pair) < 2:
            continue
        m, n = pair
        if m.distance < 0.6 * n.distance:  # More stringent ratio
            good.append(m)

    # Minimum number of matches
    MIN_MATCH_COUNT = 15  # Adjust this threshold as needed

    if len(good) <= MIN_MATCH_COUNT:
        return False

    # Extract location of good matches
    src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
    dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)

    # Find homography with RANSAC outlier rejection
    M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)

    # findHomography can fail and return mask=None; the original
    # crashed on mask.ravel() in that case.
    if mask is None:
        return False

    # Require enough inlier points agreeing on the homography.
    return int(np.sum(mask.ravel())) > 10
|
|
|
|
|
|
|
|
|
|
def download_file(url, filePath):
    """Download *url* to *filePath*, streaming in 8 KiB chunks.

    Returns filePath on success (or when the file already exists so the
    download is skipped), False on any failure.
    """
    try:
        if os.path.exists(filePath):
            print(f"File already exists: {filePath}")
            return filePath

        if not url:
            print(f"Invalid URL: {url}")
            return False

        # Stream so large media never has to fit in memory, and bound the
        # wait with a timeout so a dead server cannot hang the run. The
        # `with` block closes the connection even on early return — the
        # original leaked the response on the non-200 path.
        with requests.get(url, stream=True, headers=headers, timeout=30) as response:
            if response.status_code != 200:
                print(f"Failed to download {url}. Status code: {response.status_code}")
                return False

            # dirname is "" for a bare filename and os.makedirs("") raises.
            parent = os.path.dirname(filePath)
            if parent:
                os.makedirs(parent, exist_ok=True)

            with open(filePath, "wb") as out_file:
                for chunk in response.iter_content(chunk_size=8192):
                    out_file.write(chunk)

        return filePath
    except Exception as e:
        print(f"Failed to download {url}. Error: {e}")
        return False
|
|
|
|
|
|
|
|
|
|
def get_media_type(filename):
    """Classify *filename* as 'image' or 'video' by extension, else None.

    Matching is case-insensitive on the file extension.
    """
    image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif", ".svg", ".eps", ".raw", ".cr2", ".nef", ".orf", ".sr2", ".heic", ".indd", ".ai", ".psd", ".svg"}
    video_extensions = {".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".webm", ".vob", ".ogg", ".ts", ".flv"}

    # Lower-case before splitting so ".JPG" matches ".jpg".
    extension = os.path.splitext(filename.lower())[1]

    if extension in image_extensions:
        return "image"
    if extension in video_extensions:
        return "video"
    return None
|
|
|
|
|
|
|
|
|
|
def get_video_duration(file_path):
    """Return a video's duration in seconds, or 0 when it can't be read.

    Non-existent paths, non-video files, and decode failures all yield 0.
    A valid video reporting a zero duration is clamped up to 1 so callers
    can distinguish it from failure.
    """
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return 0

    if get_media_type(file_path) != 'video':
        return 0

    try:
        with VideoFileClip(file_path) as clip:
            duration = clip.duration
            return 1 if duration == 0 else duration
    except Exception as e:
        print(f"Error getting duration for {file_path}: {e}")
        return 0
|
|
|
|
|
|
|
|
|
|
def get_media_dimensions(media_path):
    """Return (width, height) for a media file, dispatching on its type.

    Anything not classified as a video is treated as an image.
    """
    is_video = get_media_type(media_path) == 'video'
    return get_video_dimensions(media_path) if is_video else get_image_dimensions(media_path)
|
|
|
|
|
|
|
|
|
|
def get_video_dimensions(video_path):
    """Return a video's (width, height) via OpenCV capture properties.

    NOTE(review): when the file can't be opened, cv2 property reads
    return 0.0, so this yields (0, 0) rather than raising.
    """
    capture = cv2.VideoCapture(video_path)
    frame_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    capture.release()
    return frame_width, frame_height
|
|
|
|
|
|
|
|
|
|
def get_image_dimensions(image_path):
    """Return an image's (width, height), or (0, 0) if it cannot be read.

    The original used a bare ``except:``, which also swallows
    KeyboardInterrupt and SystemExit; narrowed to Exception so
    interrupts still propagate.
    """
    try:
        with Image.open(image_path) as img:
            return img.size
    except Exception:
        return 0, 0
|
|
|
|
|
|
|
|
|
|
def get_video_data(video_path):
    """Return {'duration', 'width', 'height'} for a video.

    All values are 0 when the file cannot be opened or decoded.
    """
    data = {'duration': 0, 'width': 0, 'height': 0}
    try:
        with VideoFileClip(video_path) as clip:
            width, height = clip.size
            data['duration'] = clip.duration
            data['width'] = width
            data['height'] = height
    except Exception as e:
        print(f"Error getting video data for {video_path}: {e}")
    return data
|
|
|
|
|
|
|
|
|
|
def calculate_file_hash(file_path, hash_func='sha256'):
|
|
|
|
|
h = hashlib.new(hash_func)
|
|
|
|
|
with open(file_path, 'rb') as file:
|
|
|
|
|
chunk = file.read(8192)
|
|
|
|
|
while chunk:
|
|
|
|
|
h.update(chunk)
|
|
|
|
|
chunk = file.read(8192)
|
|
|
|
|
return h.hexdigest()
|
|
|
|
|
|
|
|
|
|
def files_are_identical(file1, file2):
|
|
|
|
|
"""Compare two files byte-by-byte."""
|
|
|
|
|
with open(file1, "rb") as f1, open(file2, "rb") as f2:
|
|
|
|
|
while True:
|
|
|
|
|
chunk1 = f1.read(4096)
|
|
|
|
|
chunk2 = f2.read(4096)
|
|
|
|
|
if chunk1 != chunk2:
|
|
|
|
|
return False
|
|
|
|
|
if not chunk1: # End of file
|
|
|
|
|
return True
|