You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
95 lines
3.2 KiB
Python
95 lines
3.2 KiB
Python
|
11 months ago
|
import os
|
||
|
|
import json
|
||
|
|
import config
|
||
|
|
import imagehash
|
||
|
|
from PIL import Image
|
||
|
|
from funcs import get_files, calculate_file_hash
|
||
|
|
|
||
|
|
|
||
|
|
def generate_image_phash(filepath, hash_size=8):
    """Compute the perceptual hash (pHash) of the image stored at ``filepath``.

    Args:
        filepath: Path of the image file to open with PIL.
        hash_size: Forwarded to ``imagehash.phash`` as its ``hash_size``.

    Returns:
        The value returned by ``imagehash.phash``, or ``None`` when the image
        could not be opened or hashed (the error is printed, not raised).
    """
    try:
        # Hash inside the ``with`` block: PIL opens images lazily, so the pixel
        # data is only read while hashing, and the context manager guarantees
        # the underlying file handle is released even if hashing fails.
        with Image.open(filepath) as pil_image:
            return imagehash.phash(pil_image, hash_size=hash_size)
    except Exception as e:
        # Deliberate best-effort: one unreadable file must not abort a scan.
        print(f"Error processing image {filepath}: {e}")
        return None
|
||
|
|
|
||
|
|
def are_phashes_duplicates(phash1, phash2, threshold=5):
    """Tell whether two pHashes are close enough to count as duplicates.

    The distance is ``phash1 - phash2``; the hashes are duplicates when that
    distance does not exceed ``threshold``. A ``TypeError`` raised while
    subtracting or comparing is printed and treated as "not a duplicate".
    """
    try:
        within_threshold = (phash1 - phash2) <= threshold
    except TypeError as err:
        print(f"Error comparing pHashes: {err}")
        return False
    return within_threshold
|
||
|
|
|
||
|
|
def get_media_by_phash(phash, username, existing_medias, threshold=5):
    """Return the first stored media row whose pHash is a near-duplicate of ``phash``.

    Args:
        phash: Hash of the candidate image, compared through
            ``are_phashes_duplicates``.
        username: Not used by the lookup; kept in the signature so existing
            callers keep working.
        existing_medias: Iterable of rows whose element at index 1 is the
            stored pHash as a hex string.
        threshold: Maximum distance forwarded to ``are_phashes_duplicates``.

    Returns:
        The first matching row, unchanged, or ``None`` when nothing matches.
    """
    for media in existing_medias:
        # Stored hashes are hex strings; rebuild the hash object so it can be
        # compared with the freshly computed one.
        existing_phash = imagehash.hex_to_hash(media[1])

        if are_phashes_duplicates(phash, existing_phash, threshold=threshold):
            return media
    return None
|
||
|
|
|
||
|
|
def get_media_by_hash(hash, existing_medias):
    """Find the first row in ``existing_medias`` whose element at index 1 equals ``hash``.

    Returns the matching row itself, or ``None`` when no row matches.
    """
    matches = (row for row in existing_medias if hash == row[1])
    return next(matches, None)
|
||
|
|
|
||
|
|
def get_media_by_id(media_id, existing_medias):
    """Find the first row in ``existing_medias`` whose element at index 1 equals ``media_id``.

    Returns the matching row itself, or ``None`` when no row matches.
    """
    # NOTE(review): the id is read from index 1, the same position the hash
    # lookups use -- confirm this matches the row layout callers pass in.
    matches = (row for row in existing_medias if media_id == row[1])
    return next(matches, None)
|
||
|
|
|
||
|
|
def get_data_by_filename(filename, data):
    """Return the first entry of ``data`` whose ``'filepath'`` value contains ``filename``.

    The check is a containment test (``filename in entry['filepath']``), not
    an exact comparison. Returns ``None`` when no entry matches.
    """
    candidates = (entry for entry in data if filename in entry['filepath'])
    return next(candidates, None)
|
||
|
|
|
||
|
|
# Scan every sub-folder of `directory`, hash each image found there, and move
# any image whose pHash is within the threshold of a stored one into
# duplicates/<username stored with the matching row>/.
directory = 'check_if_exists'  # Directory containing user images

# Database connection
# NOTE(review): db and cursor are never closed in this script -- confirm
# whether config.gen_connection() expects the caller to do so.
db, cursor = config.gen_connection()

# Fetch existing media with pHashes (assuming media are images, adjust media_type if needed)
cursor.execute("SELECT id, phash, username FROM media WHERE media_type = %s AND phash IS NOT NULL", ['image'])
existing_medias = cursor.fetchall()

usernames = os.listdir(directory)

for username in usernames:
    files = get_files(os.path.join(directory, username))
    for filepath in files:
        image_filename = os.path.basename(filepath)
        print(f'Processing {image_filename}...')

        # Generate pHash for the image
        phash = generate_image_phash(filepath, hash_size=8)
        if phash is None:
            continue  # Skip this image if there's an issue

        # Check if the image is a duplicate of any in the database
        duplicate_media = get_media_by_phash(phash, username, existing_medias, threshold=5)
        if not duplicate_media:
            continue

        # Row layout follows the SELECT above: (id, phash, username).
        print(f'Duplicate found: https://altpins.com/pin/{duplicate_media[0]}')
        print(f'Duplicate image path: {filepath}')

        newpath = os.path.join('duplicates', duplicate_media[2], image_filename)
        os.makedirs(os.path.dirname(newpath), exist_ok=True)

        # os.rename replaces an existing destination on POSIX and raises on
        # Windows; pick a free name so an earlier duplicate with the same
        # filename is neither overwritten nor able to abort the run.
        stem, extension = os.path.splitext(newpath)
        attempt = 1
        while os.path.exists(newpath):
            newpath = f'{stem}_{attempt}{extension}'
            attempt += 1

        os.rename(filepath, newpath)
        print(f'Moved {image_filename} to duplicates/')
|