aboutsummaryrefslogtreecommitdiffstats
path: root/postgrest/backfill
diff options
context:
space:
mode:
Diffstat (limited to 'postgrest/backfill')
-rw-r--r--postgrest/backfill/backfill.md117
-rwxr-xr-xpostgrest/backfill/backfill_cdx.py131
-rwxr-xr-xpostgrest/backfill/backfill_file_meta.py55
-rwxr-xr-xpostgrest/backfill/backfill_grobid.py91
-rwxr-xr-xpostgrest/backfill/filter_transform_cdx.py88
-rwxr-xr-xpostgrest/backfill/petabox_transform.py24
6 files changed, 506 insertions, 0 deletions
diff --git a/postgrest/backfill/backfill.md b/postgrest/backfill/backfill.md
new file mode 100644
index 0000000..f97c901
--- /dev/null
+++ b/postgrest/backfill/backfill.md
@@ -0,0 +1,117 @@
+
+SQL Backfill Notes
+-----------------------
+
+GROBID is going to be somewhat complex.
+
+TODO:
+x CDX backfill script (CDX to postgresql direct, bulk inserts, python)
+x `file_meta` bulk insert (TSV to postgresql direct, bulk upserts, python)
+x GROBID insert (python, dump TSV to minio then postgresql)
+
+## `cdx`
+
+ #cat example.cdx | rg ' 200 ' | cut -d' ' -f2,3,4,6,9,10,11
+ #cat example.cdx | rg ' 200 ' | awk '{print $6 "\t" $3 "\t" $2 "\t" $4 "\t\t" $6 "\t" $11 "\t" $9 "\t" $10}' | b32_hex.py | awk '{print $2 "\t" $3 "\t" $4 "\t" $1 "\t" $6 "\t" $7 "\t" $8}' > cdx.example.tsv
+ cat example.cdx | ./filter_transform_cdx.py > cdx.example.tsv
+
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.example.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+
+Big HDFS import:
+
+ # but actually didn't import those; don't want to need to re-import
+ hdfs dfs -get journal_crawl_cdx/*
+
+ cat citeseerx_crawl_2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.citeseerx_crawl_2017.tsv
+ cat gwb-pdf-20171227034923-surt-filter/* | rg ' 200 ' | ./filter_transform_cdx.py > gwb-pdf-20171227034923-surt-filter.tsv
+ cat UNPAYWALL-PDF-CRAWL-2018-07.filtered.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.UNPAYWALL-PDF-CRAWL-2018-07.filtered.tsv
+ cat MSAG-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.MSAG-PDF-CRAWL-2017.tsv
+
+ cat CORE-UPSTREAM-CRAWL-2018-11.sorted.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.CORE-UPSTREAM-CRAWL-2018-11.sorted.tsv
+ cat DIRECT-OA-CRAWL-2019.pdfs.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.DIRECT-OA-CRAWL-2019.pdfs.tsv
+ cat DOI-LANDING-CRAWL-2018-06.200_pdf.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.DOI-LANDING-CRAWL-2018-06.200_pdf.tsv
+ cat OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.tsv
+ cat SEMSCHOLAR-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.SEMSCHOLAR-PDF-CRAWL-2017.tsv
+ cat TARGETED-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.TARGETED-PDF-CRAWL-2017.tsv
+ cat UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.tsv
+
+TODO: nasty escaping?
+
+In psql:
+
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.UNPAYWALL-PDF-CRAWL-2018-07.filtered.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # ERROR
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.MSAG-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # ERROR
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.citeseerx_crawl_2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # COPY 1653840
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.CORE-UPSTREAM-CRAWL-2018-11.sorted.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # COPY 2827563
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.DIRECT-OA-CRAWL-2019.pdfs.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # COPY 10651736
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.DOI-LANDING-CRAWL-2018-06.200_pdf.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # COPY 768565
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # COPY 5310017
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.SEMSCHOLAR-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # COPY 2219839
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.TARGETED-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # ERROR
+ COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # ERROR
+
+NOTE: these largely didn't work; will need to write a batch importer.
+
+Batch import process:
+
+ cat UNPAYWALL-PDF-CRAWL-2018-07.filtered.cdx MSAG-PDF-CRAWL-2017.cdx TARGETED-PDF-CRAWL-2017.cdx UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.cdx | ./backfill_cdx.py
+
+## `fatcat_file`
+
+ zcat file_export.2019-07-07.json.gz | pv -l | jq -r 'select(.sha1 != null) | [.sha1, .ident, .release_ids[0]] | @tsv' | sort -S 8G | uniq -w 40 > /sandcrawler-db/backfill/fatcat_file.2019-07-07.tsv
+
+In psql:
+
+ COPY fatcat_file FROM '/sandcrawler-db/backfill/fatcat_file.2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # => COPY 24727350
+
+## `file_meta`
+
+ zcat /fast/download/file_export.2019-07-07.json.gz | pv -l | jq -r 'select(.md5 != null) | [.sha1, .sha256, .md5, .size, .mimetype] | @tsv' | sort -S 8G | uniq -w 40 > /sandcrawler-db/backfill/file_meta.2019-07-07.tsv
+
+In psql:
+
+ COPY file_meta FROM '/sandcrawler-db/backfill/file_meta.2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # -> COPY 5860092
+
+## `petabox`
+
+ zcat /fast/download/file_export.2019-07-07.json.gz | rg '//archive.org/' | pigz > /fast/download/file_export.2019-07-07.petabox.json.gz
+ zcat /fast/download/file_export.2019-07-07.petabox.json.gz | ./petabox_transform.py | sort -u -S 8G | awk '{print $3 "\t" $1 "\t" $2}' | uniq -s40 | awk '{print $2 "\t" $3 "\t" $1}' > petabox.fatcat_2019-07-07.tsv
+
+In psql:
+
+ COPY petabox FROM '/sandcrawler-db/backfill/petabox.fatcat_2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL '');
+ # -> COPY 2887834
+
+## `grobid`
+
+Quick test:
+
+ zcat /bigger/unpaywall-transfer/2019-07-17-1741.30-dumpgrobidxml/part-00000.gz | cut -f2 | head | ./backfill_grobid.py
+
+Run big batch:
+
+ ls /bigger/unpaywall-transfer/2019-07-17-1741.30-dumpgrobidxml/part*gz |arallel --progress -j8 'zcat {} | cut -f2 | ./backfill_grobid.py'
+
+## rough table sizes
+
+ table_name | table_size | indexes_size | total_size
+ --------------------------------------------------------------+------------+--------------+------------
+ "public"."cdx" | 11 GB | 8940 MB | 20 GB
+ "public"."shadow" | 8303 MB | 7205 MB | 15 GB
+ "public"."fatcat_file" | 5206 MB | 2094 MB | 7300 MB
+ "public"."file_meta" | 814 MB | 382 MB | 1196 MB
+ "public"."petabox" | 403 MB | 594 MB | 997 MB
+ [...]
+
diff --git a/postgrest/backfill/backfill_cdx.py b/postgrest/backfill/backfill_cdx.py
new file mode 100755
index 0000000..1c452ca
--- /dev/null
+++ b/postgrest/backfill/backfill_cdx.py
@@ -0,0 +1,131 @@
+#!/usr/bin/env python3
+"""
+This is a "one-time" tranform helper script for CDX backfill into sandcrawler
+postgresql.
+
+Most of this file was copied from '../python/common.py'.
+"""
+
+import json, os, sys, collections
+import base64
+import psycopg2
+import psycopg2.extras
+
+NORMAL_MIME = (
+ 'application/pdf',
+ 'application/postscript',
+ 'text/html',
+ 'text/xml',
+)
+
+def normalize_mime(raw):
+ raw = raw.lower()
+ for norm in NORMAL_MIME:
+ if raw.startswith(norm):
+ return norm
+
+ # Special cases
+ if raw.startswith('application/xml'):
+ return 'text/xml'
+ if raw.startswith('application/x-pdf'):
+ return 'application/pdf'
+ return None
+
+
+def test_normalize_mime():
+ assert normalize_mime("asdf") is None
+ assert normalize_mime("application/pdf") == "application/pdf"
+ assert normalize_mime("application/pdf+journal") == "application/pdf"
+ assert normalize_mime("Application/PDF") == "application/pdf"
+ assert normalize_mime("application/p") is None
+ assert normalize_mime("application/xml+stuff") == "text/xml"
+ assert normalize_mime("application/x-pdf") == "application/pdf"
+ assert normalize_mime("application/x-html") is None
+
+def b32_hex(s):
+ s = s.strip().split()[0].lower()
+ if s.startswith("sha1:"):
+ s = s[5:]
+ if len(s) != 32:
+ return s
+ return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
+
+def parse_cdx_line(raw_cdx):
+
+ cdx = raw_cdx.split()
+ if len(cdx) < 11:
+ return None
+
+ surt = cdx[0]
+ dt = cdx[1]
+ url = cdx[2]
+ mime = normalize_mime(cdx[3])
+ http_status = cdx[4]
+ key = cdx[5]
+ c_size = cdx[8]
+ offset = cdx[9]
+ warc = cdx[10]
+
+ if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
+ and http_status == "200" and len(key) == 32 and dt.isdigit()
+ and mime != None):
+ return None
+
+ if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
+ return None
+
+ # these are the new/specific bits
+ sha1 = b32_hex(key)
+ return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset))
+
+def insert(cur, batch):
+ sql = """
+ INSERT INTO
+ cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
+ VALUES %s
+ ON CONFLICT ON CONSTRAINT cdx_pkey DO NOTHING
+ RETURNING 1;
+ """
+ batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'],
+ d['warc_path'], d['warc_csize'], d['warc_offset'])
+ for d in batch]
+ res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True
+ #return len(res)
+
+def stdin_to_pg():
+ # no host means it will use local domain socket by default
+ conn = psycopg2.connect(database="sandcrawler", user="postgres")
+ cur = conn.cursor()
+ counts = collections.Counter({'total': 0})
+ batch = []
+ for l in sys.stdin:
+ l = l.strip()
+ if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0:
+ print("Progress: {}...".format(counts))
+ counts['raw_lines'] += 1
+ if not l:
+ continue
+ info = parse_cdx_line(l)
+ if not info:
+ continue
+ batch.append(info)
+ counts['total'] += 1
+ if len(batch) >= 1000:
+ insert(cur, batch)
+ conn.commit()
+ #counts['inserted'] += i
+ #counts['existing'] += len(batch) - i
+ batch = []
+ counts['batches'] += 1
+ if batch:
+ insert(cur, batch)
+ #counts['inserted'] += i
+ #counts['existing'] += len(batch) - i
+ batch = []
+ conn.commit()
+ cur.close()
+ print("Done: {}".format(counts))
+
+if __name__=='__main__':
+ stdin_to_pg()
diff --git a/postgrest/backfill/backfill_file_meta.py b/postgrest/backfill/backfill_file_meta.py
new file mode 100755
index 0000000..e3b40a0
--- /dev/null
+++ b/postgrest/backfill/backfill_file_meta.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+"""
+This is a "one-time" tranform helper script for file_meta backfill into
+sandcrawler postgresql.
+
+Most of this file was copied from '../python/common.py'.
+"""
+
+import json, os, sys, collections
+import psycopg2
+import psycopg2.extras
+
+
+def insert(cur, batch):
+ sql = """
+ INSERT INTO
+ file_meta
+ VALUES %s
+ ON CONFLICT DO NOTHING;
+ """
+ res = psycopg2.extras.execute_values(cur, sql, batch)
+
+def stdin_to_pg():
+ # no host means it will use local domain socket by default
+ conn = psycopg2.connect(database="sandcrawler", user="postgres")
+ cur = conn.cursor()
+ counts = collections.Counter({'total': 0})
+ batch = []
+ for l in sys.stdin:
+ if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0:
+ print("Progress: {}...".format(counts))
+ counts['raw_lines'] += 1
+ if not l.strip():
+ continue
+ info = l.split("\t")
+ if not info:
+ continue
+ assert len(info) == 5
+ info[-1] = info[-1].strip() or None
+ batch.append(info)
+ counts['total'] += 1
+ if len(batch) >= 1000:
+ insert(cur, batch)
+ conn.commit()
+ batch = []
+ counts['batches'] += 1
+ if batch:
+ insert(cur, batch)
+ batch = []
+ conn.commit()
+ cur.close()
+ print("Done: {}".format(counts))
+
+if __name__=='__main__':
+ stdin_to_pg()
diff --git a/postgrest/backfill/backfill_grobid.py b/postgrest/backfill/backfill_grobid.py
new file mode 100755
index 0000000..08fad7f
--- /dev/null
+++ b/postgrest/backfill/backfill_grobid.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python3
+"""
+This is a "one-time" tranform helper script for GROBID backfill into
+sandcrawler minio and postgresql.
+"""
+
+import json, os, sys, collections, io
+import base64
+import requests
+from minio import Minio
+import psycopg2
+import psycopg2.extras
+
+
+def b32_hex(s):
+ s = s.strip().split()[0].lower()
+ if s.startswith("sha1:"):
+ s = s[5:]
+ if len(s) != 32:
+ return s
+ return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
+def insert(cur, batch):
+ sql = """
+ INSERT INTO
+ grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata)
+ VALUES %s
+ ON CONFLICT DO NOTHING;
+ """
+ batch = [(d['sha1hex'], d['grobid_version'], d['status_code'], d['status'], d['fatcat_release'], d['metadata'])
+ for d in batch]
+ res = psycopg2.extras.execute_values(cur, sql, batch)
+
+def stdin_to_pg():
+ mc = Minio('localhost:9000',
+ access_key=os.environ['MINIO_ACCESS_KEY'],
+ secret_key=os.environ['MINIO_SECRET_KEY'],
+ secure=False)
+ # no host means it will use local domain socket by default
+ conn = psycopg2.connect(database="sandcrawler", user="postgres")
+ cur = conn.cursor()
+ counts = collections.Counter({'total': 0})
+ batch = []
+ for l in sys.stdin:
+ if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0:
+ print("Progress: {}...".format(counts))
+ counts['raw_lines'] += 1
+ l = l.strip()
+ if not l:
+ continue
+ row = json.loads(l)
+ if not row:
+ continue
+ sha1hex = b32_hex(row['pdf_hash'])
+ grobid_xml = row['tei_xml'].encode('utf-8')
+ grobid_xml_len = len(grobid_xml)
+ grobid_xml = io.BytesIO(grobid_xml)
+
+ key = "{}/{}/{}.tei.xml".format(
+ sha1hex[0:2],
+ sha1hex[2:4],
+ sha1hex)
+ mc.put_object("grobid", key, grobid_xml, grobid_xml_len,
+ content_type="application/tei+xml",
+ metadata=None)
+ counts['minio-success'] += 1
+
+ info = dict(
+ sha1hex=sha1hex,
+ grobid_version=None, # TODO
+ status_code=200,
+ status=None,
+ fatcat_release=None,
+ metadata=None,
+ )
+ batch.append(info)
+ counts['total'] += 1
+ if len(batch) >= 1000:
+ insert(cur, batch)
+ conn.commit()
+ batch = []
+ counts['batches'] += 1
+ if batch:
+ insert(cur, batch)
+ batch = []
+ conn.commit()
+ cur.close()
+ print("Done: {}".format(counts))
+
+if __name__=='__main__':
+ stdin_to_pg()
diff --git a/postgrest/backfill/filter_transform_cdx.py b/postgrest/backfill/filter_transform_cdx.py
new file mode 100755
index 0000000..3507dfc
--- /dev/null
+++ b/postgrest/backfill/filter_transform_cdx.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python3
+"""
+This is a "one-time" tranform helper script for CDX backfill into sandcrawler
+postgresql.
+
+Most of this file was copied from '../python/common.py'.
+"""
+
+import json, os, sys
+import base64
+
+NORMAL_MIME = (
+ 'application/pdf',
+ 'application/postscript',
+ 'text/html',
+ 'text/xml',
+)
+
+def normalize_mime(raw):
+ raw = raw.lower()
+ for norm in NORMAL_MIME:
+ if raw.startswith(norm):
+ return norm
+
+ # Special cases
+ if raw.startswith('application/xml'):
+ return 'text/xml'
+ if raw.startswith('application/x-pdf'):
+ return 'application/pdf'
+ return None
+
+
+def test_normalize_mime():
+ assert normalize_mime("asdf") is None
+ assert normalize_mime("application/pdf") == "application/pdf"
+ assert normalize_mime("application/pdf+journal") == "application/pdf"
+ assert normalize_mime("Application/PDF") == "application/pdf"
+ assert normalize_mime("application/p") is None
+ assert normalize_mime("application/xml+stuff") == "text/xml"
+ assert normalize_mime("application/x-pdf") == "application/pdf"
+ assert normalize_mime("application/x-html") is None
+
+def b32_hex(s):
+ s = s.strip().split()[0].lower()
+ if s.startswith("sha1:"):
+ s = s[5:]
+ if len(s) != 32:
+ return s
+ return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
+
+def parse_cdx_line(raw_cdx):
+
+ cdx = raw_cdx.split()
+ if len(cdx) < 11:
+ return None
+
+ surt = cdx[0]
+ dt = cdx[1]
+ url = cdx[2]
+ mime = normalize_mime(cdx[3])
+ http_status = cdx[4]
+ key = cdx[5]
+ c_size = cdx[8]
+ offset = cdx[9]
+ warc = cdx[10]
+
+ if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
+ and http_status == "200" and len(key) == 32 and dt.isdigit()
+ and mime != None):
+ return None
+
+ if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
+ return None
+
+ # these are the new/specific bits
+ sha1 = b32_hex(key)
+ return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset))
+
+for l in sys.stdin:
+ l = l.strip()
+ if not l:
+ continue
+ info = parse_cdx_line(l)
+ if not info:
+ continue
+ print("\t".join([info['url'], info['datetime'], info['sha1hex'], info['mimetype'], info['warc_path'], str(info['warc_csize']), str(info['warc_offset'])]))
+
diff --git a/postgrest/backfill/petabox_transform.py b/postgrest/backfill/petabox_transform.py
new file mode 100755
index 0000000..b638911
--- /dev/null
+++ b/postgrest/backfill/petabox_transform.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python3
+
+import json, sys, os
+
+for l in sys.stdin.readlines():
+ l = l.strip()
+ if not l:
+ continue
+ r = json.loads(l)
+ if not r['sha1']:
+ continue
+ sha1hex = r['sha1']
+ for url in r['urls']:
+ u = url['url']
+ if not '//archive.org/' in u:
+ continue
+ u = u.split('/')
+ if u[2] == 'web.archive.org':
+ continue
+ #print(u)
+ assert u[2] == 'archive.org' and u[3] in ('download', 'serve')
+ item = u[4]
+ path = '/'.join(u[5:])
+ print("\t".join([item, path, sha1hex]))