MisskeyBooruImageBot_v2/gelbooru_poster.py

281 lines
13 KiB
Python

# Script to take a random image with certain tags from Gelbooru and post it to Misskey
import os
import sys
import copy
import requests
import random
import json
import time
import traceback
from PIL import Image
from io import BytesIO
class BotInstance:
    """One configured bot: picks a random Gelbooru post and publishes it to Misskey.

    Class attributes hold fallback values; ``__init__`` overrides the
    configurable ones from a config dict. All HTTP traffic goes through the
    ``requests`` module, and failures that the code detects itself are
    reported by printing an ``Error: ...`` line to the caller's log file.
    """

    # Gelbooru API URL
    gelbooru_url = "https://gelbooru.com/index.php?page=dapi&s=post&q=index&json=1&limit=100&tags="
    # Misskey API URL
    misskey_url = "https://misskey.io/api/"
    # Misskey API token
    misskey_token = "NONE"
    # Bot message
    bot_message = "Random image from Gelbooru"
    # Gelbooru tags
    gelbooru_tags = ""
    # Gelbooru tags to exclude
    gelbooru_tags_exclude = ""
    # Posts requested per page; must agree with "limit=100" in gelbooru_url
    posts_per_page = 100
    # Seconds any single HTTP request may stall before requests gives up
    request_timeout = 60
    # Upper bound on random page draws per bot_process call
    max_fetch_attempts = 20

    def __init__(self, cfg_name, config):
        """Create a bot from one config entry.

        Args:
            cfg_name: Name of the entry (its key in config.json).
            config: Dict providing ``gelbooru_tags``, ``gelbooru_tags_exclude``,
                ``bot_message``, ``bot_hashtags``, ``misskey_url``,
                ``misskey_token`` and ``max_page_number``.

        Raises:
            KeyError: If one of those keys is missing.
        """
        self.cfg_name = cfg_name
        self.gelbooru_tags = config["gelbooru_tags"]
        self.gelbooru_tags_exclude = config["gelbooru_tags_exclude"]
        self.bot_message = config["bot_message"]
        self.bot_hashtags = config["bot_hashtags"]
        self.misskey_url = config["misskey_url"]
        self.misskey_token = config["misskey_token"]
        self.max_page_number = config["max_page_number"]

    @staticmethod
    def _page_count(post_count, page_size=100):
        """Return how many pages of ``page_size`` posts hold ``post_count`` posts."""
        return (int(post_count) + page_size - 1) // page_size

    @staticmethod
    def _pick_post(posts):
        """Return a uniformly random element of ``posts``, or None when it is empty/None."""
        if not posts:
            return None
        return random.choice(posts)

    @staticmethod
    def _error_message(response):
        """Return the Misskey-style error message of ``response``, or its HTTP status.

        Not every failing response carries ``{"error": {"message": ...}}``
        (an image download, for example), so fall back instead of raising
        while trying to report an error.
        """
        try:
            return str(response.json()["error"]["message"])
        except (ValueError, KeyError, TypeError):
            return "HTTP " + str(response.status_code)

    def _tag_query(self):
        """Join the include and exclude tag strings with '+', skipping empty ones."""
        return "+".join(part for part in (self.gelbooru_tags, self.gelbooru_tags_exclude) if part)

    def _compose_message(self):
        """Return the bot message, with the hashtags appended on a small random share of posts."""
        msg = self.bot_message
        if random.randint(0, 100) < 5:
            msg += " " + self.bot_hashtags
        return msg

    # Get a random image from Gelbooru
    def get_random_image(self, max_page_number = 100):
        """Fetch one random page of results and pick a random post from it.

        Args:
            max_page_number: Highest page index (inclusive) that may be drawn.

        Returns:
            ``(image_url, image_src, image_rating, max_pages)``. The first
            three are None when the drawn page holds no posts. ``image_src``
            falls back to ``image_url`` when the post has no source.
            ``max_pages`` is the page count derived from the total that the
            API reports.
        """
        # Get a random page number
        page_number = random.randint(0, max_page_number)
        # Get the JSON data from the API
        gelbooru_json = requests.get(
            self.gelbooru_url + self._tag_query() + "&pid=" + str(page_number),
            timeout=self.request_timeout,
        ).json()
        max_pages = self._page_count(gelbooru_json['@attributes']['count'], self.posts_per_page)
        # Make sure there are images on the page; choosing directly from the
        # list cannot produce an out-of-range index
        post = self._pick_post(gelbooru_json.get('post'))
        if post is None:
            return None, None, None, max_pages
        # Get the image URL
        image_url = post["file_url"]
        # Get the image source if exists
        image_src = post.get("source") or image_url
        # Get the image rating
        image_rating = post["rating"]
        return image_url, image_src, image_rating, max_pages

    def _find_drive_files(self, file_name):
        """POST drive/files/find for ``file_name`` and return the raw response."""
        return requests.post(
            self.misskey_url + "drive/files/find",
            json = {"name": file_name, "i": self.misskey_token},
            timeout=self.request_timeout,
        )

    def _upload_optimized(self, image_url, file_name, image_rating, log_file):
        """Download a static image, re-encode it as JPEG and upload it to the drive.

        Returns True on success, False after logging an error.
        """
        image_request = requests.get(image_url, timeout=self.request_timeout)
        # If error, print error and exit
        if image_request.status_code != 200:
            print("Error: " + self._error_message(image_request), file=log_file)
            return False
        # Convert before the first save: JPEG cannot store alpha or palette
        # modes, so saving e.g. an RGBA PNG unconverted raises in Pillow
        image = Image.open(BytesIO(image_request.content)).convert('RGB')
        # Save a copy of the unresized image (a JPEG re-encode at Pillow's default settings)
        image.save("image_original.jpg")
        # Optimise the image by reducing it to max width of 2048px
        if image.width > 2048:
            image = image.resize((2048, int(image.height * (2048 / image.width))), Image.Resampling.LANCZOS)
        # Apply JPEG compression
        image.save("image.jpg", optimize=True, quality=90)
        # If the image is larger than the original, use the original
        if os.path.getsize("image.jpg") > os.path.getsize("image_original.jpg"):
            os.replace("image_original.jpg", "image.jpg")
        # Submit a /drive/files/create request to Misskey; the context manager
        # closes the file handle even when the request raises
        with open("image.jpg", "rb") as image_file:
            create_file_request = requests.post(
                self.misskey_url + "drive/files/create",
                data = {"name": file_name, "i": self.misskey_token, "isSensitive": str(image_rating != 'general').lower()},
                files = {"file": image_file},
                timeout=self.request_timeout,
            )
        # If error, print error and exit
        if create_file_request.status_code != 200:
            print("Error: " + self._error_message(create_file_request), file=log_file)
            return False
        return True

    def _upload_from_url(self, image_url, image_rating, log_file):
        """Ask Misskey to fetch ``image_url`` itself. Returns True on 200/204, else logs and returns False."""
        upload_from_url_request = requests.post(
            self.misskey_url + "drive/files/upload-from-url",
            json = {"url": image_url, "isSensitive": image_rating != 'general', "i": self.misskey_token},
            timeout=self.request_timeout,
        )
        # If error, print error and exit
        if upload_from_url_request.status_code not in (200, 204):
            print("Error: " + self._error_message(upload_from_url_request), file=log_file)
            return False
        return True

    def _wait_for_file_id(self, file_name, log_file):
        """Poll drive/files/find until ``file_name`` shows up; return its id or None.

        Sleeps ``min(30, attempts**2 / 2)`` seconds between polls and gives up
        (after logging) once the retry budget is spent or a poll fails.
        """
        attempts = 0
        while True:
            # Get the file ID using the /drive/files/find request
            file_id_request = self._find_drive_files(file_name)
            # If error, print error and exit
            if file_id_request.status_code != 200:
                print("Error: " + self._error_message(file_id_request), file=log_file)
                return None
            file_id_json = file_id_request.json()
            if len(file_id_json) > 0:
                return file_id_json[0]["id"]
            # If the image hasn't been uploaded after the retry budget, exit
            if attempts > 10:
                print("Error: Image not uploaded", file=log_file)
                return None
            attempts += 1
            # Wait and try again
            time.sleep(min(30, (attempts ** 2) / 2))

    # Download and post the image to Misskey
    def post_image(self, image_url, image_src, image_rating, log_file):
        """Make sure the image is in the Misskey drive, then publish a note for it.

        Args:
            image_url: Direct URL of the image file.
            image_src: Source link to credit in the note text.
            image_rating: Gelbooru rating; anything but ``'general'`` is
                uploaded as sensitive.
            log_file: Writable text stream for error lines.

        Returns:
            True when the note was created, False after any logged failure.
        """
        file_name = os.path.split(image_url)[-1]
        # Skip the upload when the drive already has a file with this name
        file_presence_check = self._find_drive_files(file_name)
        image_found = file_presence_check.status_code == 200 and len(file_presence_check.json()) > 0
        if not image_found:
            # If the file is a static image, download, optimize and post it to Misskey
            if image_url.lower().endswith((".jpg", ".jpeg", ".png")):
                uploaded = self._upload_optimized(image_url, file_name, image_rating, log_file)
            else:
                uploaded = self._upload_from_url(image_url, image_rating, log_file)
            if not uploaded:
                return False
            # Wait for the image to be uploaded
            time.sleep(1)
        file_id = self._wait_for_file_id(file_name, log_file)
        if file_id is None:
            return False
        # Try to determine if the image_src is a fediverse link, if so renote it instead of posting a new note
        note = {"fileIds": [file_id]}
        post_request = requests.post(
            self.misskey_url + "ap/show",
            json = {"uri": image_src, "i": self.misskey_token},
            timeout=self.request_timeout,
        )
        if post_request.status_code == 200:
            post_json = post_request.json()
            # NOTE(review): Misskey's ap/show appears to wrap its result as
            # {"type": ..., "object": {...}}, in which case a top-level 'id'
            # never exists and this branch is dead - confirm against the API
            if 'id' in post_json:
                note = {"renoteId": post_json['id']}
        note["text"] = "%s\nURL: %s\n" % (self._compose_message(), image_src)
        note["i"] = self.misskey_token
        # Submit a /notes/create request to Misskey
        create_note_request = requests.post(
            self.misskey_url + "notes/create",
            json = note,
            timeout=self.request_timeout,
        )
        # If error, print error and report the failure to the caller
        if create_note_request.status_code != 200:
            print("Error: " + self._error_message(create_note_request), file=log_file)
            return False
        return True

    def bot_process(self, log_file):
        """Draw random pages until one yields a post, then publish that post.

        ``self.max_page_number`` is lowered whenever the API reports fewer
        pages than currently assumed. The number of draws is bounded, so a
        tag query without results logs an error and returns instead of
        requesting pages forever.
        """
        for _ in range(self.max_fetch_attempts):
            image_url, image_src, image_rating, page_count = self.get_random_image(max_page_number=self.max_page_number)
            # Gelbooru's pid counts pages from 0, so the last page that can
            # hold posts is page_count - 1
            last_page = max(0, page_count - 1)
            if last_page < self.max_page_number:
                self.max_page_number = last_page
            if image_url is not None:
                break
            if page_count == 0:
                print("Error: No images match the configured tags", file=log_file)
                return
        else:
            print("Error: No image found after %d attempts" % self.max_fetch_attempts, file=log_file)
            return
        # Download and post the image to Misskey
        self.post_image(image_url, image_src, image_rating, log_file)
def generate_config(defaults):
    """Add a template bot entry to config.json, creating the file if needed.

    The entry copies the bot settings from ``defaults`` and starts with
    ``last_run_time`` set to -1. It is stored under ``'bot_name'``; when that
    key is already taken the next free ``'bot_name_<n>'`` is used, so an
    existing bot entry is never overwritten.

    Args:
        defaults: Dict providing ``gelbooru_tags``, ``gelbooru_tags_exclude``,
            ``bot_message``, ``bot_hashtags``, ``misskey_url``,
            ``misskey_token`` and ``max_page_number``.

    Raises:
        KeyError: If ``defaults`` lacks one of those keys.
    """
    if os.path.exists("config.json"):
        with open("config.json", "r", encoding="utf-8") as config_file:
            config = json.load(config_file)
    else:
        config = {}
    # Find a key that does not clobber an already configured bot
    entry_name = 'bot_name'
    suffix = 2
    while entry_name in config:
        entry_name = 'bot_name_%d' % suffix
        suffix += 1
    copied_keys = (
        'gelbooru_tags',
        'gelbooru_tags_exclude',
        'bot_message',
        'bot_hashtags',
        'misskey_url',
        'misskey_token',
        'max_page_number',
    )
    entry = {key: defaults[key] for key in copied_keys}
    # -1 marks a bot that has never run
    entry['last_run_time'] = -1
    config[entry_name] = entry
    with open("config.json", "w", encoding="utf-8") as config_file:
        json.dump(config, config_file, indent=4)
def generate_defaults():
    """Write the stock settings to defaults.json.

    When defaults.json already exists it is loaded first, so keys this
    function does not know about survive; every stock key is then set to its
    built-in value and the result is written back.
    """
    stock_settings = (
        ('gelbooru_tags', 'rating:safe'),
        ('gelbooru_tags_exclude', ''),
        ('bot_message', 'Random image from Gelbooru'),
        ('bot_hashtags', '#gelbooru #random'),
        ('misskey_url', 'https://misskey.example.com/'),
        ('misskey_token', ''),
        ('max_page_number', 1000),
    )
    settings = {}
    if os.path.exists("defaults.json"):
        with open("defaults.json", "r") as existing_file:
            settings = json.load(existing_file)
    # Stock values always win over whatever was loaded
    for setting_name, stock_value in stock_settings:
        settings[setting_name] = stock_value
    with open("defaults.json", "w") as output_file:
        json.dump(settings, output_file, indent=4)
# Main function
def main():
    """Command-line entry point.

    Creates defaults.json when it is missing. When config.json is missing, a
    template is generated and the process exits so it can be filled in.
    Otherwise ``--gen-config`` adds a template entry, ``--help`` prints
    usage, and no argument runs every configured bot that has not run within
    the last hour, then writes the updated state back to config.json.
    Exceptions raised by a single bot are written to log.txt and do not stop
    the remaining bots.
    """
    # Minimum number of seconds between two runs of the same bot
    run_interval = 60 * 60
    if not os.path.exists("defaults.json"):
        generate_defaults()
    with open("defaults.json", "r", encoding="utf-8") as config_file:
        defaults = json.load(config_file)
    if not os.path.exists("config.json"):
        generate_config(defaults)
        sys.exit(0)
    # If first argument is '--gen-config', generate config.json:
    if len(sys.argv) > 1 and sys.argv[1] == "--gen-config":
        # Generate a config.json entry
        generate_config(defaults)
    elif len(sys.argv) > 1 and sys.argv[1] == "--help":
        print("Usage: python3 gelbooru-bot.py [--gen-config] [--help]")
        print("       --gen-config: Add a new bot to the config.json file")
        print("       --help: Show this help message")
        print("       No arguments: Run the bot")
        print("       Note: The values in defaults.json will be used if the values are not set in config.json")
    else:
        # Load set of configs to run from json config
        with open("config.json", "r", encoding="utf-8") as config_file:
            config = json.load(config_file)
        # Create and run bot instances for each config in config.json
        with open('log.txt', 'a') as log_file:
            for cfg_name in config:
                # Set missing config values to defaults
                cfg_tmp = copy.deepcopy(config[cfg_name])
                for key in defaults:
                    if key not in cfg_tmp:
                        cfg_tmp[key] = defaults[key]
                now = time.time()
                # An entry without last_run_time counts as never run (-1)
                # instead of raising KeyError and aborting every other bot
                last_run_time = cfg_tmp.get('last_run_time', -1)
                never_ran = last_run_time == -1
                # A timestamp more than one interval in the future is treated
                # as stale so a bad clock cannot block the bot indefinitely
                far_in_future = last_run_time > now + run_interval
                if not (never_ran or far_in_future) and now - last_run_time < run_interval:
                    continue
                try:
                    bot_instance = BotInstance(cfg_name, cfg_tmp)
                    bot_instance.bot_process(log_file)
                    # Persist the (possibly lowered) page limit
                    config[cfg_name]["max_page_number"] = bot_instance.max_page_number
                    # Save the last run time
                    config[cfg_name]["last_run_time"] = time.time()
                # If error, log it with the bot's name and continue with the next bot
                except Exception:
                    print("Error while running bot '%s':" % cfg_name, file=log_file)
                    traceback.print_exc(file=log_file)
                    continue
        # Save the updated bot state to config.json
        with open("config.json", "w", encoding="utf-8") as config_file:
            json.dump(config, config_file, indent=4)
# Run main() only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()