You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

424 lines
16 KiB
Python

import requests
import hashlib
access_key = "ccd3f9d4-9e6f-4bd2-8f594402b5a7-3646-48fe"
video_library_id = 106867
def create_video(title):
url = f"https://video.bunnycdn.com/library/{video_library_id}/videos"
payload = f"{{\"title\":\"{title}\"}}"
headers = {
"accept": "application/json",
"content-type": "application/*+json",
"AccessKey": access_key
}
response = requests.post(url, data=payload, headers=headers)
return response
def generate_signature(library_id, api_key, expiration_time, video_id):
signature = hashlib.sha256((library_id + api_key + str(expiration_time) + video_id).encode()).hexdigest()
return signature
def upload_video_process(file_path, video_id):
url = f"https://video.bunnycdn.com/library/{video_library_id}/videos/{video_id}"
headers = {
"accept": "application/json",
"AccessKey": access_key
}
with open(file_path, "rb") as file:
file_data = file.read()
response = requests.put(url, headers=headers, data=file_data)
return response.status_code
def upload_video(file_path, title=None):
video_item = create_video(title)
if video_item.status_code != 200:
return False
video_id = video_item.json()['guid']
upload_video_process(file_path, video_id)
return {
"embed_link": f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/playlist.m3u8",
"animated_thumbnail": f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/preview.webp",
"default_thumbnail": f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/thumbnail.jpg",
}
def upload_video_recurbate(videoInfo):
title = f"{videoInfo['username']} {videoInfo['platform']}"
video_item = create_video(title)
if video_item.status_code != 200:
return False
video_id = video_item.json()['guid']
upload_video_process(videoInfo['filename'], video_id)
videoInfo["embed_link"] = f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/playlist.m3u8"
videoInfo["animated_thumbnail"] = f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/preview.webp"
videoInfo["default_thumbnail"] = f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/thumbnail.jpg"
return True
def delete_video(video_id):
video_id = video_id.replace('https://vz-58ca89f1-986.b-cdn.net/', '').replace('/playlist.m3u8', '')
url = f"https://video.bunnycdn.com/library/{video_library_id}/videos/{video_id}"
headers = {
"accept": "application/json",
"AccessKey": access_key
}
response = requests.delete(url, headers=headers)
return response.status_code
def list_videos():
url = f"https://video.bunnycdn.com/library/{video_library_id}/videos?page=1&itemsPerPage=2147483647&orderBy=date"
headers = {
"accept": "application/json",
"AccessKey": access_key
}
response = requests.get(url, headers=headers)
return response.json()['items']
def get_heatmap(video_id):
url = "https://video.bunnycdn.com/library/libraryId/videos/videoId/heatmap"
url = url.replace('libraryId', str(video_library_id)).replace('videoId', str(video_id))
headers = {
"accept": "application/json",
"AccessKey": access_key
}
response = requests.get(url, headers=headers).json()
return response
def get_video(video_id):
url = "https://video.bunnycdn.com/library/libraryId/videos/videoId"
url = url.replace('libraryId', str(video_library_id)).replace('videoId', str(video_id))
headers = {
"accept": "application/json",
"AccessKey": access_key
}
response = requests.get(url, headers=headers).json()
return response
import os
import requests
from requests.exceptions import HTTPError
from urllib import parse
class Storage:
def __init__(self, api_key, storage_zone, storage_zone_region="de"):
"""
Creates an object for using BunnyCDN Storage API
Parameters
----------
api_key : String
Your bunnycdn storage
Apikey/FTP password of
storage zone
storage_zone : String
Name of your storage zone
storage_zone_region(optional parameter) : String
The storage zone region code
as per BunnyCDN
"""
self.headers = {
# headers to be passed in HTTP requests
"AccessKey": api_key,
"Content-Type": "application/json",
"Accept": "applcation/json",
}
# applying constraint that storage_zone must be specified
assert storage_zone != "", "storage_zone is not specified/missing"
# For generating base_url for sending requests
if storage_zone_region == "de" or storage_zone_region == "":
self.base_url = "https://storage.bunnycdn.com/" + storage_zone + "/"
else:
self.base_url = (
"https://"
+ storage_zone_region
+ ".storage.bunnycdn.com/"
+ storage_zone
+ "/"
)
def DownloadFile(self, storage_path, download_path=os.getcwd()):
"""
This function will get the files and subfolders of storage zone mentioned in path
and download it to the download_path location mentioned
Parameters
----------
storage_path : String
The path of the directory
(including file name and excluding storage zone name)
from which files are to be retrieved
download_path : String
The directory on local server to which downloaded file must be saved
Note:For download_path instead of '\' '\\' should be used example: C:\\Users\\XYZ\\OneDrive
"""
assert (
storage_path != ""
), "storage_path must be specified" # to make sure storage_path is not null
# to build correct url
if storage_path[0] == "/":
storage_path = storage_path[1:]
if storage_path[-1] == "/":
storage_path = storage_path[:-1]
url = self.base_url + parse.quote(storage_path)
file_name = url.split("/")[-1] # For storing file name
# to return appropriate help messages if file is present or not and download file if present
try:
response = requests.get(url, headers=self.headers, stream=True)
response.raise_for_status()
except HTTPError as http:
return {
"status": "error",
"HTTP": response.status_code,
"msg": f"Http error occured {http}",
}
except Exception as err:
return {
"status": "error",
"HTTP": response.status_code,
"msg": f"error occured {err}",
}
else:
download_path = os.path.join(download_path, file_name)
# Downloading file
with open(download_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
file.write(chunk)
return {
"status": "success",
"HTTP": response.status_code,
"msg": "File downloaded Successfully",
}
def PutFile(
self,
file_name,
storage_path=None,
local_upload_file_path=os.getcwd(),
):
"""
This function uploads files to your BunnyCDN storage zone
Parameters
----------
storage_path : String
The path of directory in storage zone
(including the name of file as desired and excluding storage zone name)
to which file is to be uploaded
file_name : String
The name of the file as stored in local server
local_upload_file_path : String
The path of file as stored in local server(excluding file name)
from where file is to be uploaded
Examples
--------
file_name : 'ABC.txt'
local_upload_file_path : 'C:\\User\\Sample_Directory'
storage_path : '<Directory name in storage zone>/<file name as to be uploaded on storage zone>.txt'
#Here .txt because the file being uploaded in example is txt
"""
local_upload_file_path = os.path.join(local_upload_file_path, file_name)
# to build correct url
if storage_path is not None and storage_path != "":
if storage_path[0] == "/":
storage_path = storage_path[1:]
if storage_path[-1] == "/":
storage_path = storage_path[:-1]
url = self.base_url + parse.quote(storage_path)
else:
url = self.base_url + parse.quote(file_name)
with open(local_upload_file_path, "rb") as file:
file_data = file.read()
response = requests.put(url, data=file_data, headers=self.headers)
try:
response.raise_for_status()
except HTTPError as http:
return {
"status": "error",
"HTTP": response.status_code,
"msg": f"Upload Failed HTTP Error Occured: {http}",
}
else:
return {
"status": "success",
"HTTP": response.status_code,
"msg": "The File Upload was Successful",
}
def DeleteFile(self, storage_path=""):
"""
This function deletes a file or folder mentioned in the storage_path from the storage zone
Parameters
----------
storage_path : The directory path to your file (including file name) or folder which is to be deleted.
If this is the root of your storage zone, you can ignore this parameter.
"""
# Add code below
assert (
storage_path != ""
), "storage_path must be specified" # to make sure storage_path is not null
# to build correct url
if storage_path[0] == "/":
storage_path = storage_path[1:]
url = self.base_url + parse.quote(storage_path)
try:
response = requests.delete(url, headers=self.headers)
response.raise_for_status
except HTTPError as http:
return {
"status": "error",
"HTTP": response.raise_for_status(),
"msg": f"HTTP Error occured: {http}",
}
except Exception as err:
return {
"status": "error",
"HTTP": response.status_code,
"msg": f"Object Delete failed ,Error occured:{err}",
}
else:
return {
"status": "success",
"HTTP": response.status_code,
"msg": "Object Successfully Deleted",
}
def GetStoragedObjectsList(self, storage_path=None):
"""
This functions returns a list of files and directories located in given storage_path.
Parameters
----------
storage_path : The directory path that you want to list.
"""
# to build correct url
if storage_path is not None:
if storage_path[0] == "/":
storage_path = storage_path[1:]
if storage_path[-1] != "/":
url = self.base_url + parse.quote(storage_path) + "/"
else:
url = self.base_url
# Sending GET request
try:
response = requests.get(url, headers=self.headers)
response.raise_for_status()
except HTTPError as http:
return {
"status": "error",
"HTTP": response.status_code,
"msg": f"http error occured {http}",
}
else:
storage_list = []
for dictionary in response.json():
temp_dict = {}
for key in dictionary:
if key == "ObjectName" and dictionary["IsDirectory"] is False:
temp_dict["File_Name"] = dictionary[key]
if key == "ObjectName" and dictionary["IsDirectory"]:
temp_dict["Folder_Name"] = dictionary[key]
storage_list.append(temp_dict)
return storage_list
def MoveFile(self, old_path, new_path):
"""
Moves a file by downloading from the old path and uploading to the new path,
then deleting from the old path. Uses existing PutFile and DeleteFile methods.
Parameters
----------
old_path : str
The current path (relative to storage zone root) of the file to move.
new_path : str
The new path (relative to storage zone root) for the file.
Returns
-------
dict
A dictionary containing 'status', 'msg', and optionally 'HTTP'.
"""
# Validate arguments
if not old_path or not new_path:
return {
"status": "error",
"msg": "Both old_path and new_path must be provided."
}
# 1. Download from old_path to a temporary local directory
# If you already have the file locally, you can skip this download step.
download_response = self.DownloadFile(old_path, download_path="temp")
if download_response.get("status") != "success":
return {
"status": "error",
"msg": f"Failed to download file for moving. Reason: {download_response.get('msg', 'unknown')}",
"HTTP": download_response.get("HTTP")
}
# Extract the filename from old_path to know what we downloaded
filename = os.path.basename(old_path)
# 2. Upload to new_path using existing PutFile
# We'll assume new_path includes the desired filename. If it does not, adjust logic.
put_response = self.PutFile(
file_name=filename,
storage_path=new_path, # e.g. "folder/newfile.jpg"
local_upload_file_path="temp" # where we downloaded it
)
if put_response.get("status") != "success":
return {
"status": "error",
"msg": f"Failed to upload file to new path. Reason: {put_response.get('msg', 'unknown')}",
"HTTP": put_response.get("HTTP")
}
# 3. Delete the original file using existing DeleteFile
delete_response = self.DeleteFile(old_path)
if delete_response.get("status") != "success":
return {
"status": "error",
"msg": f"Failed to delete old file. Reason: {delete_response.get('msg', 'unknown')}",
"HTTP": delete_response.get("HTTP")
}
# (Optional) Clean up the local temp file
local_temp_path = os.path.join("temp", filename)
if os.path.exists(local_temp_path):
os.remove(local_temp_path)
return {
"status": "success",
"msg": f"File successfully moved from '{old_path}' to '{new_path}'."
}