# Script to take a random image with certain tags from Gelbooru and post it to Misskey
import os
import sys
import copy
import requests
import random
import json
import time
import traceback
from PIL import Image
from io import BytesIO
import urllib.parse


class BotInstance:
    """A single configured bot that posts random Gelbooru images to Misskey.

    Each instance is built from one entry of config.json (with missing keys
    already filled in from defaults.json by the caller).
    """

    # Gelbooru API URL
    gelbooru_url = "https://gelbooru.com/index.php?page=dapi&s=post&q=index&json=1&limit=100&tags="
    # Misskey API URL
    misskey_url = "https://misskey.io/api/"
    # Misskey API token
    misskey_token = "NONE"
    # Bot message
    bot_message = "Random image from Gelbooru"
    # Gelbooru tags
    gelbooru_tags = ""
    # Gelbooru tags to exclude
    gelbooru_tags_exclude = ""
    # Timeout (seconds) applied to every HTTP request so a stalled server
    # cannot hang the whole run forever
    request_timeout = 60
    # Number of Gelbooru queries bot_process makes before giving up
    max_fetch_attempts = 20

    def __init__(self, cfg_name, config):
        """Create a bot named *cfg_name* from its *config* dictionary.

        *config* must contain the keys "gelbooru_tags",
        "gelbooru_tags_exclude", "bot_message", "bot_hashtags",
        "misskey_url", "misskey_token" and "max_page_number".
        """
        self.cfg_name = cfg_name
        self.gelbooru_tags = config["gelbooru_tags"]
        self.gelbooru_tags_exclude = config["gelbooru_tags_exclude"]
        self.bot_message = config["bot_message"]
        self.bot_hashtags = config["bot_hashtags"]
        self.misskey_url = config["misskey_url"]
        self.misskey_token = config["misskey_token"]
        self.max_page_number = config["max_page_number"]

    # Get a random image from Gelbooru
    def get_random_image(self, max_page_number = 100):
        """Fetch one random post from a random result page.

        *max_page_number* is treated as the number of result pages; page
        indices sent to the API are zero-based.

        Returns a tuple ``(image_url, image_src, image_rating, max_pages)``.
        ``max_pages`` is the page count computed from the total reported by
        the API. The first three items are ``None`` when the chosen page
        holds no posts.
        """
        # Page indices are zero-based, so the last valid one is count - 1
        page_number = random.randint(0, max(max_page_number - 1, 0))

        tags = self.gelbooru_tags
        if self.gelbooru_tags_exclude != "":
            tags += " " + self.gelbooru_tags_exclude

        # Get the JSON data from the API
        gelbooru_json = requests.get(
            self.gelbooru_url + urllib.parse.quote_plus(tags) + "&pid=" + str(page_number),
            timeout=self.request_timeout,
        ).json()

        # Number of 100-post pages needed for the reported total (ceiling division)
        total = gelbooru_json['@attributes']['count']
        max_pages = total // 100 + (1 if total % 100 != 0 else 0)

        # Make sure there are images on the page
        posts = gelbooru_json.get('post')
        if not posts:
            return None, None, None, max_pages

        # random.choice can never produce an out-of-range index
        post = random.choice(posts)

        image_url = post["file_url"]
        # Fall back to the file itself when no source is recorded
        image_src = post.get("source") or image_url
        image_rating = post["rating"]
        return image_url, image_src, image_rating, max_pages

    def _post(self, endpoint, **kwargs):
        """POST to the Misskey API *endpoint*, applying the request timeout."""
        return requests.post(self.misskey_url + endpoint, timeout=self.request_timeout, **kwargs)

    @staticmethod
    def _error_message(response):
        """Return the API error message of *response*, or its HTTP status.

        Error bodies are not guaranteed to be JSON shaped like
        ``{"error": {"message": ...}}``, so fall back instead of raising.
        """
        try:
            return response.json()["error"]["message"]
        except (ValueError, KeyError, TypeError):
            return "HTTP %s" % response.status_code

    def _log_error(self, log_file, *details):
        """Write a timestamped error entry, one *details* item per paragraph."""
        print(self.cfg_name + ": Error: ", file=log_file)
        print("\n", file=log_file)
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=log_file)
        for detail in details:
            print("\n", file=log_file)
            print(detail, file=log_file)

    def _compose_message(self):
        """Pick the note text, occasionally appending the hashtags."""
        if isinstance(self.bot_message, list):
            msg = random.choice(self.bot_message)
        else:
            msg = self.bot_message
        if random.randint(0, 100) < 5:
            msg += " " + self.bot_hashtags
        return msg

    def _upload_from_url(self, image_url, image_rating, log_file):
        """Ask Misskey to fetch *image_url* itself. Returns True on success."""
        response = self._post(
            "drive/files/upload-from-url",
            json = {"url": image_url, "isSensitive": image_rating != 'general', "i": self.misskey_token},
        )
        if response.status_code not in (200, 204):
            self._log_error(log_file, self._error_message(response))
            return False
        return True

    def _upload_image(self, image_url, image_rating, log_file):
        """Put the image on the Misskey drive. Returns True on success.

        Static images are downloaded, downscaled to at most 2048px wide and
        re-encoded as JPEG; the re-encoded copy is uploaded unless it turned
        out larger than the original. Everything else is uploaded by URL.
        """
        if not image_url.endswith((".jpg", ".jpeg", ".png")):
            return self._upload_from_url(image_url, image_rating, log_file)

        image_request = requests.get(image_url, timeout=self.request_timeout)
        if image_request.status_code != 200:
            self._log_error(log_file, self._error_message(image_request), image_url)
            return False
        original_bytes = image_request.content

        # Optimise the image by reducing it to max width of 2048px
        image = Image.open(BytesIO(original_bytes))
        if image.width > 2048:
            image = image.resize((2048, int(image.height * (2048 / image.width))), Image.LANCZOS)

        # Apply JPEG compression in memory so no temporary files are left behind
        buffer = BytesIO()
        image.convert('RGB').save(buffer, format="JPEG", optimize=True, quality=90)
        jpeg_bytes = buffer.getvalue()

        # If the image is larger than the original, use the original
        if len(jpeg_bytes) > len(original_bytes):
            return self._upload_from_url(image_url, image_rating, log_file)

        response = self._post(
            "drive/files/create",
            data = {
                "name": os.path.split(image_url)[-1],
                "i": self.misskey_token,
                "isSensitive": str(image_rating != 'general').lower(),
            },
            files = {"file": ("image.jpg", jpeg_bytes)},
        )
        if response.status_code != 200:
            self._log_error(log_file, self._error_message(response))
            return False
        return True

    def _wait_for_file_id(self, file_name, log_file):
        """Poll the drive until *file_name* shows up; return its id or None."""
        attempts = 0
        while True:
            response = self._post("drive/files/find", json = {"name": file_name, "i": self.misskey_token})
            if response.status_code != 200:
                self._log_error(log_file, self._error_message(response))
                return None
            matches = response.json()
            if len(matches) > 0:
                return matches[0]["id"]
            # If the image hasn't been uploaded after the allowed attempts, exit
            if attempts > 10:
                self._log_error(log_file, "Image not uploaded")
                return None
            attempts += 1
            # Quadratic back-off, capped at 30 seconds
            time.sleep(min(30, (attempts ** 2) / 2))

    def _create_note(self, payload, image_src, log_file):
        """Create a note from *payload* plus text/token. Returns True on success."""
        note = dict(payload)
        note["text"] = "%s\n[Source](%s)\n" % (self._compose_message(), image_src)
        note["i"] = self.misskey_token
        response = self._post("notes/create", json = note)
        if response.status_code != 200:
            self._log_error(log_file, self._error_message(response))
            return False
        return True

    # Download and post the image to Misskey
    def post_image(self, image_url, image_src, image_rating, log_file):
        """Upload the image (unless already on the drive) and publish a note.

        When *image_src* resolves through ``ap/show`` to an object with an
        ``id``, that object is renoted; otherwise a new note with the file
        attached is created. Errors are written to *log_file*.

        Returns True when the note was created, False otherwise.
        """
        file_name = os.path.split(image_url)[-1]

        # Skip the upload when a file with this name is already on the drive
        presence = self._post("drive/files/find", json = {"name": file_name, "i": self.misskey_token})
        image_found = presence.status_code == 200 and len(presence.json()) > 0

        if not image_found:
            if not self._upload_image(image_url, image_rating, log_file):
                return False
            # Wait for the image to be uploaded
            time.sleep(1)

        file_id = self._wait_for_file_id(file_name, log_file)
        if file_id is None:
            return False

        # Try to determine if the image_src is a fediverse link, if so renote
        # it instead of posting a new note.
        # NOTE(review): confirm that the ap/show response really exposes a
        # top-level 'id' on the targeted Misskey version.
        lookup = self._post("ap/show", json = {"uri": image_src, "i": self.misskey_token})
        if lookup.status_code == 200:
            lookup_json = lookup.json()
            if 'id' in lookup_json:
                return self._create_note({"renoteId": lookup_json['id']}, image_src, log_file)

        return self._create_note({"fileIds": [file_id]}, image_src, log_file)

    def bot_process(self, log_file):
        """Pick a random image and post it, logging errors to *log_file*.

        Updates ``self.max_page_number`` with the page count reported by
        Gelbooru. Raises RuntimeError when no image could be found within
        ``max_fetch_attempts`` queries (e.g. the tags match nothing), rather
        than querying the API in an endless loop.
        """
        for _ in range(self.max_fetch_attempts):
            image_url, image_src, image_rating, cur_page_number = self.get_random_image(
                max_page_number=self.max_page_number
            )
            self.max_page_number = cur_page_number
            if image_url is not None:
                break
        else:
            raise RuntimeError(
                "No image found for tags %r after %d attempts"
                % (self.gelbooru_tags, self.max_fetch_attempts)
            )

        # Download and post the image to Misskey
        self.post_image(image_url, image_src, image_rating, log_file)


def generate_config(defaults):
    """Add a new bot entry, seeded from *defaults*, to config.json.

    The file is created when missing. The entry is stored under
    'bot_name', or 'bot_name_2', 'bot_name_3', ... when that key is taken,
    so an already configured bot is never overwritten.
    """
    if os.path.exists("config.json"):
        with open("config.json", "r") as config_file:
            config = json.load(config_file)
    else:
        config = {}

    # Find a key that does not clobber an existing bot
    bot_key = 'bot_name'
    suffix = 1
    while bot_key in config:
        suffix += 1
        bot_key = 'bot_name_%d' % suffix

    config[bot_key] = {
        'gelbooru_tags': defaults['gelbooru_tags'],
        'gelbooru_tags_exclude': defaults['gelbooru_tags_exclude'],
        'bot_message': defaults['bot_message'],
        'bot_hashtags': defaults['bot_hashtags'],
        'misskey_url': defaults['misskey_url'],
        'misskey_token': defaults['misskey_token'],
        'max_page_number': defaults['max_page_number'],
        # -1 marks a bot that has never run
        'last_run_time': -1,
    }

    with open("config.json", "w") as config_file:
        json.dump(config, config_file, indent=4)


def generate_defaults():
    """Write defaults.json with the stock default values.

    Keys already present in an existing defaults.json that are not listed
    here are kept; the listed keys are (re)set to the stock values.
    """
    if os.path.exists("defaults.json"):
        with open("defaults.json", "r") as config_file:
            config = json.load(config_file)
    else:
        config = {}

    config.update({
        # NOTE(review): post_image treats 'general' as the non-sensitive
        # rating; confirm 'rating:safe' is still a valid Gelbooru tag.
        'gelbooru_tags': 'rating:safe',
        'gelbooru_tags_exclude': '',
        'bot_message': ['Random image from Gelbooru'],
        'bot_hashtags': '#gelbooru #random',
        # Endpoint names are appended directly, so the URL must end in /api/
        'misskey_url': 'https://misskey.example.com/api/',
        'misskey_token': '',
        'max_page_number': 1000,
    })

    with open("defaults.json", "w") as config_file:
        json.dump(config, config_file, indent=4)


# Main function
def main():
    """Entry point: bootstrap config files, then run every bot that is due.

    A bot is due when it has never run, when its recorded last run lies more
    than an hour in the future (treated as a bogus timestamp), or when at
    least an hour has passed since its last run.
    """
    if not os.path.exists("defaults.json"):
        generate_defaults()
    with open("defaults.json", "r") as config_file:
        defaults = json.load(config_file)

    # First start: write a template config for the user to edit, then stop
    if not os.path.exists("config.json"):
        generate_config(defaults)
        sys.exit(0)

    # If first argument is '--gen-config', generate config.json:
    if len(sys.argv) > 1 and sys.argv[1] == "--gen-config":
        # Generate a config.json entry
        generate_config(defaults)
    elif len(sys.argv) > 1 and sys.argv[1] == "--help":
        print("Usage: python3 gelbooru-bot.py [--gen-config] [--help]")
        print(" --gen-config: Add a new bot to the config.json file")
        print(" --help: Show this help message")
        print(" No arguments: Run the bot")
        print(" Note: The values in defaults.json will be used if the values are not set in config.json")
    else:
        # Load set of configs to run from json config
        with open("config.json", "r") as config_file:
            config = json.load(config_file)

        run_interval = 60 * 60

        # Create and run bot instances for each config in config.json
        with open('log.txt', 'a') as log_file:
            for cfg_name in config:
                # Set missing config values to defaults
                cfg_tmp = copy.deepcopy(config[cfg_name])
                for key in defaults:
                    if key not in cfg_tmp:
                        cfg_tmp[key] = defaults[key]

                # A missing timestamp means the bot has never run
                last_run_time = cfg_tmp.get('last_run_time', -1)
                now = time.time()
                never_ran = last_run_time == -1
                # A timestamp more than an hour ahead is considered invalid
                in_future = last_run_time > now + run_interval
                if not (never_ran or in_future) and now - last_run_time < run_interval:
                    continue

                try:
                    bot_instance = BotInstance(cfg_name, cfg_tmp)
                    bot_instance.bot_process(log_file)
                    # Remember the page count reported by Gelbooru
                    config[cfg_name]["max_page_number"] = bot_instance.max_page_number
                    # Save the last run time
                    config[cfg_name]["last_run_time"] = time.time()
                # If error, print error and continue with the next bot
                except Exception:
                    print("\n\n" + cfg_name + ": Error: ", file=log_file)
                    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=log_file)
                    print("\n\n", file=log_file)
                    traceback.print_exc(file=log_file)
                    continue

        # Persist the updated page counts and run times to config.json
        with open("config.json", "w") as config_file:
            json.dump(config, config_file, indent=4)


# Run main function
if __name__ == "__main__":
    main()