aboutsummaryrefslogtreecommitdiffstats
path: root/reupload.py
diff options
context:
space:
mode:
author Bryan Newbold <bnewbold@archive.org> 2020-04-22 10:48:56 -0700
committer Bryan Newbold <bnewbold@archive.org> 2020-04-22 10:48:56 -0700
commit 1f0b8be3756cc52f3c911735dbc15f4063e133d2 (patch)
tree 5b20316467b2a66fa3ef7670f45baf7a1116ed05 /reupload.py
download els-instawiki-1f0b8be3756cc52f3c911735dbc15f4063e133d2.tar.gz
els-instawiki-1f0b8be3756cc52f3c911735dbc15f4063e133d2.zip
init repo with work-in-progress script
Diffstat (limited to 'reupload.py')
-rwxr-xr-x reupload.py 157
1 files changed, 157 insertions, 0 deletions
diff --git a/reupload.py b/reupload.py
new file mode 100755
index 0000000..94fbbe3
--- /dev/null
+++ b/reupload.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python3
+
+import argparse
+import json
+import os
+import sys
+from typing import Optional, Tuple
+
+import requests
+
+# API endpoint that every request in this script is sent to.
+BASE_URL = "https://wiki.mako.cc/api.php"
+
+
+def login(session: requests.Session) -> (str, str):
+
+ bot_username = os.environ['WIKI_USERNAME']
+ bot_password = os.environ['WIKI_PASSWORD']
+
+ # Step 1: Retrieve a login token
+ resp = session.get(
+ BASE_URL,
+ params={
+ "action": "query",
+ "meta": "tokens",
+ "type": "login",
+ "format": "json",
+ },
+ )
+
+ resp.raise_for_status()
+ login_token = resp.json()['query']['tokens']['logintoken']
+ assert login_token
+
+ # Step 2: Send a post request to login. Use of main account for login is not
+ # supported. Obtain credentials via Special:BotPasswords
+ # (https://www.mediawiki.org/wiki/Special:BotPasswords) for lgname & lgpassword
+ resp = session.post(
+ BASE_URL,
+ data={
+ "action": "login",
+ "lgname": bot_username,
+ "lgpassword": bot_password,
+ "format": "json",
+ "lgtoken": login_token,
+ },
+ )
+
+ resp.raise_for_status()
+
+ # Step 3: While logged in, retrieve a CSRF token
+ resp = session.get(
+ BASE_URL,
+ params={
+ "action": "query",
+ "meta":"tokens",
+ "format":"json"
+ },
+ )
+
+ resp.raise_for_status()
+ csrf_token = resp.json()['query']['tokens']['csrftoken']
+ assert csrf_token
+
+ return (login_token, csrf_token)
+
+def get_imageinfo(title: str, session: requests.Session) -> Optional[dict]:
+ """Returns a dict, or None if image not found"""
+
+ assert not title.startswith("File:")
+ # Does not require authentication
+ resp = session.get(
+ BASE_URL,
+ params={
+ "action": "query",
+ "format": "json",
+ "prop": "imageinfo",
+ "titles": f"File:{title}"
+ },
+ )
+ resp.raise_for_status()
+ pages = resp.json()['query']['pages']
+ if "-1" in pages:
+ return None
+ else:
+ return list(pages.values())[0]
+
+def reupload_post(args):
+
+ # parse metadata
+ base_path = args.json_file.replace('.json', '')
+ meta = json.loads(open(args.json_file, 'r').read())['node']
+ #print(meta)
+ shortcode = meta['shortcode']
+ caption = meta['edge_media_to_caption']['edges'][0]['node']['text']
+
+ if meta.get('edge_sidecar_to_children'):
+ image_count = len(meta['edge_sidecar_to_children']['edges'])
+ else:
+ image_count = 1
+
+ if image_count != 1:
+ raise NotImplementedError()
+
+ jpeg_path = base_path + ".jpg"
+ date_part = base_path.split('/')[1].split('_')[0].replace('-', '')
+ time_part = base_path.split('/')[1].split('_')[1].replace('-', '')[:4]
+ remote_name = f"CEQD_{date_part}_{time_part}_1.jpg"
+ page_text = f"""[[Category:Center for Extraordinary Quarantine Dining]]
+
+== Summary ==
+{caption}
+
+Imported from ELS instagram: https://www.instagram.com/p/{shortcode}/
+ """
+
+ session = requests.Session()
+ (login_token, csrf_token) = login(session)
+
+ # First, check if file already exists
+ existing = get_imageinfo(remote_name, session)
+ if existing:
+ print(json.dumps(existing, sort_keys=True, indent=2))
+ sys.exit(-1)
+
+ # If it doesn't, upload it!
+ print(f"Uploading {remote_name}")
+ resp = session.post(
+ BASE_URL,
+ data={
+ "action": "upload",
+ "filename": remote_name,
+ "text": page_text,
+ "format": "json",
+ "token": csrf_token,
+ "ignorewarnings": 1
+ },
+ files={
+ 'file': ('filename.jpg', open(jpeg_path, 'rb'), 'multipart/form-data'),
+ },
+ )
+ resp.raise_for_status()
+ print(json.dumps(resp.json(), sort_keys=True, indent=2)
+
+def main():
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument(
+ 'json_file',
+ help="JSON metadata file",
+ type=str,
+ #type=argparse.FileType('r'),
+ )
+ args = parser.parse_args()
+
+ reupload_post(args)
+
+# Run the uploader only when this file is executed as a script.
+if __name__ == "__main__":
+    main()