#!/usr/bin/env python3
"""Upload the image of a single archived Instagram post to a MediaWiki site.

The script takes the path of a JSON metadata file, reads the post's shortcode
and caption from its top-level ``node`` object, and uploads the JPEG that sits
next to the metadata file (same path, ``.jpg`` instead of ``.json``) through
the MediaWiki Action API at ``BASE_URL``.

Credentials are read from the ``WIKI_USERNAME`` and ``WIKI_PASSWORD``
environment variables.
"""

import argparse
import json
import os
import sys
from typing import Optional, Tuple

import requests

BASE_URL = "https://wiki.mako.cc/api.php"


def login(session: requests.Session) -> Tuple[str, str]:
    """Log ``session`` in to the wiki and return ``(login_token, csrf_token)``.

    Raises:
        KeyError: if ``WIKI_USERNAME`` / ``WIKI_PASSWORD`` are not set, or an
            API response lacks the expected token fields.
        requests.HTTPError: if any request returns an HTTP error status.
        RuntimeError: if the API hands back an empty token or rejects the
            login.
    """
    bot_username = os.environ['WIKI_USERNAME']
    bot_password = os.environ['WIKI_PASSWORD']

    # Step 1: Retrieve a login token
    resp = session.get(
        BASE_URL,
        params={
            "action": "query",
            "meta": "tokens",
            "type": "login",
            "format": "json",
        },
    )
    resp.raise_for_status()
    login_token = resp.json()['query']['tokens']['logintoken']
    if not login_token:
        raise RuntimeError("wiki returned an empty login token")

    # Step 2: Send a post request to login. Use of main account for login is
    # not supported. Obtain credentials via Special:BotPasswords
    # (https://www.mediawiki.org/wiki/Special:BotPasswords) for lgname &
    # lgpassword
    resp = session.post(
        BASE_URL,
        data={
            "action": "login",
            "lgname": bot_username,
            "lgpassword": bot_password,
            "format": "json",
            "lgtoken": login_token,
        },
    )
    resp.raise_for_status()
    # A rejected login still comes back as HTTP 200; without this check the
    # script would carry on anonymously and only fail later, at upload time.
    login_info = resp.json().get('login', {})
    if login_info.get('result') != 'Success':
        raise RuntimeError(
            f"wiki login failed: {login_info.get('result')!r} "
            f"({login_info.get('reason', 'no reason given')})"
        )

    # Step 3: While logged in, retrieve a CSRF token
    resp = session.get(
        BASE_URL,
        params={
            "action": "query",
            "meta": "tokens",
            "format": "json",
        },
    )
    resp.raise_for_status()
    csrf_token = resp.json()['query']['tokens']['csrftoken']
    if not csrf_token:
        raise RuntimeError("wiki returned an empty CSRF token")

    return (login_token, csrf_token)


def get_imageinfo(title: str, session: requests.Session) -> Optional[dict]:
    """Return the API page record for ``File:<title>``, or None if not found.

    ``title`` must be given without the ``File:`` prefix; it is added here.

    Raises:
        ValueError: if ``title`` already starts with ``File:``.
        requests.HTTPError: if the request returns an HTTP error status.
    """
    if title.startswith("File:"):
        raise ValueError("title must not include the 'File:' prefix")

    # Does not require authentication
    resp = session.get(
        BASE_URL,
        params={
            "action": "query",
            "format": "json",
            "prop": "imageinfo",
            "titles": f"File:{title}",
        },
    )
    resp.raise_for_status()
    pages = resp.json()['query']['pages']
    # The API keys a missing page under the pseudo page id "-1".
    if "-1" in pages:
        return None
    return list(pages.values())[0]


def _remote_name_for(base_path: str) -> str:
    """Derive the wiki file name from a ``<date>_<time>_...`` file stem.

    Dashes are dropped from the first two ``_``-separated fields and the time
    field is truncated to its first four characters.
    """
    stem = os.path.basename(base_path)
    fields = stem.split('_')
    if len(fields) < 2:
        raise ValueError(
            f"cannot derive date and time from file name {stem!r}; "
            "expected '<date>_<time>...'"
        )
    date_part = fields[0].replace('-', '')
    time_part = fields[1].replace('-', '')[:4]
    return f"CEQD_{date_part}_{time_part}_1.jpg"


def _build_page_text(caption: str, shortcode: str) -> str:
    """Return the wikitext for the file description page."""
    # The heading has to sit on a line of its own to render as a heading.
    return (
        "[[Category:Center for Extraordinary Quarantine Dining]]\n"
        "\n"
        "== Summary ==\n"
        f"{caption}\n"
        "\n"
        f"Imported from ELS instagram: https://www.instagram.com/p/{shortcode}/\n"
    )


def reupload_post(args):
    """Upload the JPEG belonging to the metadata file ``args.json_file``.

    Exits with a non-zero status, after printing the relevant API record,
    when the target file already exists on the wiki or when the upload is
    rejected by the API.

    Raises:
        NotImplementedError: if the post contains more than one image.
        ValueError: if the file name does not have the expected
            ``<date>_<time>...`` shape.
    """
    json_path = args.json_file
    # Strip only a trailing ".json"; other occurrences belong to the path.
    if json_path.endswith('.json'):
        base_path = json_path[:-len('.json')]
    else:
        base_path = json_path

    with open(json_path, 'r', encoding='utf-8') as meta_file:
        meta = json.load(meta_file)['node']

    shortcode = meta['shortcode']
    caption_edges = meta['edge_media_to_caption']['edges']
    # Posts without a caption have an empty edge list.
    caption = caption_edges[0]['node']['text'] if caption_edges else ""

    sidecar = meta.get('edge_sidecar_to_children')
    image_count = len(sidecar['edges']) if sidecar else 1
    if image_count != 1:
        raise NotImplementedError(
            f"posts with multiple images are not supported ({image_count} found)"
        )

    jpeg_path = base_path + ".jpg"
    remote_name = _remote_name_for(base_path)
    page_text = _build_page_text(caption, shortcode)

    with requests.Session() as session:
        (_login_token, csrf_token) = login(session)

        # First, check if file already exists
        existing = get_imageinfo(remote_name, session)
        if existing:
            print(json.dumps(existing, sort_keys=True, indent=2))
            sys.exit(-1)

        # If it doesn't, upload it!
        print(f"Uploading {remote_name}")
        with open(jpeg_path, 'rb') as jpeg_file:
            resp = session.post(
                BASE_URL,
                data={
                    "action": "upload",
                    "filename": remote_name,
                    "text": page_text,
                    "format": "json",
                    "token": csrf_token,
                    "ignorewarnings": 1,
                },
                files={
                    'file': ('filename.jpg', jpeg_file, 'multipart/form-data'),
                },
            )
        resp.raise_for_status()
        result = resp.json()
        print(json.dumps(result, sort_keys=True, indent=2))
        # API-level failures arrive as HTTP 200 with an "error" object, so
        # raise_for_status() alone does not catch them.
        if 'error' in result:
            sys.exit(1)


def main():
    """Parse the command line and re-upload the given post."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        'json_file',
        help="JSON metadata file",
        type=str,
    )
    args = parser.parse_args()
    reupload_post(args)


if __name__ == "__main__":
    main()