"""Fetch Snapchat stories for followed users, download them, and upload them.

Run as a script. The ``__main__`` block creates the module-level globals
``db``, ``cursor``, ``obj_storage`` and ``existing_snap_ids`` that the
functions below read, so the functions are not usable before those exist.
"""

from snapchat import get_stories, get_highlight_stories, get_all_users_data, parse_stories
from datetime import datetime
from uuid import uuid4
import config
import funcs
import cv2
import os
import json

UPLOAD_MODE = True

media_directory = "media"
snapchat_directory = "snapchat"
temp_directory = ".temp"
data_directory = "data"

directory = os.path.join(media_directory, snapchat_directory)

os.makedirs(media_directory, exist_ok=True)
os.makedirs(directory, exist_ok=True)
os.makedirs(temp_directory, exist_ok=True)
os.makedirs(data_directory, exist_ok=True)

# Media type -> file extension used when naming downloaded files.
FILE_EXTENSIONS = {'image': '.jpg', 'video': '.mp4'}
# Reverse lookup, used to recover the media type of files already on disk.
MEDIA_TYPES_BY_EXTENSION = {ext: kind for kind, ext in FILE_EXTENSIONS.items()}


def _remove_file(path):
    """Delete *path*, tolerating the case where it has already been removed."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def find_duplicate_snap(existing_snap_ids, snap_id):
    """Return True when *snap_id* is contained in *existing_snap_ids*."""
    return snap_id in existing_snap_ids


def archive_data(data, username):
    """Write *data* as indented JSON to ``data/<username>~<unix time>.json``.

    Archiving is best-effort: any failure is reported on stdout and never
    propagates to the caller.

    Returns:
        True when the file was written, False otherwise.
    """
    try:
        current_timestamp = int(datetime.now().timestamp())
        data_filename = f"{username}~{current_timestamp}.json"
        data_filepath = os.path.join(data_directory, data_filename)
        with open(data_filepath, 'w') as f:
            f.write(json.dumps(data, indent=4))
    except Exception as exc:
        # Deliberately broad (but not bare, so Ctrl-C still works): a failed
        # archive must not stop the stories from being processed.
        print(f"Failed to archive data for {username}. ({exc})")
        return False
    return True


def get_snapchat_stories(usernames):
    """Collect story and highlight entries for *usernames*.

    For every user with data, the raw data is archived and each story dict is
    extended with ``media_url``, ``filepath`` and ``username``. Stories whose
    ``media_type`` has no known file extension are skipped.

    Returns:
        The list of story dicts, sorted by their ``timestamp`` value.
    """
    snapchat_users_data = get_all_users_data(usernames)
    snapchat_users_data = dict(sorted(snapchat_users_data.items()))

    ready_stories = []
    for username, data in snapchat_users_data.items():
        print(f"Getting stories for {username}...")

        if not data:
            print(f"Failed to get data for {username}. Skipping.")
            continue

        archive_data(data, username)

        stories = get_stories(data)
        stories = parse_stories(stories)
        stories.extend(get_highlight_stories(data))

        for story in stories:
            snap_id = story['snap_id']
            url = story['url']
            timestamp = story['timestamp']

            # Determine file extension
            extension = FILE_EXTENSIONS.get(story.get('media_type'))
            if not extension:
                print(f"Failed to determine file extension for {url}. Skipping.")
                continue

            filename = f"{username}~{timestamp}~{snap_id}{extension}"

            story['media_url'] = url
            story['filepath'] = os.path.join(directory, filename)
            story['username'] = username
            # UploadMedia reads this key unconditionally, so make sure it is
            # present even when the parser did not supply it.
            story.setdefault('original_snap_id', None)

            ready_stories.append(story)

    ready_stories.sort(key=lambda x: x['timestamp'])
    return ready_stories


def get_snapchat_files():
    """Return media dicts for the files already present in the snapchat directory.

    Files that ``get_media_data`` cannot interpret are left out.
    """
    return [
        story
        for story in (get_media_data(path) for path in funcs.get_files(directory))
        if story
    ]


def main():
    """Fetch new stories, drop known ones, download the rest and upload everything.

    Files left on disk in the snapchat directory are uploaded as well, after
    the freshly downloaded stories.
    """
    print('Initializing snappy...')
    ready_stories = []

    stories_from_files = get_snapchat_files()

    cursor.execute("SELECT username FROM following WHERE platform = 'snapchat' ORDER BY id DESC")
    usernames = [row[0] for row in cursor.fetchall()]

    print(f"Getting stories for {len(usernames)} users...")
    new_stories = get_snapchat_stories(usernames)

    cleaned_stories = []
    print("Checking for duplicates...")
    for story in new_stories:
        if find_duplicate_snap(existing_snap_ids, story['snap_id']):
            print(f"Snap {story['filepath']} already exists in the database. Removing...")
            continue
        cleaned_stories.append(story)

    cleaned_stories = download_stories(cleaned_stories)

    ready_stories.extend(cleaned_stories)
    ready_stories.extend(stories_from_files)

    for story in ready_stories:
        UploadMedia(story)


def download_stories(stories):
    """Download each story's ``media_url`` to its ``filepath``.

    Stories for which ``funcs.download_file`` returns a falsy value are
    skipped; the others gain ``hash`` and ``size`` entries.

    Returns:
        The list of stories that were downloaded.
    """
    downloaded_stories = []
    for story in stories:
        url = story['media_url']
        filepath = funcs.download_file(url, story['filepath'])

        # Check the result before using it: a falsy return means the download
        # failed, and passing it to os.path.basename would raise TypeError.
        if not filepath:
            print(f"Failed to download {url}. Skipping.")
            continue
        print(f"Downloaded {os.path.basename(filepath)}")

        story['hash'] = funcs.calculate_file_hash(filepath)
        story['size'] = os.path.getsize(filepath)
        downloaded_stories.append(story)

    return downloaded_stories


def UploadMedia(media):
    """Upload one media file to object storage and record it in the database.

    Args:
        media: dict with the keys ``size``, ``hash``, ``filepath``,
            ``username``, ``timestamp``, ``media_type``, ``snap_id`` and
            ``original_snap_id``.

    Returns:
        True when the file was uploaded and the insert was executed. False
        when the snap is already known (the local file is then deleted) or
        when the thumbnail of a video could not be produced.
    """
    file_size = media['size']
    file_hash = media['hash']
    filepath = media['filepath']
    filename = os.path.basename(filepath)
    username = media['username']
    timestamp = media['timestamp']
    media_type = media['media_type']
    snap_id = media['snap_id']
    original_snap_id = media['original_snap_id']
    thumbnail_url = None
    phash = None

    if find_duplicate_snap(existing_snap_ids, snap_id):
        print(f"Snap {filename} already exists in the database. Removing...")
        # The file may already be gone when the same snap was queued twice in
        # one run and the first copy has been uploaded and deleted.
        _remove_file(filepath)
        return False

    post_date = datetime.fromtimestamp(int(timestamp))
    width, height = funcs.get_media_dimensions(filepath)
    duration = funcs.get_video_duration(filepath)

    if media_type == 'image':
        phash = funcs.generate_phash(filepath)
    elif media_type == 'video':
        try:
            thumb_path = generate_thumbnail(filepath)
            try:
                obj_storage.PutFile(thumb_path, f'thumbnails/{filename}')
                thumbnail_url = f"https://storysave.b-cdn.net/thumbnails/{filename}"
                phash = funcs.generate_phash(thumb_path)
            finally:
                # Always clean up the temporary thumbnail, also on failure.
                _remove_file(thumb_path)
        except Exception as exc:
            print(f'Error generating thumbnail. Skipping... ({exc})')
            return False

    server_path = f'media/snaps/{username}/{filename}'
    file_url = f"https://storysave.b-cdn.net/{server_path}"

    obj_storage.PutFile(filepath, server_path)

    query = "INSERT IGNORE INTO media (username, media_type, media_url, width, height, post_type, date, hash, filename, duration, thumbnail, phash, platform, snap_id, original_snap_id, file_size) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    values = (username, media_type, file_url, width, height, 'story', post_date, file_hash, filename, duration, thumbnail_url, phash, 'snapchat', snap_id, original_snap_id, file_size)

    cursor.execute(query, values)
    db.commit()
    print(f'[{cursor.rowcount}] records updated. File {filename} uploaded to {file_url}')

    # Remember the snap so a second occurrence later in this run is treated
    # as a duplicate instead of being uploaded again.
    existing_snap_ids.add(snap_id)

    os.remove(filepath)
    return True


def generate_thumbnail(filepath):
    """Save the first frame of the video at *filepath* as a JPEG in the temp directory.

    Returns:
        The path of the thumbnail that was written.

    Raises:
        ValueError: if no frame could be read or the image could not be written.
    """
    thumb_path = os.path.join(temp_directory, f'{uuid4()}.jpg')

    cap = cv2.VideoCapture(filepath)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()

    if not ret:
        raise ValueError(f"Could not read a frame from {filepath}")
    if not cv2.imwrite(thumb_path, frame):
        raise ValueError(f"Could not write thumbnail {thumb_path}")

    return thumb_path


def get_media_data(filepath):
    """Build a media dict from a file named ``username~timestamp~snap_id.ext``.

    The media type is derived from the file extension because ``UploadMedia``
    requires it.

    Returns:
        The media dict, or False when the filename does not have three
        ``~``-separated parts or its extension is not a known media type.
    """
    filename = os.path.basename(filepath)
    # maxsplit=2 keeps a snap_id that itself contains '~' in one piece.
    parts = filename.split('~', 2)
    if len(parts) < 3:
        return False

    username, timestamp, snap_part = parts
    snap_id, extension = os.path.splitext(snap_part)

    media_type = MEDIA_TYPES_BY_EXTENSION.get(extension.lower())
    if not media_type:
        print(f"Failed to determine media type for {filepath}. Skipping.")
        return False

    data = {
        "username": username,
        "timestamp": timestamp,
        "filepath": filepath,
        "snap_id": snap_id,
        "original_snap_id": None,
        "media_url": None,
        "media_type": media_type,
        "size": os.path.getsize(filepath),
        "hash": funcs.calculate_file_hash(filepath),
    }
    return data


if __name__ == '__main__':
    print('Starting snappy...')

    db, cursor = config.gen_connection()
    obj_storage = config.get_storage()

    cursor.execute("SELECT snap_id FROM media WHERE filename IS NOT NULL AND platform = 'snapchat' ORDER BY id DESC")
    existing_snap_ids = cursor.fetchall()
    # A set gives O(1) duplicate checks and lets UploadMedia add new ids.
    existing_snap_ids = {row[0] for row in existing_snap_ids}

    main()

    print("Processing completed.")