iricardoxd's picture
Update upload.py
2b3ff64
raw
history blame
26.2 kB
"""
`tiktok_uploader` module for uploading videos to TikTok
Key Functions
-------------
upload_video : Uploads a single TikTok video
upload_videos : Uploads multiple TikTok videos
"""
from os.path import abspath, exists
from typing import List
import time
import pytz
import datetime
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from browsers import get_browser
from tiktok_uploader.auth import AuthBackend
from tiktok_uploader import logger
from tiktok_uploader.utils import bold, green
import toml
config = toml.load("./config.toml")
from proxy_auth_extension.proxy_auth_extension import proxy_is_working
def upload_video(filename=None, description='', schedule: datetime.datetime = None, username='',
password='', cookies='', sessionid=None, cookies_list=None, cookies_str=None, proxy=None, *args, **kwargs):
"""
Uploads a single TikTok video.
Conder using `upload_videos` if using multiple videos
Parameters
----------
filename : str
The path to the video to upload
description : str
The description to set for the video
schedule: datetime.datetime
The datetime to schedule the video, must be naive or aware with UTC timezone, if naive it will be aware with UTC timezone
cookies : str
The cookies to use for uploading
sessionid: str
The `sessionid` is the only required cookie for uploading,
but it is recommended to use all cookies to avoid detection
"""
auth = AuthBackend(username=username, password=password, cookies=cookies,
cookies_list=cookies_list, cookies_str=cookies_str, sessionid=sessionid)
return upload_videos(
videos=[ { 'path': filename, 'description': description, 'schedule': schedule } ],
auth=auth,
proxy=proxy,
*args, **kwargs
)
def upload_videos(videos: list = None, auth: AuthBackend = None, proxy: dict = None, browser='chrome',
browser_agent=None, on_complete=None, headless=True, num_retires : int = 1, *args, **kwargs):
"""
Uploads multiple videos to TikTok
Parameters
----------
videos : list
A list of dictionaries containing the video's ('path') and description ('description')
proxy: dict
A dictionary containing the proxy user, pass, host and port
browser : str
The browser to use for uploading
browser_agent : selenium.webdriver
A selenium webdriver object to use for uploading
on_complete : function
A function to call when the upload is complete
headless : bool
Whether or not the browser should be run in headless mode
num_retries : int
The number of retries to attempt if the upload fails
options : SeleniumOptions
The options to pass into the browser -> custom privacy settings, etc.
*args :
Additional arguments to pass into the upload function
**kwargs :
Additional keyword arguments to pass into the upload function
Returns
-------
failed : list
A list of videos which failed to upload
"""
videos = _convert_videos_dict(videos)
if videos and len(videos) > 1:
logger.debug("Uploading %d videos", len(videos))
if not browser_agent: # user-specified browser agent
logger.debug('Create a %s browser instance %s', browser,
'in headless mode' if headless else '')
driver = get_browser(name=browser, headless=headless, proxy=proxy, *args, **kwargs)
else:
logger.debug('Using user-defined browser agent')
driver = browser_agent
if proxy:
if proxy_is_working(driver, proxy['host']):
logger.debug(green('Proxy is working'))
else:
logger.error('Proxy is not working')
driver.quit()
raise Exception('Proxy is not working')
driver = auth.authenticate_agent(driver)
driver.set_window_size(1280, 1280) # Adjust the window size here
failed = []
# uploads each video
for video in videos:
try:
path = abspath(video.get('path'))
description = video.get('description', '')
schedule = video.get('schedule', None)
logger.debug('Posting %s%s', bold(video.get('path')),
f'\n{" " * 15}with description: {bold(description)}' if description else '')
# Video must be of supported type
if not _check_valid_path(path):
print(f'{path} is invalid, skipping')
#failed.append(video)
continue
# Video must have a valid datetime for tiktok's scheduler
if schedule:
timezone = pytz.UTC
if schedule.tzinfo is None:
schedule = schedule.astimezone(timezone)
elif int(schedule.utcoffset().total_seconds()) == 0: # Equivalent to UTC
schedule = timezone.localize(schedule)
else:
print(f'{schedule} is invalid, the schedule datetime must be naive or aware with UTC timezone, skipping')
#failed.append(video)
continue
valid_tiktok_minute_multiple = 5
schedule = _get_valid_schedule_minute(schedule, valid_tiktok_minute_multiple)
if not _check_valid_schedule(schedule):
print(f'{schedule} is invalid, the schedule datetime must be as least 20 minutes in the future, and a maximum of 10 days, skipping')
#failed.append(video)
continue
complete_upload_form(driver, path, description, schedule,
num_retires=num_retires, headless=headless,
*args, **kwargs)
except Exception as exception:
logger.error('Failed to upload %s', path)
logger.error(exception)
#failed.append(video)
if on_complete is callable: # calls the user-specified on-complete function
on_complete(video)
xd = take_screenshot(driver)
if config['quit_on_end']:
driver.quit()
return xd
#return failed
from PIL import Image
from io import BytesIO
from selenium.common.exceptions import WebDriverException
def take_screenshot(wd):
screenshot=0
try:
wd.set_window_size(1280, 1280) # Adjust the window size here
wd.implicitly_wait(10)
screenshot = wd.get_screenshot_as_png()
open('screenshot.png', 'wb').write(screenshot)
except WebDriverException as e:
return Image.new('RGB', (1, 1))
return Image.open(BytesIO(screenshot))
def complete_upload_form(driver, path: str, description: str, schedule: datetime.datetime, headless=True, *args, **kwargs) -> None:
"""
Actually uploades each video
Parameters
----------
driver : selenium.webdriver
The selenium webdriver to use for uploading
path : str
The path to the video to upload
"""
_go_to_upload(driver)
_set_video(driver, path=path, **kwargs)
_set_interactivity(driver, **kwargs)
_set_description(driver, description)
if schedule:
_set_schedule_video(driver, schedule)
_post_video(driver)
def _go_to_upload(driver) -> None:
"""
Navigates to the upload page, switches to the iframe and waits for it to load
Parameters
----------
driver : selenium.webdriver
"""
logger.debug(green('Navigating to upload page'))
driver.get(config['paths']['upload'])
# changes to the iframe
iframe_selector = EC.presence_of_element_located(
(By.XPATH, config['selectors']['upload']['iframe'])
)
iframe = WebDriverWait(driver, config['explicit_wait']).until(iframe_selector)
driver.switch_to.frame(iframe)
# waits for the iframe to load
root_selector = EC.presence_of_element_located((By.ID, 'root'))
WebDriverWait(driver, config['explicit_wait']).until(root_selector)
def _set_description(driver, description: str) -> None:
"""
Sets the description of the video
Parameters
----------
driver : selenium.webdriver
description : str
The description to set
"""
if description is None:
# if no description is provided, filename
return
logger.debug(green('Setting description'))
saved_description = description # save the description in case it fails
desc = driver.find_element(By.XPATH, config['selectors']['upload']['description'])
# desc populates with filename before clearing
WebDriverWait(driver, config['explicit_wait']).until(lambda driver: desc.text != '')
_clear(desc)
try:
while description:
nearest_mention = description.find('@')
nearest_hash = description.find('#')
if nearest_mention == 0 or nearest_hash == 0:
desc.send_keys('@' if nearest_mention == 0 else '#')
# wait for the frames to load
time.sleep(config['implicit_wait'])
name = description[1:].split(' ')[0]
if nearest_mention == 0: # @ case
mention_xpath = config['selectors']['upload']['mention_box']
condition = EC.presence_of_element_located((By.XPATH, mention_xpath))
mention_box = WebDriverWait(driver, config['explicit_wait']).until(condition)
mention_box.send_keys(name)
else:
desc.send_keys(name)
time.sleep(config['implicit_wait'])
if nearest_mention == 0: # @ case
mention_xpath = config['selectors']['upload']['mentions'].format('@' + name)
condition = EC.presence_of_element_located((By.XPATH, mention_xpath))
else:
hashtag_xpath = config['selectors']['upload']['hashtags'].format(name)
condition = EC.presence_of_element_located((By.XPATH, hashtag_xpath))
elem = WebDriverWait(driver, config['explicit_wait']).until(condition)
ActionChains(driver).move_to_element(elem).click(elem).perform()
description = description[len(name) + 2:]
else:
min_index = _get_splice_index(nearest_mention, nearest_hash, description)
desc.send_keys(description[:min_index])
description = description[min_index:]
except Exception as exception:
print('Failed to set description: ', exception)
_clear(desc)
desc.send_keys(saved_description) # if fail, use saved description
def _clear(element) -> None:
"""
Clears the text of the element (an issue with the TikTok website when automating)
Parameters
----------
element
The text box to clear
"""
element.send_keys(2 * len(element.text) * Keys.BACKSPACE)
def _set_video(driver, path: str = '', num_retries: int = 3, **kwargs) -> None:
"""
Sets the video to upload
Parameters
----------
driver : selenium.webdriver
path : str
The path to the video to upload
num_retries : number of retries (can occassionally fail)
"""
# uploades the element
logger.debug(green('Uploading video file'))
for _ in range(num_retries):
try:
logger.debug(green('Uploading video file 1'))
upload_box = driver.find_element(
By.XPATH, config['selectors']['upload']['upload_video']
)
logger.debug(green('Uploading video file 2'))
upload_box.send_keys(path)
logger.debug(green('Uploading video file 3'))
# waits for the upload progress bar to disappear
upload_progress = EC.presence_of_element_located(
(By.XPATH, config['selectors']['upload']['upload_in_progress'])
)
logger.debug(green('Uploading video file 4'))
WebDriverWait(driver, config['implicit_wait']).until(upload_progress)
WebDriverWait(driver, config['explicit_wait']).until_not(upload_progress)
logger.debug(green('Uploading video file 5'))
# waits for the video to upload
upload_confirmation = EC.presence_of_element_located(
(By.XPATH, config['selectors']['upload']['upload_confirmation'])
)
logger.debug(green('Uploading video file 6'))
# An exception throw here means the video failed to upload an a retry is needed
WebDriverWait(driver, config['explicit_wait']).until(upload_confirmation)
logger.debug(green('Uploading video file 7'))
# wait until a non-draggable image is found
process_confirmation = EC.presence_of_element_located(
(By.XPATH, config['selectors']['upload']['process_confirmation'])
)
logger.debug(green('Uploading video file 8'))
WebDriverWait(driver, config['explicit_wait']).until(process_confirmation)
logger.debug(green('Uploading video file 9'))
return
except Exception as exception:
print(exception)
raise FailedToUpload()
def _set_interactivity(driver, comment=True, stitch=True, duet=True, *args, **kwargs) -> None:
"""
Sets the interactivity settings of the video
Parameters
----------
driver : selenium.webdriver
comment : bool
Whether or not to allow comments
stitch : bool
Whether or not to allow stitching
duet : bool
Whether or not to allow duets
"""
try:
logger.debug(green('Setting interactivity settings'))
comment_box = driver.find_element(By.XPATH, config['selectors']['upload']['comment'])
stitch_box = driver.find_element(By.XPATH, config['selectors']['upload']['stitch'])
duet_box = driver.find_element(By.XPATH, config['selectors']['upload']['duet'])
# xor the current state with the desired state
if comment ^ comment_box.is_selected():
comment_box.click()
if stitch ^ stitch_box.is_selected():
stitch_box.click()
if duet ^ duet_box.is_selected():
duet_box.click()
except Exception as _:
logger.error('Failed to set interactivity settings')
def _set_schedule_video(driver, schedule: datetime.datetime) -> None:
"""
Sets the schedule of the video
Parameters
----------
driver : selenium.webdriver
schedule : datetime.datetime
The datetime to set
"""
logger.debug(green('Setting schedule'))
driver_timezone = __get_driver_timezone(driver)
schedule = schedule.astimezone(driver_timezone)
month = schedule.month
day = schedule.day
hour = schedule.hour
minute = schedule.minute
try:
switch = driver.find_element(By.XPATH, config['selectors']['schedule']['switch'])
switch.click()
__date_picker(driver, month, day)
__time_picker(driver, hour, minute)
except Exception as e:
msg = f'Failed to set schedule: {e}'
print(msg)
logger.error(msg)
raise FailedToUpload()
def __date_picker(driver, month: int, day: int) -> None:
logger.debug(green('Picking date'))
condition = EC.presence_of_element_located(
(By.XPATH, config['selectors']['schedule']['date_picker'])
)
date_picker = WebDriverWait(driver, config['implicit_wait']).until(condition)
date_picker.click()
condition = EC.presence_of_element_located(
(By.XPATH, config['selectors']['schedule']['calendar'])
)
calendar = WebDriverWait(driver, config['implicit_wait']).until(condition)
calendar_month = driver.find_element(By.XPATH, config['selectors']['schedule']['calendar_month']).text
n_calendar_month = datetime.datetime.strptime(calendar_month, '%B').month
if n_calendar_month != month: # Max can be a month before or after
if n_calendar_month < month:
arrow = driver.find_elements(By.XPATH, config['selectors']['schedule']['calendar_arrows'])[-1]
else:
arrow = driver.find_elements(By.XPATH, config['selectors']['schedule']['calendar_arrows'])[0]
arrow.click()
valid_days = driver.find_elements(By.XPATH, config['selectors']['schedule']['calendar_valid_days'])
day_to_click = None
for day_option in valid_days:
if int(day_option.text) == day:
day_to_click = day_option
break
if day_to_click:
day_to_click.click()
else:
raise Exception('Day not found in calendar')
__verify_date_picked_is_correct(driver, month, day)
def __verify_date_picked_is_correct(driver, month: int, day: int):
date_selected = driver.find_element(By.XPATH, config['selectors']['schedule']['date_picker']).text
date_selected_month = int(date_selected.split('-')[1])
date_selected_day = int(date_selected.split('-')[2])
if date_selected_month == month and date_selected_day == day:
logger.debug(green('Date picked correctly'))
else:
msg = f'Something went wrong with the date picker, expected {month}-{day} but got {date_selected_month}-{date_selected_day}'
logger.error(msg)
raise Exception(msg)
def __time_picker(driver, hour: int, minute: int) -> None:
logger.debug(green('Picking time'))
condition = EC.presence_of_element_located(
(By.XPATH, config['selectors']['schedule']['time_picker'])
)
time_picker = WebDriverWait(driver, config['implicit_wait']).until(condition)
time_picker.click()
condition = EC.presence_of_element_located(
(By.XPATH, config['selectors']['schedule']['time_picker_container'])
)
time_picker_container = WebDriverWait(driver, config['implicit_wait']).until(condition)
# 00 = 0, 01 = 1, 02 = 2, 03 = 3, 04 = 4, 05 = 5, 06 = 6, 07 = 7, 08 = 8, 09 = 9, 10 = 10, 11 = 11, 12 = 12,
# 13 = 13, 14 = 14, 15 = 15, 16 = 16, 17 = 17, 18 = 18, 19 = 19, 20 = 20, 21 = 21, 22 = 22, 23 = 23
hour_options = driver.find_elements(By.XPATH, config['selectors']['schedule']['timepicker_hours'])
# 00 == 0, 05 == 1, 10 == 2, 15 == 3, 20 == 4, 25 == 5, 30 == 6, 35 == 7, 40 == 8, 45 == 9, 50 == 10, 55 == 11
minute_options = driver.find_elements(By.XPATH, config['selectors']['schedule']['timepicker_minutes'])
hour_to_click = hour_options[hour]
minute_option_correct_index = int(minute / 5)
minute_to_click = minute_options[minute_option_correct_index]
driver.execute_script("arguments[0].scrollIntoView({block: 'center', inline: 'nearest'});", hour_to_click)
hour_to_click.click()
driver.execute_script("arguments[0].scrollIntoView({block: 'center', inline: 'nearest'});", minute_to_click)
minute_to_click.click()
# click somewhere else to close the time picker
time_picker.click()
time.sleep(.5) # wait for the DOM change
__verify_time_picked_is_correct(driver, hour, minute)
def __verify_time_picked_is_correct(driver, hour: int, minute: int):
time_selected = driver.find_element(By.XPATH, config['selectors']['schedule']['time_picker_text']).text
time_selected_hour = int(time_selected.split(':')[0])
time_selected_minute = int(time_selected.split(':')[1])
if time_selected_hour == hour and time_selected_minute == minute:
logger.debug(green('Time picked correctly'))
else:
msg = f'Something went wrong with the time picker, ' \
f'expected {hour:02d}:{minute:02d} ' \
f'but got {time_selected_hour:02d}:{time_selected_minute:02d}'
logger.error(msg)
raise Exception(msg)
def _post_video(driver) -> None:
"""
Posts the video by clicking the post button
Parameters
----------
driver : selenium.webdriver
"""
logger.debug(green('Clicking the post button'))
WebDriverWait(driver, config['explicit_wait']).until(config['selectors']['upload']['post'])
logger.debug(green('Clicking the post button 1'))
post = driver.find_element(By.XPATH, config['selectors']['upload']['post'])
logger.debug(green('Clicking the post button 2'))
post.click()
logger.debug(green('Clicking the post button 3'))
# waits for the video to upload
post_confirmation = EC.presence_of_element_located(
(By.XPATH, config['selectors']['upload']['post_confirmation'])
)
logger.debug(green('Clicking the post button 4'))
WebDriverWait(driver, config['explicit_wait']).until(post_confirmation)
logger.debug(green('Video posted successfully'))
# HELPERS
def _check_valid_path(path: str) -> bool:
"""
Returns whether or not the filetype is supported by TikTok
"""
return exists(path) and path.split('.')[-1] in config['supported_file_types']
def _get_valid_schedule_minute(schedule, valid_multiple) -> datetime.datetime:
"""
Returns a datetime.datetime with valid minute for TikTok
"""
if _is_valid_schedule_minute(schedule.minute, valid_multiple):
return schedule
else:
return _set_valid_schedule_minute(schedule, valid_multiple)
def _is_valid_schedule_minute(minute, valid_multiple) -> bool:
if minute % valid_multiple != 0:
return False
else:
return True
def _set_valid_schedule_minute(schedule, valid_multiple) -> datetime.datetime:
minute = schedule.minute
remainder = minute % valid_multiple
integers_to_valid_multiple = 5 - remainder
schedule += datetime.timedelta(minutes=integers_to_valid_multiple)
return schedule
def _check_valid_schedule(schedule: datetime.datetime) -> bool:
"""
Returns if the schedule is supported by TikTok
"""
valid_tiktok_minute_multiple = 5
margin_to_complete_upload_form = 5
datetime_utc_now = pytz.UTC.localize(datetime.datetime.utcnow())
min_datetime_tiktok_valid = datetime_utc_now + datetime.timedelta(minutes=15)
min_datetime_tiktok_valid += datetime.timedelta(minutes=margin_to_complete_upload_form)
max_datetime_tiktok_valid = datetime_utc_now + datetime.timedelta(days=10)
if schedule < min_datetime_tiktok_valid \
or schedule > max_datetime_tiktok_valid:
return False
elif not _is_valid_schedule_minute(schedule.minute, valid_tiktok_minute_multiple):
return False
else:
return True
def _get_splice_index(nearest_mention: int, nearest_hashtag: int, description: str) -> int:
"""
Returns the index to splice the description at
Parameters
----------
nearest_mention : int
The index of the nearest mention
nearest_hashtag : int
The index of the nearest hashtag
Returns
-------
int
The index to splice the description at
"""
if nearest_mention == -1 and nearest_hashtag == -1:
return len(description)
elif nearest_hashtag == -1:
return nearest_mention
elif nearest_mention == -1:
return nearest_hashtag
else:
return min(nearest_mention, nearest_hashtag)
def _convert_videos_dict(videos_list_of_dictionaries) -> List:
"""
Takes in a videos dictionary and converts it.
This allows the user to use the wrong stuff and thing to just work
"""
if not videos_list_of_dictionaries:
raise RuntimeError("No videos to upload")
valid_path = config['valid_path_names']
valid_description = config['valid_descriptions']
correct_path = valid_path[0]
correct_description = valid_description[0]
def intersection(lst1, lst2):
""" return the intersection of two lists """
return list(set(lst1) & set(lst2))
return_list = []
for elem in videos_list_of_dictionaries:
# preprocesses the dictionary
elem = {k.strip().lower(): v for k, v in elem.items()}
keys = elem.keys()
path_intersection = intersection(valid_path, keys)
description_interesection = intersection(valid_description, keys)
if path_intersection:
# we have a path
path = elem[path_intersection.pop()]
if not _check_valid_path(path):
raise RuntimeError("Invalid path: " + path)
elem[correct_path] = path
else:
# iterates over the elem and find a key which is a path with a valid extension
for _, value in elem.items():
if _check_valid_path(value):
elem[correct_path] = value
break
else:
# no valid path found
raise RuntimeError("Path not found in dictionary: " + str(elem))
if description_interesection:
# we have a description
elem[correct_description] = elem[description_interesection.pop()]
else:
# iterates over the elem and finds a description which is not a valid path
for _, value in elem.items():
if not _check_valid_path(value):
elem[correct_description] = value
break
else:
elem[correct_description] = '' # null description is fine
return_list.append(elem)
return return_list
def __get_driver_timezone(driver) -> pytz.timezone:
"""
Returns the timezone of the driver
"""
timezone_str = driver.execute_script("return Intl.DateTimeFormat().resolvedOptions().timeZone")
return pytz.timezone(timezone_str)
class DescriptionTooLong(Exception):
"""
A video description longer than the maximum allowed by TikTok's website (not app) uploader
"""
def __init__(self, message=None):
super().__init__(message or self.__doc__)
class FailedToUpload(Exception):
"""
A video failed to upload
"""
def __init__(self, message=None):
super().__init__(message or self.__doc__)