# MisskeyBooruImageBot_v2/gelbooru_poster.py
# Script to take a random image with certain tags from Gelbooru and post it to Misskey
import os
import sys
import copy
import requests
import random
import json
import time
import traceback
from PIL import Image
from io import BytesIO
import urllib.parse
def gelboorusource_urlcleaner(url):
    """Return a single, normalised source URL from a Gelbooru ``source`` field.

    The field may hold several whitespace-separated URLs; only the first one
    is kept. Legacy pixiv links of the form
    ``.../member_illust.php?mode=medium&illust_id=<id>`` are rewritten to
    ``https://www.pixiv.net/en/artworks/<id>``.

    Args:
        url: Raw source string (may be empty or ``None``).

    Returns:
        The cleaned URL, or ``""`` when the field contains no URL at all.
    """
    # split() with no argument ignores leading/trailing whitespace and also
    # splits on newlines/tabs, so a padded field still yields its first URL.
    candidates = (url or "").split()
    if not candidates:
        return ""
    first_url = candidates[0]

    # The legacy pixiv link exists with both schemes and with/without "www.".
    legacy_pixiv_prefixes = (
        "http://www.pixiv.net/member_illust.php?mode=medium&illust_id=",
        "https://www.pixiv.net/member_illust.php?mode=medium&illust_id=",
        "http://pixiv.net/member_illust.php?mode=medium&illust_id=",
        "https://pixiv.net/member_illust.php?mode=medium&illust_id=",
    )
    for prefix in legacy_pixiv_prefixes:
        if first_url.startswith(prefix):
            # Keep only the id itself; drop any further "&key=value" parameters.
            illust_id = first_url[len(prefix):].split("&")[0]
            return "https://www.pixiv.net/en/artworks/" + illust_id
    return first_url
class BotInstance:
    """One configured bot: pick a random Gelbooru post and publish it on Misskey.

    An instance is built from one entry of ``config.json``. ``bot_process`` is
    the entry point: it selects a random post matching the configured tags and
    hands it to ``post_image``, which either renotes the original fediverse
    note or uploads the image to the Misskey drive and creates a note for it.
    """

    # Gelbooru API URL (the URL-encoded tag query is appended to it)
    gelbooru_url = "https://gelbooru.com/index.php?page=dapi&s=post&q=index&json=1&limit=100&tags="
    # Misskey API URL
    misskey_url = "https://misskey.io/api/"
    # Misskey API token
    misskey_token = "NONE"
    # Bot message
    bot_message = "Random image from Gelbooru"
    # Gelbooru tags
    gelbooru_tags = ""
    # Gelbooru tags to exclude
    gelbooru_tags_exclude = ""

    # Number of posts per page; must match "limit=100" in gelbooru_url
    _PAGE_SIZE = 100
    # Upper bound applied to the page count reported back to the caller
    _MAX_PAGES = 200
    # Images wider than this are scaled down before upload
    _MAX_IMAGE_WIDTH = 2048
    # Extensions that go through the download/recompress path
    _STATIC_IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")

    def __init__(self, cfg_name, config, log_file):
        """Store the settings of one bot.

        Args:
            cfg_name: Name of the config entry; used as a prefix in log lines.
            config: Mapping with the keys ``gelbooru_tags``,
                ``gelbooru_tags_exclude``, ``bot_message``, ``bot_hashtags``,
                ``misskey_url``, ``misskey_token`` and ``max_page_number``.
            log_file: Writable text stream used by ``get_random_image``.
        """
        self.log_file = log_file
        self.cfg_name = cfg_name
        self.gelbooru_tags = config["gelbooru_tags"]
        self.gelbooru_tags_exclude = config["gelbooru_tags_exclude"]
        self.bot_message = config["bot_message"]
        self.bot_hashtags = config["bot_hashtags"]
        self.misskey_url = config["misskey_url"]
        self.misskey_token = config["misskey_token"]
        self.max_page_number = config["max_page_number"]

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _timestamp():
        """Return the current local time formatted for log lines."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    def _log_error(self, log_file, message):
        """Write the three-line error record (name, time, message) to *log_file*."""
        print(self.cfg_name + ": Error: ", file=log_file)
        print(self._timestamp(), file=log_file)
        print(message, file=log_file)

    @staticmethod
    def _error_message(response):
        """Extract ``error.message`` from a response, tolerating non-JSON bodies.

        Falls back to the HTTP status and the start of the body so that a
        failure report never raises while being produced.
        """
        try:
            return response.json()["error"]["message"]
        except (ValueError, KeyError, TypeError):
            return "HTTP %s: %s" % (response.status_code, response.text[:200])

    @classmethod
    def _page_count(cls, post_count):
        """Return how many result pages *post_count* posts span, capped at _MAX_PAGES."""
        full_pages, remainder = divmod(post_count, cls._PAGE_SIZE)
        return min(cls._MAX_PAGES, full_pages + (1 if remainder != 0 else 0))

    @staticmethod
    def _pick_post(posts):
        """Return a uniformly random element of *posts*, or ``None`` if it is empty."""
        if not posts:
            return None
        return random.choice(posts)

    @classmethod
    def _drive_file_name(cls, image_url):
        """Return ``(drive_name, is_static)`` for *image_url*.

        Static images are re-encoded as JPEG before upload, so their drive
        name gets a ``.jpg`` extension. Everything else is uploaded by URL
        without an explicit name, so the name taken from the URL is used for
        the later lookup.
        NOTE(review): assumes Misskey names such files after the last URL path
        segment - confirm against the target instance.
        """
        base_name = os.path.split(image_url)[-1]
        if image_url.endswith(cls._STATIC_IMAGE_EXTENSIONS):
            return os.path.splitext(base_name)[0] + ".jpg", True
        return base_name, False

    @staticmethod
    def _resolved_note_id(ap_show_json):
        """Return the note id contained in an ``ap/show`` response, or ``None``.

        Accepts the wrapped form ``{"type": "Note", "object": {"id": ...}}``
        as well as a flat object carrying ``id`` at the top level.
        NOTE(review): the wrapped form is what the Misskey API is expected to
        return - confirm against the target instance.
        """
        if not isinstance(ap_show_json, dict):
            return None
        resolved = ap_show_json.get("object")
        if ap_show_json.get("type") == "Note" and isinstance(resolved, dict) and "id" in resolved:
            return resolved["id"]
        return ap_show_json.get("id")

    # ------------------------------------------------------------------
    # Message formatting
    # ------------------------------------------------------------------
    def format_message(self, image_src, image_url):
        """Build the note text from the configured template.

        ``bot_message`` may be a single template or a list of templates, in
        which case one is chosen at random. Placeholders:

        * ``$dh$`` - replaced by ``bot_hashtags`` on a small random fraction
          of calls, otherwise removed;
        * ``$gel_src$`` - markdown link to the Gelbooru post (*image_url*);
        * ``$src$`` - markdown link to the original source (*image_src*).

        Returns:
            The formatted message with surrounding whitespace stripped.
        """
        if isinstance(self.bot_message, list):
            msg = random.choice(self.bot_message)
        else:
            msg = self.bot_message
        # Hashtags are only attached occasionally.
        hashtags = self.bot_hashtags if random.randint(0, 100) < 5 else ""
        msg = msg.replace("$dh$", hashtags)
        msg = msg.replace("$gel_src$", "[Gelbooru Link](%s)" % (image_url))
        msg = msg.replace("$src$", "[Source](%s)" % (image_src))
        return msg.strip()

    # ------------------------------------------------------------------
    # Gelbooru side
    # ------------------------------------------------------------------
    def get_random_image(self, max_page_number = 100):
        """Fetch a random result page from Gelbooru and pick a random post.

        Args:
            max_page_number: Number of result pages assumed to exist; a page
                index in ``[0, max_page_number - 1]`` is requested.

        Returns:
            ``(image_url, image_src, image_post_url, image_rating, max_pages)``.
            The first four items are ``None`` when the requested page has no
            posts. ``max_pages`` is the page count derived from the response.
        """
        # Page indices are zero-based, so the last valid page is max - 1.
        page_number = random.randint(0, max(0, max_page_number - 1))

        tags = self.gelbooru_tags
        if self.gelbooru_tags_exclude != "":
            tags = tags + " " + self.gelbooru_tags_exclude
        request_url = self.gelbooru_url + urllib.parse.quote_plus(tags) + "&pid=" + str(page_number)
        # Log before the request so a failing URL is visible in the log.
        print("[%s][%s] Gelbooru Url: %s" % (self._timestamp(), self.cfg_name, request_url), file=self.log_file)
        gelbooru_json = requests.get(request_url).json()

        max_pages = self._page_count(gelbooru_json['@attributes']['count'])

        # Make sure there are images on the page
        post = self._pick_post(gelbooru_json.get('post'))
        if post is None:
            return None, None, None, None, max_pages

        image_post_url = "https://gelbooru.com/index.php?page=post&s=view&id=" + str(post["id"])
        image_url = post["file_url"]
        # Use the cleaned source when there is one, otherwise the file itself.
        raw_source = post.get("source")
        image_src = (gelboorusource_urlcleaner(raw_source) if raw_source else "") or image_url
        image_rating = post["rating"]
        return image_url, image_src, image_post_url, image_rating, max_pages

    # ------------------------------------------------------------------
    # Misskey side
    # ------------------------------------------------------------------
    def _renote_if_fediverse(self, image_src, log_file):
        """Renote *image_src* if Misskey can resolve it to a note.

        Returns:
            ``True`` when the source resolved to a note (a renote was
            attempted, errors are logged), ``False`` otherwise.
        """
        show_request = requests.post(self.misskey_url + "ap/show", json = {"uri": image_src, "i": self.misskey_token})
        if show_request.status_code != 200:
            return False
        try:
            note_id = self._resolved_note_id(show_request.json())
        except ValueError:
            return False
        if note_id is None:
            return False
        # A plain renote carries no text of its own.
        create_note_request = requests.post(self.misskey_url + "notes/create", json = {"renoteId": note_id, "i": self.misskey_token})
        if create_note_request.status_code != 200:
            self._log_error(log_file, self._error_message(create_note_request))
        return True

    def _upload_optimized_image(self, image_url, image_fname, image_rating, log_file):
        """Download a static image, recompress it and upload it to the drive.

        The image is scaled down to at most ``_MAX_IMAGE_WIDTH`` pixels wide
        and saved as JPEG. If the source was already a JPEG and is smaller
        than the recompressed result, the original bytes are uploaded.

        Returns:
            ``True`` on success, ``False`` after logging an error.
        """
        image_request = requests.get(image_url)
        if image_request.status_code != 200:
            # The image host is not Misskey, so there is no JSON error to parse.
            self._log_error(log_file, "Image download failed with HTTP status %s" % image_request.status_code)
            print(image_url, file=log_file)
            return False

        optimized_path = "image.jpg"
        original_path = "image_original.jpg"
        try:
            image = Image.open(BytesIO(image_request.content))
            keep_original = os.path.splitext(image_url)[-1] in (".jpg", ".jpeg")
            if keep_original:
                with open(original_path, "wb") as image_file:
                    image_file.write(image_request.content)
            if image.width > self._MAX_IMAGE_WIDTH:
                scaled_height = int(image.height * (self._MAX_IMAGE_WIDTH / image.width))
                image = image.resize((self._MAX_IMAGE_WIDTH, scaled_height), Image.LANCZOS)
            # Apply JPEG compression
            image = image.convert('RGB')
            image.save(optimized_path, optimize=True, quality=90)
            # If the recompressed image is larger than the original, use the original
            if keep_original and os.path.getsize(optimized_path) > os.path.getsize(original_path):
                upload_path = original_path
            else:
                upload_path = optimized_path
            with open(upload_path, "rb") as upload_file:
                create_file_request = requests.post(
                    self.misskey_url + "drive/files/create",
                    data = {"name": image_fname, "i": self.misskey_token, "isSensitive": str(image_rating != 'general').lower()},
                    files = {"file": upload_file},
                )
        finally:
            # Always remove the temporary files, including on failure.
            for temp_path in (optimized_path, original_path):
                if os.path.exists(temp_path):
                    os.remove(temp_path)

        if create_file_request.status_code != 200:
            self._log_error(log_file, self._error_message(create_file_request))
            return False
        return True

    def _upload_from_url(self, image_url, image_rating, log_file):
        """Ask Misskey to fetch *image_url* into the drive itself.

        Returns:
            ``True`` when the request was accepted, ``False`` after logging.
        """
        upload_from_url_request = requests.post(self.misskey_url + "drive/files/upload-from-url", json = {"url": image_url, "isSensitive": image_rating != 'general', "i": self.misskey_token})
        if upload_from_url_request.status_code != 204 and upload_from_url_request.status_code != 200:
            self._log_error(log_file, self._error_message(upload_from_url_request))
            return False
        # Give the server a moment before the first lookup.
        time.sleep(1)
        return True

    def _wait_for_file_id(self, image_fname, log_file):
        """Poll ``drive/files/find`` until *image_fname* shows up.

        Returns:
            The id of the first matching drive file, or ``None`` after an
            API error or once the retry budget is exhausted (both logged).
        """
        attempts = 0
        while True:
            file_id_request = requests.post(self.misskey_url + "drive/files/find", json = {"name": image_fname, "i": self.misskey_token})
            if file_id_request.status_code != 200:
                self._log_error(log_file, self._error_message(file_id_request))
                return None
            file_id_json = file_id_request.json()
            if len(file_id_json) > 0:
                return file_id_json[0]["id"]
            if attempts > 10:
                self._log_error(log_file, "Image not uploaded")
                return None
            attempts += 1
            # Wait and try again, backing off quadratically up to 30 seconds
            print("Waiting for image to be uploaded...\n", file=log_file)
            time.sleep(min(30, (attempts ** 2) / 2))

    # Download and post the image to Misskey
    def post_image(self, image_url, image_src, image_post_url, image_rating, log_file):
        """Publish one Gelbooru post on Misskey.

        If *image_src* resolves to a fediverse note it is renoted. Otherwise
        the image is uploaded to the drive (unless a file with the same name
        already exists there) and a note linking to the source is created.

        Args:
            image_url: Direct URL of the media file.
            image_src: Source URL shown in the note / tried for renoting.
            image_post_url: URL of the Gelbooru post page.
            image_rating: Gelbooru rating; anything but ``'general'`` marks
                the upload as sensitive.
            log_file: Writable text stream for error and progress messages.

        Returns:
            ``False`` when the media could not be uploaded or located,
            ``True`` otherwise. A failed ``notes/create`` call is logged but
            still reported as ``True``, as before.
        """
        # Try to determine if the image_src is a fediverse link, if so renote it
        if self._renote_if_fediverse(image_src, log_file):
            return True

        image_fname, is_static = self._drive_file_name(image_url)

        # Check if the image is already uploaded; a failed check counts as "no"
        file_presence_check = requests.post(self.misskey_url + "drive/files/find", json = {"name": image_fname, "i": self.misskey_token})
        image_found = file_presence_check.status_code == 200 and len(file_presence_check.json()) > 0

        if not image_found:
            if is_static:
                uploaded = self._upload_optimized_image(image_url, image_fname, image_rating, log_file)
            else:
                uploaded = self._upload_from_url(image_url, image_rating, log_file)
            if not uploaded:
                return False

        # Wait for the image to be uploaded and get the file ID
        file_id = self._wait_for_file_id(image_fname, log_file)
        if file_id is None:
            return False

        # Submit a /notes/create request to Misskey
        msg = self.format_message(image_src, image_post_url)
        create_note_request = requests.post(self.misskey_url + "notes/create", json = {"fileIds": [file_id], "text": msg, "i": self.misskey_token})
        if create_note_request.status_code != 200:
            self._log_error(log_file, self._error_message(create_note_request))
        return True

    def bot_process(self, log_file):
        """Pick a random image (up to 10 tries) and post it.

        ``self.max_page_number`` is updated after every lookup with the page
        count derived from the Gelbooru response.

        Returns:
            ``False`` if no image was found within the retry budget,
            otherwise the result of ``post_image``.
        """
        attempts = 10
        while attempts > 0:
            image_url, image_src, image_post_url, image_rating, cur_page_number = self.get_random_image(max_page_number=self.max_page_number)
            self.max_page_number = cur_page_number
            if image_url is not None:
                # Download and post the image to Misskey
                return self.post_image(image_url, image_src, image_post_url, image_rating, log_file)
            attempts -= 1
        return False
def generate_config(defaults):
    """Add a new bot entry, filled from *defaults*, to ``config.json``.

    The file is created if it does not exist. The new entry is stored under
    ``bot_name``; if that key is already taken, ``bot_name_2``,
    ``bot_name_3``, ... is used instead so that an existing (possibly already
    edited) entry is never overwritten.

    Args:
        defaults: Mapping providing ``gelbooru_tags``,
            ``gelbooru_tags_exclude``, ``bot_message``, ``bot_hashtags``,
            ``misskey_url``, ``misskey_token`` and ``max_page_number``.

    Raises:
        KeyError: If *defaults* lacks one of the keys listed above.
    """
    if os.path.exists("config.json"):
        with open("config.json", "r") as config_file:
            config = json.load(config_file)
    else:
        config = {}

    # Find the first unused entry name.
    entry_name = "bot_name"
    suffix = 2
    while entry_name in config:
        entry_name = "bot_name_%d" % suffix
        suffix += 1

    copied_keys = (
        'gelbooru_tags',
        'gelbooru_tags_exclude',
        'bot_message',
        'bot_hashtags',
        'misskey_url',
        'misskey_token',
        'max_page_number',
    )
    entry = {key: defaults[key] for key in copied_keys}
    # -1 marks a bot that has never run.
    entry['last_run_time'] = -1
    config[entry_name] = entry

    with open("config.json", "w") as config_file:
        json.dump(config, config_file, indent=4)
def generate_defaults():
    """Create ``defaults.json``, or fill in keys missing from an existing one.

    Values already present in an existing ``defaults.json`` are preserved;
    only absent keys receive the built-in default.
    """
    if os.path.exists("defaults.json"):
        with open("defaults.json", "r") as config_file:
            config = json.load(config_file)
    else:
        config = {}

    builtin_defaults = {
        'gelbooru_tags': 'rating:safe',
        'gelbooru_tags_exclude': '',
        'bot_message': ['Random image from Gelbooru $dh$\n$gel_src$\n$src$'],
        'bot_hashtags': '#gelbooru #random',
        'misskey_url': 'https://misskey.example.com/',
        'misskey_token': '',
        'max_page_number': 1000,
    }
    # setdefault keeps whatever the file already contained for a key.
    for key, value in builtin_defaults.items():
        config.setdefault(key, value)

    with open("defaults.json", "w") as config_file:
        json.dump(config, config_file, indent=4)
# Main function
def main():
    """Command-line entry point.

    * Creates ``defaults.json`` if it is missing.
    * If ``config.json`` is missing, generates it from the defaults and exits
      with status 0.
    * ``--gen-config`` adds a bot entry to ``config.json``; ``--help`` prints
      usage.
    * Otherwise every bot in ``config.json`` whose last run is at least one
      hour old (or that never ran) is executed once. Errors are appended to
      ``log.txt``; ``max_page_number`` and ``last_run_time`` of successful
      bots are written back to ``config.json``.
    """
    if not os.path.exists("defaults.json"):
        generate_defaults()
    with open("defaults.json", "r") as config_file:
        defaults = json.load(config_file)
    if not os.path.exists("config.json"):
        generate_config(defaults)
        sys.exit(0)
    # If first argument is '--gen-config', generate config.json:
    if len(sys.argv) > 1 and sys.argv[1] == "--gen-config":
        # Generate a config.json entry
        generate_config(defaults)
    elif len(sys.argv) > 1 and sys.argv[1] == "--help":
        print("Usage: python3 gelbooru-bot.py [--gen-config] [--help]")
        print(" --gen-config: Add a new bot to the config.json file")
        print(" --help: Show this help message")
        print(" No arguments: Run the bot")
        print(" Note: The values in defaults.json will be used if the values are not set in config.json")
    else:
        run_interval = 60 * 60  # seconds between two runs of the same bot
        # Load set of configs to run from json config
        with open("config.json", "r") as config_file:
            config = json.load(config_file)
        # Create and run bot instances for each config in config.json
        with open('log.txt', 'a') as log_file:
            for cfg_name in config:
                # Set missing config values to defaults
                cfg_tmp = copy.deepcopy(config[cfg_name])
                for key in defaults:
                    if key not in cfg_tmp:
                        cfg_tmp[key] = defaults[key]

                now = time.time()
                # Entries without the key are treated like "never ran" (-1).
                last_run_time = cfg_tmp.get('last_run_time', -1)
                # A timestamp more than one interval in the future is bogus;
                # such a bot runs right away, just like one that never ran.
                run_unconditionally = last_run_time == -1 or last_run_time > now + run_interval
                if not run_unconditionally and now - last_run_time < run_interval:
                    continue

                try:
                    bot_instance = BotInstance(cfg_name, cfg_tmp, log_file)
                    if not bot_instance.bot_process(log_file):
                        print("\n" + cfg_name + ": Error: ", file=log_file)
                        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=log_file)
                        print("Error running bot", file=log_file)
                        continue
                    # Remember the page count learned during this run
                    config[cfg_name]["max_page_number"] = bot_instance.max_page_number
                    # Save the last run time
                    config[cfg_name]["last_run_time"] = time.time()
                # If error, print error and continue with the next bot
                except Exception:
                    print("\n" + cfg_name + ": Error: ", file=log_file)
                    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=log_file)
                    traceback.print_exc(file=log_file)
                    continue
        # Persist the updated per-bot state to config.json
        with open("config.json", "w") as config_file:
            json.dump(config, config_file, indent=4)
# Run main() only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()