old snappy master

main
oscar 11 months ago
parent e9f3404d1c
commit 345673a480

@ -1,79 +0,0 @@
import os, requests, config
from snapchat import get_stories, get_highlight_stories, get_all_users_data
def get_file_extension(url):
    """Guess a media file extension from the Content-Type of a HEAD request.

    Args:
        url: Media URL to probe.

    Returns:
        '.jpg' when the Content-Type header contains 'image', '.mp4' when it
        contains 'video', otherwise None (the request failed, the status was
        not 200, or the content type was not recognised).
    """
    try:
        # Bounded wait: without a timeout an unresponsive host can block the
        # whole run indefinitely.
        response = requests.head(url, timeout=30)
    except requests.RequestException:
        # Connection-level failures are reported the same way as a bad
        # status so callers only ever have to handle None.
        print(f"Failed to access media {url}")
        return None

    if response.status_code != 200:
        print(f"Failed to access media {url}")
        return None

    content_type = response.headers.get('Content-Type', '')
    if 'image' in content_type:
        return '.jpg'
    if 'video' in content_type:
        return '.mp4'

    print(f"Unknown content type for media {url}")
    return None
import re
def extract_file_type(url):
    """Return the numeric file-type code embedded in a media URL.

    The URL is searched for ``/d/<run without dots>.<digits>.``; the digits
    are returned as a string.  ``None`` is returned when the URL does not
    contain that pattern.
    """
    found = re.search(r"/d/[^.]+\.([0-9]+)\.", url)
    return found.group(1) if found else None
def map_file_type_to_extension(urls):
    """Build a mapping from file-type code to file extension.

    The type code of each URL is taken with ``extract_file_type``.  For every
    distinct code, the first URL that ``get_file_extension`` can resolve
    decides the extension recorded for that code.

    Args:
        urls: Iterable of media URLs.

    Returns:
        dict mapping type-code strings to the extension strings returned by
        ``get_file_extension``.
    """
    file_type_to_extension = {}
    for url in urls:
        file_type_number = extract_file_type(url)
        # URLs without a recognisable type code cannot be classified.
        if not file_type_number:
            continue
        # One successful probe per code is enough.  The mapping itself
        # records which codes are resolved, so no separate "seen" set is kept.
        if file_type_number in file_type_to_extension:
            continue
        file_extension = get_file_extension(url)
        # A failed probe leaves the code unresolved so a later URL of the
        # same type can still supply the answer.
        if file_extension:
            file_type_to_extension[file_type_number] = file_extension
    return file_type_to_extension
def main():
    """Print which extension each Snapchat file-type code maps to.

    Reads the followed Snapchat usernames through the module-level
    ``cursor``, gathers the stories and highlight stories of every user, and
    prints the mapping produced by ``map_file_type_to_extension`` for their
    URLs.
    """
    cursor.execute("SELECT username FROM following WHERE platform = 'snapchat' ORDER BY id DESC")
    usernames = [record[0] for record in cursor.fetchall()]

    users_data = get_all_users_data(usernames)

    # Stories followed by highlights, user by user, in one flat list.
    processed_stories = []
    for user_data in users_data.values():
        processed_stories += get_stories(user_data) + get_highlight_stories(user_data)

    all_urls = [entry['url'] for entry in processed_stories]
    file_type_to_extension = map_file_type_to_extension(all_urls)

    print("File Type to Extension Mapping:")
    for file_type in file_type_to_extension:
        extension = file_type_to_extension[file_type]
        print(f"File Type {file_type}: {extension}")
if __name__ == '__main__':
    # Script entry point.  The connection pair and the storage handle are
    # bound as module globals before main() runs, because main() reads
    # `cursor` from module scope.
    print('Starting snappy...')

    db, cursor = config.gen_connection()
    obj_storage = config.get_storage()

    main()

    print("Processing completed.")

@ -1,15 +1,24 @@
from uuid import uuid4
from datetime import datetime
import os, requests, config, json, funcs, cv2, re
import os, requests, config, json, funcs, cv2
from snapchat import get_stories, get_highlight_stories, get_all_users_data
# Folder that media files are saved into: each filepath is built with
# os.path.join(directory, filename), and the folder is created at start-up.
directory = "snapchat"
# Second folder created at start-up alongside `directory`.
# NOTE(review): presumably where archive_data writes its output; confirm.
data_directory = "data"
def get_existing_snap_ids(directory):
    """Collect the snap ids of media files already present under *directory*.

    The tree is walked recursively.  A file contributes a snap id when its
    name, minus the extension, has at least three '~'-separated fields; the
    third field is taken as the id (the naming scheme used when saving is
    ``{username}~{timestamp}~{snap_id}{extension}``).

    Args:
        directory: Root folder to scan.

    Returns:
        set of snap-id strings; empty when nothing matches.
    """
    existing_snap_ids = set()
    for _root, _dirs, files in os.walk(directory):
        for file in files:
            stem, _ext = os.path.splitext(file)
            parts = stem.split('~')
            # Names with fewer than three fields (plain files, editor backups
            # such as "notes.txt~") carry no snap id; indexing them would
            # raise IndexError and abort the whole scan.
            if len(parts) < 3:
                continue
            existing_snap_ids.add(parts[2])
    return existing_snap_ids
def find_duplicate_snap(existing_snaps, snap_id, username):
"""
Find a snap in the existing_snaps list from the database.
"""
for snap in existing_snaps:
if username == snap[2]:
if snap_id in snap[1]:
@ -38,30 +47,6 @@ def get_file_extension(url):
print(f"Unknown content type for media {url}")
return None
def extract_file_type(url):
    """Map the type code embedded in a media URL to a file extension.

    The query string is dropped, the last path segment is split on '.', and
    its second field is looked up in a fixed table of known type codes.

    Returns:
        '.jpg' or '.mp4' for a known code.  None when the code is not in the
        table, or when the segment contains no '.' (a notice is printed in
        that second case).
    """
    known_extensions = {
        '400': '.jpg',
        '1322': '.mp4',
        '1325': '.mp4',
        '1034': '.mp4',
        '1023': '.jpg',
    }

    base_url, _, _ = url.partition("?")  # everything before the query string
    last_segment = base_url.rsplit('/', 1)[-1]
    fields = last_segment.split('.')

    if len(fields) < 2:
        print(f"Unexpected URL format: {base_url}")
        return None

    # dict.get yields None for codes that are not in the table.
    return known_extensions.get(fields[1])
def download_media(url, filepath):
if os.path.exists(filepath):
print(f"File {filepath} already exists. Skipping download.")
@ -76,56 +61,7 @@ def download_media(url, filepath):
f.write(response.content)
return filepath
def get_all_stories(usernames):
    """Build download descriptors for the stories and highlights of *usernames*.

    User data is fetched once for all usernames.  For each user that has
    data, the data is archived and every story or highlight whose extension
    can be determined is turned into a descriptor dict.

    Args:
        usernames: Usernames to process, in order.

    Returns:
        list of dicts with the keys 'username', 'timestamp', 'filepath',
        'snap_id', 'original_snap_id' and 'media_url'.
    """
    snapchat_users_data = get_all_users_data(usernames)
    all_stories = []

    for username in usernames:
        print(f"Getting stories for {username}...")
        data = snapchat_users_data.get(username)
        if not data:
            print(f"Failed to get data for {username}. Skipping.")
            continue

        archive_data(data, username)

        print("Getting stories...")
        stories = get_stories(data)

        print("Getting highlights...")
        stories.extend(get_highlight_stories(data))

        for story in stories:
            snap_id = story['snap_id']
            url = story['url']
            timestamp = story['timestamp']

            # The extension is derived from the URL by extract_file_type.
            extension = extract_file_type(url)
            if not extension:
                print(f"Failed to determine file extension for {url}. Skipping.")
                continue

            filename = f"{username}~{timestamp}~{snap_id}{extension}"
            filepath = os.path.join(directory, filename)

            media = {
                'username': username,
                'timestamp': timestamp,
                'filepath': filepath,
                'snap_id': snap_id,
                'original_snap_id': story['original_snap_id'],
                'media_url': url,
            }

            all_stories.append(media)
            print(f"Media {snap_id} ready for download.")

        # Only the descriptors built above are returned.  The raw story dicts
        # are not appended as well: they use a different schema ('url' rather
        # than 'media_url') and would list every story a second time.

    return all_stories
def get_snapchat_stories():
def main():
os.makedirs(directory, exist_ok=True)
os.makedirs(data_directory, exist_ok=True)
@ -134,7 +70,9 @@ def get_snapchat_stories():
cursor.execute("SELECT id, filename, username FROM media WHERE filename IS NOT NULL AND platform = 'snapchat' ORDER BY id DESC")
existing_medias = cursor.fetchall()
existing_snap_ids = get_existing_snap_ids(directory)
snapchat_users_data = get_all_users_data(usernames)
ready_stories = []
@ -146,7 +84,7 @@ def get_snapchat_stories():
if not data:
print(f"Failed to get data for {username}. Skipping.")
continue
archive_data(data, username)
print("Getting stories...")
@ -165,15 +103,24 @@ def get_snapchat_stories():
print(f"Media {snap_id} already exists. Skipping download.")
continue
# Check if media already exists
if snap_id in existing_snap_ids:
print(f"Media {snap_id} already exists. Skipping download.")
continue
# Determine file extension using HEAD request.
extension = extract_file_type(url)
extension = get_file_extension(url)
if not extension:
print(f"Failed to determine file extension for {url}. Skipping.")
continue
filename = f"{username}~{timestamp}~{snap_id}{extension}"
filepath = os.path.join(directory, filename)
# Check if file already exists
if os.path.exists(filepath):
print(f"File {filename} already exists. Skipping download.")
continue
media = {
'username': username,
'timestamp': timestamp,
@ -185,40 +132,19 @@ def get_snapchat_stories():
ready_stories.append(media)
print(f"Media {snap_id} ready for download.")
# sort ready_stories by timestamp from oldest to newest
ready_stories.sort(key=lambda x: x['timestamp'])
return ready_stories
# For each story dict: download its media to story['filepath'], store the
# path returned by download_media back on the story, then pass the story to
# UploadMedia.  Stories whose download returns a falsy path are skipped.
def download_stories(stories):
for story in stories:
# NOTE(review): the next line uses different names (media / ready_stories)
# than the rest of this function; it looks like it belongs to another
# revision of this loop. Confirm before relying on it.
for media in ready_stories:
# Download the media
filepath = story['filepath']
# Stories without a 'media_url' key are passed to download_media with url=None.
url = story['media_url'] if 'media_url' in story else None
filename = os.path.basename(filepath)
timestamp = story['timestamp']
filepath = download_media(url, filepath)
# NOTE(review): this message is printed before the result of download_media
# is checked, so it also appears when the returned filepath is falsy.
print(f"Downloaded {filename} at {timestamp}")
if not filepath:
continue
# The path returned by download_media replaces the planned one before upload.
story['filepath'] = filepath
UploadMedia(story)
media['filepath'] = filepath
def main():
    """Queue freshly fetched stories plus files already on disk, then process them.

    The list returned by ``get_snapchat_stories`` is extended with the
    ``get_media_data`` result of every path from ``funcs.get_files(directory)``
    and the combined list is handed to ``download_stories``.
    """
    ready_stories = get_snapchat_stories()

    for path in funcs.get_files(directory):
        described = get_media_data(path)
        # Falsy results from get_media_data are dropped.
        if described:
            ready_stories.append(described)

    download_stories(ready_stories)
UploadMedia(media)
def UploadMedia(media):
username = media['username']
@ -234,7 +160,7 @@ def UploadMedia(media):
file_hash = funcs.calculate_file_hash(filepath)
post_date = datetime.fromtimestamp(int(timestamp))
post_date = datetime.fromtimestamp(int(timestamp)) if timestamp else datetime.now()
width, height = funcs.get_media_dimensions(filepath)
@ -288,7 +214,7 @@ def get_media_data(filepath):
snap_id = parts[2]
snap_id = os.path.splitext(snap_id)[0]
data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'snap_id': snap_id, 'original_snap_id': None}
data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'media_id': snap_id}
return data

Loading…
Cancel
Save