diff options
author | Bryan Newbold <bnewbold@archive.org> | 2019-09-23 23:00:23 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2019-09-23 23:00:23 -0700 |
commit | b362abd38ad4a6624bc056c58eb90ae235c63f00 (patch) | |
tree | 026556fe548e28be1556c24b6ee865cb276755ca /postgrest | |
parent | b438f52dbb7578c9a5c2153bc4ba50e33fdae7c3 (diff) | |
download | sandcrawler-b362abd38ad4a6624bc056c58eb90ae235c63f00.tar.gz sandcrawler-b362abd38ad4a6624bc056c58eb90ae235c63f00.zip |
rename postgrest directory sql
Diffstat (limited to 'postgrest')
-rw-r--r-- | postgrest/README.md | 126 | ||||
-rw-r--r-- | postgrest/backfill/backfill.md | 135 | ||||
-rwxr-xr-x | postgrest/backfill/backfill_cdx.py | 131 | ||||
-rwxr-xr-x | postgrest/backfill/backfill_file_meta.py | 55 | ||||
-rwxr-xr-x | postgrest/backfill/backfill_grobid.py | 91 | ||||
-rwxr-xr-x | postgrest/backfill/backfill_grobid_unpaywall.py | 59 | ||||
-rwxr-xr-x | postgrest/backfill/filter_transform_cdx.py | 88 | ||||
-rwxr-xr-x | postgrest/backfill/petabox_transform.py | 24 | ||||
-rw-r--r-- | postgrest/sandcrawler_schema.sql | 59 |
9 files changed, 0 insertions, 768 deletions
diff --git a/postgrest/README.md b/postgrest/README.md deleted file mode 100644 index b171614..0000000 --- a/postgrest/README.md +++ /dev/null @@ -1,126 +0,0 @@ - -TL;DR: replace hbase with postgresql tables with REST API (http://postgrest.org) - -No primary storage of anything in this table. Everything should be rapidly -re-creatable from dumps, kafka topics (compressed), CDX, petabox metadata, etc. -This is a secondary view on all of that. - -## Schema - - schema/database name is 'sandcrawler' - - cdx: include revisits or not? - id: int64, PK - sha1hex: string, not null, index - cdx_sha1hex: string - url: string, not null - datetime: ISO 8601:1988 (string?), not null - mimetype: string - warc_path: string (item and filename) - warc_offset: i64 - created: datetime, index (?) - ?crawl: string - ?domain: string - - file_meta - sha1hex, string, PK - md5hex: string - sha256hex: string - size_bytes: i64 - mime: string (verifying file status; optional for now?) - - fatcat_file - sha1hex: string, PK - file_ident: string, index? - release_ident: ? - - petabox - id: int64, PK - sha1hex: string, notnull, index - item: string, notnull - path: string, notnull (TODO: URL encoded? separate sub-archive path?) - - grobid - sha1hex: string, PK - updated: datetime - grobid_version (string) - status_code: i32 - status: string (JSONB?), only if status != 200 - metadata: JSONB, title, first author, year (not for now?) - glutton_fatcat_release: string, index - - shadow - sha1hex: string, PK - shadow_corpus: string, PK - shadow_id: string - doi: string - pmid: string - isbn13: string - -Alternatively, to be more like existing system could have "one big table" or -multiple tables all with same key (sha1b32) and UNIQ. As is, every sha1 pk -column is 40 bytes of both index and data, or 8+ GByte (combined) for each -table with 100 million rows. using raw bytes could help, but makes all -code/queries much trickier. - -Should we have "created" or "updated" timestamps on all these columns to enable -kafka tailing? - -TODO: -- how to indicate CDX sha1 vs. true sha1 mis-match? pretty rare. recrawl and delete row from `gwb_cdx`? -- only most recent GROBID? or keep multiple versions? here and minio - -## Existing Stuff Sizes (estimates) - - 78.5G /user/bnewbold/journal_crawl_cdx - 19.7G /user/bnewbold/sandcrawler/output-prod/2018-12-14-1737.00-dumpfilemeta - 2.7G file_hashes.tsv - 228.5G /user/bnewbold/sandcrawler/output-prod/2018-09-23-0405.30-dumpgrobidmetainsertable - -## Use Cases - -Core goal here is to mostly kill hbase/hadoop. What jobs are actually used there? - -- backfill: load in-scope (fulltext) crawl results from CDX - => bulk (many line) inserts -- rowcount: "how many unique PDFs crawled?" - => trivial SQL query -- status code count: "how much GROBID progress?" - => trivial SQL query -- dumpungrobided: "what files still need to be processed" - => SQL join with a "first" on CDX side -- dumpgrobidxml: "merge CDX/file info with extracted XML, for those that were successful" - => SQL dump or rowscan, then minio fetches - -This table is generally "single file raw fulltext metadata". - -"Enrichment" jobs: - -- GROBID -- glutton (if not GROBID) -- extra file metadata -- match newly enriched files to fatcat - -What else? - -- track additional raw file metadata -- dump all basic GROBID metadata (title, authors, year) to attempt merge/match - -Questions we might want to answer - -- total size of PDF corpus (terabytes) -- unqiue files hit per domain - -## Prototype Plan - -- backfill all CDX crawl files (TSV transform?) -- load full GROBID XML (both into minio and into SQL) -- load full fatcat file dump (TSV transform) -- load dumpfilemeta - -## Example Useful Lookups - - - http get :3030/cdx?url=eq.https://coleccionables.mercadolibre.com.ar/arduino-pdf_Installments_NoInterest_BestSellers_YES - http get :3030/file_meta?sha1hex=eq.120582c855a7cc3c70a8527c560d7f27e6027278 - diff --git a/postgrest/backfill/backfill.md b/postgrest/backfill/backfill.md deleted file mode 100644 index f1a5f86..0000000 --- a/postgrest/backfill/backfill.md +++ /dev/null @@ -1,135 +0,0 @@ - -SQL Backfill Notes ------------------------ - -GROBID is going to be somewhat complex. - -TODO: -x CDX backfill script (CDX to postgresql direct, bulk inserts, python) -x `file_meta` bulk insert (TSV to postgresql direct, bulk upserts, python) -x GROBID insert (python, dump TSV to minio then postgresql) - -## `cdx` - - #cat example.cdx | rg ' 200 ' | cut -d' ' -f2,3,4,6,9,10,11 - #cat example.cdx | rg ' 200 ' | awk '{print $6 "\t" $3 "\t" $2 "\t" $4 "\t\t" $6 "\t" $11 "\t" $9 "\t" $10}' | b32_hex.py | awk '{print $2 "\t" $3 "\t" $4 "\t" $1 "\t" $6 "\t" $7 "\t" $8}' > cdx.example.tsv - cat example.cdx | ./filter_transform_cdx.py > cdx.example.tsv - - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.example.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - -Big HDFS import: - - # but actually didn't import those; don't want to need to re-import - hdfs dfs -get journal_crawl_cdx/* - - cat citeseerx_crawl_2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.citeseerx_crawl_2017.tsv - cat gwb-pdf-20171227034923-surt-filter/* | rg ' 200 ' | ./filter_transform_cdx.py > gwb-pdf-20171227034923-surt-filter.tsv - cat UNPAYWALL-PDF-CRAWL-2018-07.filtered.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.UNPAYWALL-PDF-CRAWL-2018-07.filtered.tsv - cat MSAG-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.MSAG-PDF-CRAWL-2017.tsv - - cat CORE-UPSTREAM-CRAWL-2018-11.sorted.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.CORE-UPSTREAM-CRAWL-2018-11.sorted.tsv - cat DIRECT-OA-CRAWL-2019.pdfs.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.DIRECT-OA-CRAWL-2019.pdfs.tsv - cat DOI-LANDING-CRAWL-2018-06.200_pdf.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.DOI-LANDING-CRAWL-2018-06.200_pdf.tsv - cat OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.tsv - cat SEMSCHOLAR-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.SEMSCHOLAR-PDF-CRAWL-2017.tsv - cat TARGETED-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.TARGETED-PDF-CRAWL-2017.tsv - cat UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.tsv - -TODO: nasty escaping? - -In psql: - - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.UNPAYWALL-PDF-CRAWL-2018-07.filtered.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # ERROR - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.MSAG-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # ERROR - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.citeseerx_crawl_2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # COPY 1653840 - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.CORE-UPSTREAM-CRAWL-2018-11.sorted.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # COPY 2827563 - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.DIRECT-OA-CRAWL-2019.pdfs.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # COPY 10651736 - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.DOI-LANDING-CRAWL-2018-06.200_pdf.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # COPY 768565 - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # COPY 5310017 - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.SEMSCHOLAR-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # COPY 2219839 - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.TARGETED-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # ERROR - COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # ERROR - -NOTE: these largely didn't work; will need to write a batch importer. - -Batch import process: - - cat UNPAYWALL-PDF-CRAWL-2018-07.filtered.cdx MSAG-PDF-CRAWL-2017.cdx TARGETED-PDF-CRAWL-2017.cdx UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.cdx | ./backfill_cdx.py - # Done: Counter({'raw_lines': 123254127, 'total': 51365599, 'batches': 51365}) - -## `fatcat_file` - - zcat file_export.2019-07-07.json.gz | pv -l | jq -r 'select(.sha1 != null) | [.sha1, .ident, .release_ids[0]] | @tsv' | sort -S 8G | uniq -w 40 > /sandcrawler-db/backfill/fatcat_file.2019-07-07.tsv - -In psql: - - COPY fatcat_file FROM '/sandcrawler-db/backfill/fatcat_file.2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # => COPY 24727350 - -## `file_meta` - - zcat /fast/download/file_export.2019-07-07.json.gz | pv -l | jq -r 'select(.md5 != null) | [.sha1, .sha256, .md5, .size, .mimetype] | @tsv' | sort -S 8G | uniq -w 40 > /sandcrawler-db/backfill/file_meta.2019-07-07.tsv - -In psql: - - COPY file_meta FROM '/sandcrawler-db/backfill/file_meta.2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # -> COPY 5860092 - -## `petabox` - - zcat /fast/download/file_export.2019-07-07.json.gz | rg '//archive.org/' | pigz > /fast/download/file_export.2019-07-07.petabox.json.gz - zcat /fast/download/file_export.2019-07-07.petabox.json.gz | ./petabox_transform.py | sort -u -S 8G | awk '{print $3 "\t" $1 "\t" $2}' | uniq -s40 | awk '{print $2 "\t" $3 "\t" $1}' > petabox.fatcat_2019-07-07.tsv - -In psql: - - COPY petabox FROM '/sandcrawler-db/backfill/petabox.fatcat_2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); - # -> COPY 2887834 - -## `grobid` - -Quick test: - - zcat /bigger/unpaywall-transfer/2019-07-17-1741.30-dumpgrobidxml/part-00000.gz | cut -f2 | head | ./backfill_grobid.py - -Run big batch: - - ls /bigger/unpaywall-transfer/2019-07-17-1741.30-dumpgrobidxml/part*gz | parallel --progress -j8 'zcat {} | cut -f2 | ./backfill_grobid.py' - # [...] - # Done: Counter({'minio-success': 161605, 'total': 161605, 'raw_lines': 161605, 'batches': 161}) - # [...] - -Was running slow with lots of iowait and 99% jdb2. This seems to be disk I/O. Going to try: - - sudo mount /dev/sdc1 /sandcrawler-minio/ -o data=writeback,noatime,nobarrier - - # -j8: 20+ M/s write, little jdb2 - # -j16: 30+ M/s write, little jdb2 - # -j12: 30+ M/s write, going with this - -For general use should go back to: - - sudo mount /dev/sdc1 /sandcrawler-minio/ -o data=noatime - - # -j4: Still pretty slow, only ~3-5 M/s disk write. jbd2 consistently at 99%, 360 K/s write - -## rough table sizes - - table_name | table_size | indexes_size | total_size - --------------------------------------------------------------+------------+--------------+------------ - "public"."cdx" | 11 GB | 8940 MB | 20 GB - "public"."shadow" | 8303 MB | 7205 MB | 15 GB - "public"."fatcat_file" | 5206 MB | 2094 MB | 7300 MB - "public"."file_meta" | 814 MB | 382 MB | 1196 MB - "public"."petabox" | 403 MB | 594 MB | 997 MB - [...] - diff --git a/postgrest/backfill/backfill_cdx.py b/postgrest/backfill/backfill_cdx.py deleted file mode 100755 index 1c452ca..0000000 --- a/postgrest/backfill/backfill_cdx.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python3 -""" -This is a "one-time" tranform helper script for CDX backfill into sandcrawler -postgresql. - -Most of this file was copied from '../python/common.py'. -""" - -import json, os, sys, collections -import base64 -import psycopg2 -import psycopg2.extras - -NORMAL_MIME = ( - 'application/pdf', - 'application/postscript', - 'text/html', - 'text/xml', -) - -def normalize_mime(raw): - raw = raw.lower() - for norm in NORMAL_MIME: - if raw.startswith(norm): - return norm - - # Special cases - if raw.startswith('application/xml'): - return 'text/xml' - if raw.startswith('application/x-pdf'): - return 'application/pdf' - return None - - -def test_normalize_mime(): - assert normalize_mime("asdf") is None - assert normalize_mime("application/pdf") == "application/pdf" - assert normalize_mime("application/pdf+journal") == "application/pdf" - assert normalize_mime("Application/PDF") == "application/pdf" - assert normalize_mime("application/p") is None - assert normalize_mime("application/xml+stuff") == "text/xml" - assert normalize_mime("application/x-pdf") == "application/pdf" - assert normalize_mime("application/x-html") is None - -def b32_hex(s): - s = s.strip().split()[0].lower() - if s.startswith("sha1:"): - s = s[5:] - if len(s) != 32: - return s - return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') - - -def parse_cdx_line(raw_cdx): - - cdx = raw_cdx.split() - if len(cdx) < 11: - return None - - surt = cdx[0] - dt = cdx[1] - url = cdx[2] - mime = normalize_mime(cdx[3]) - http_status = cdx[4] - key = cdx[5] - c_size = cdx[8] - offset = cdx[9] - warc = cdx[10] - - if not (key.isalnum() and c_size.isdigit() and offset.isdigit() - and http_status == "200" and len(key) == 32 and dt.isdigit() - and mime != None): - return None - - if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): - return None - - # these are the new/specific bits - sha1 = b32_hex(key) - return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset)) - -def insert(cur, batch): - sql = """ - INSERT INTO - cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) - VALUES %s - ON CONFLICT ON CONSTRAINT cdx_pkey DO NOTHING - RETURNING 1; - """ - batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], - d['warc_path'], d['warc_csize'], d['warc_offset']) - for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True - #return len(res) - -def stdin_to_pg(): - # no host means it will use local domain socket by default - conn = psycopg2.connect(database="sandcrawler", user="postgres") - cur = conn.cursor() - counts = collections.Counter({'total': 0}) - batch = [] - for l in sys.stdin: - l = l.strip() - if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: - print("Progress: {}...".format(counts)) - counts['raw_lines'] += 1 - if not l: - continue - info = parse_cdx_line(l) - if not info: - continue - batch.append(info) - counts['total'] += 1 - if len(batch) >= 1000: - insert(cur, batch) - conn.commit() - #counts['inserted'] += i - #counts['existing'] += len(batch) - i - batch = [] - counts['batches'] += 1 - if batch: - insert(cur, batch) - #counts['inserted'] += i - #counts['existing'] += len(batch) - i - batch = [] - conn.commit() - cur.close() - print("Done: {}".format(counts)) - -if __name__=='__main__': - stdin_to_pg() diff --git a/postgrest/backfill/backfill_file_meta.py b/postgrest/backfill/backfill_file_meta.py deleted file mode 100755 index e3b40a0..0000000 --- a/postgrest/backfill/backfill_file_meta.py +++ /dev/null @@ -1,55 +0,0 @@ -#!/usr/bin/env python3 -""" -This is a "one-time" tranform helper script for file_meta backfill into -sandcrawler postgresql. - -Most of this file was copied from '../python/common.py'. -""" - -import json, os, sys, collections -import psycopg2 -import psycopg2.extras - - -def insert(cur, batch): - sql = """ - INSERT INTO - file_meta - VALUES %s - ON CONFLICT DO NOTHING; - """ - res = psycopg2.extras.execute_values(cur, sql, batch) - -def stdin_to_pg(): - # no host means it will use local domain socket by default - conn = psycopg2.connect(database="sandcrawler", user="postgres") - cur = conn.cursor() - counts = collections.Counter({'total': 0}) - batch = [] - for l in sys.stdin: - if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: - print("Progress: {}...".format(counts)) - counts['raw_lines'] += 1 - if not l.strip(): - continue - info = l.split("\t") - if not info: - continue - assert len(info) == 5 - info[-1] = info[-1].strip() or None - batch.append(info) - counts['total'] += 1 - if len(batch) >= 1000: - insert(cur, batch) - conn.commit() - batch = [] - counts['batches'] += 1 - if batch: - insert(cur, batch) - batch = [] - conn.commit() - cur.close() - print("Done: {}".format(counts)) - -if __name__=='__main__': - stdin_to_pg() diff --git a/postgrest/backfill/backfill_grobid.py b/postgrest/backfill/backfill_grobid.py deleted file mode 100755 index 08fad7f..0000000 --- a/postgrest/backfill/backfill_grobid.py +++ /dev/null @@ -1,91 +0,0 @@ -#!/usr/bin/env python3 -""" -This is a "one-time" tranform helper script for GROBID backfill into -sandcrawler minio and postgresql. -""" - -import json, os, sys, collections, io -import base64 -import requests -from minio import Minio -import psycopg2 -import psycopg2.extras - - -def b32_hex(s): - s = s.strip().split()[0].lower() - if s.startswith("sha1:"): - s = s[5:] - if len(s) != 32: - return s - return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') - -def insert(cur, batch): - sql = """ - INSERT INTO - grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata) - VALUES %s - ON CONFLICT DO NOTHING; - """ - batch = [(d['sha1hex'], d['grobid_version'], d['status_code'], d['status'], d['fatcat_release'], d['metadata']) - for d in batch] - res = psycopg2.extras.execute_values(cur, sql, batch) - -def stdin_to_pg(): - mc = Minio('localhost:9000', - access_key=os.environ['MINIO_ACCESS_KEY'], - secret_key=os.environ['MINIO_SECRET_KEY'], - secure=False) - # no host means it will use local domain socket by default - conn = psycopg2.connect(database="sandcrawler", user="postgres") - cur = conn.cursor() - counts = collections.Counter({'total': 0}) - batch = [] - for l in sys.stdin: - if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: - print("Progress: {}...".format(counts)) - counts['raw_lines'] += 1 - l = l.strip() - if not l: - continue - row = json.loads(l) - if not row: - continue - sha1hex = b32_hex(row['pdf_hash']) - grobid_xml = row['tei_xml'].encode('utf-8') - grobid_xml_len = len(grobid_xml) - grobid_xml = io.BytesIO(grobid_xml) - - key = "{}/{}/{}.tei.xml".format( - sha1hex[0:2], - sha1hex[2:4], - sha1hex) - mc.put_object("grobid", key, grobid_xml, grobid_xml_len, - content_type="application/tei+xml", - metadata=None) - counts['minio-success'] += 1 - - info = dict( - sha1hex=sha1hex, - grobid_version=None, # TODO - status_code=200, - status=None, - fatcat_release=None, - metadata=None, - ) - batch.append(info) - counts['total'] += 1 - if len(batch) >= 1000: - insert(cur, batch) - conn.commit() - batch = [] - counts['batches'] += 1 - if batch: - insert(cur, batch) - batch = [] - conn.commit() - cur.close() - print("Done: {}".format(counts)) - -if __name__=='__main__': - stdin_to_pg() diff --git a/postgrest/backfill/backfill_grobid_unpaywall.py b/postgrest/backfill/backfill_grobid_unpaywall.py deleted file mode 100755 index 58e9e3c..0000000 --- a/postgrest/backfill/backfill_grobid_unpaywall.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python3 -""" -This is a "one-time" tranform helper script for GROBID backfill into -sandcrawler minio and postgresql. - -This variant of backfill_grobid.py pushes into the unpaywall bucket of -sandcrawler-minio and doesn't push anything to sandcrawler table in general. -""" - -import json, os, sys, collections, io -import base64 -import requests -from minio import Minio -import psycopg2 -import psycopg2.extras - - -def b32_hex(s): - s = s.strip().split()[0].lower() - if s.startswith("sha1:"): - s = s[5:] - if len(s) != 32: - return s - return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') - -def stdin_to_minio(): - mc = Minio('localhost:9000', - access_key=os.environ['MINIO_ACCESS_KEY'], - secret_key=os.environ['MINIO_SECRET_KEY'], - secure=False) - counts = collections.Counter({'total': 0}) - for l in sys.stdin: - if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: - print("Progress: {}...".format(counts)) - counts['raw_lines'] += 1 - l = l.strip() - if not l: - continue - row = json.loads(l) - if not row: - continue - sha1hex = b32_hex(row['pdf_hash']) - grobid_xml = row['tei_xml'].encode('utf-8') - grobid_xml_len = len(grobid_xml) - grobid_xml = io.BytesIO(grobid_xml) - - key = "grobid/{}/{}/{}.tei.xml".format( - sha1hex[0:2], - sha1hex[2:4], - sha1hex) - mc.put_object("unpaywall", key, grobid_xml, grobid_xml_len, - content_type="application/tei+xml", - metadata=None) - counts['minio-success'] += 1 - - print("Done: {}".format(counts)) - -if __name__=='__main__': - stdin_to_minio() diff --git a/postgrest/backfill/filter_transform_cdx.py b/postgrest/backfill/filter_transform_cdx.py deleted file mode 100755 index 3507dfc..0000000 --- a/postgrest/backfill/filter_transform_cdx.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python3 -""" -This is a "one-time" tranform helper script for CDX backfill into sandcrawler -postgresql. - -Most of this file was copied from '../python/common.py'. -""" - -import json, os, sys -import base64 - -NORMAL_MIME = ( - 'application/pdf', - 'application/postscript', - 'text/html', - 'text/xml', -) - -def normalize_mime(raw): - raw = raw.lower() - for norm in NORMAL_MIME: - if raw.startswith(norm): - return norm - - # Special cases - if raw.startswith('application/xml'): - return 'text/xml' - if raw.startswith('application/x-pdf'): - return 'application/pdf' - return None - - -def test_normalize_mime(): - assert normalize_mime("asdf") is None - assert normalize_mime("application/pdf") == "application/pdf" - assert normalize_mime("application/pdf+journal") == "application/pdf" - assert normalize_mime("Application/PDF") == "application/pdf" - assert normalize_mime("application/p") is None - assert normalize_mime("application/xml+stuff") == "text/xml" - assert normalize_mime("application/x-pdf") == "application/pdf" - assert normalize_mime("application/x-html") is None - -def b32_hex(s): - s = s.strip().split()[0].lower() - if s.startswith("sha1:"): - s = s[5:] - if len(s) != 32: - return s - return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') - - -def parse_cdx_line(raw_cdx): - - cdx = raw_cdx.split() - if len(cdx) < 11: - return None - - surt = cdx[0] - dt = cdx[1] - url = cdx[2] - mime = normalize_mime(cdx[3]) - http_status = cdx[4] - key = cdx[5] - c_size = cdx[8] - offset = cdx[9] - warc = cdx[10] - - if not (key.isalnum() and c_size.isdigit() and offset.isdigit() - and http_status == "200" and len(key) == 32 and dt.isdigit() - and mime != None): - return None - - if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): - return None - - # these are the new/specific bits - sha1 = b32_hex(key) - return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset)) - -for l in sys.stdin: - l = l.strip() - if not l: - continue - info = parse_cdx_line(l) - if not info: - continue - print("\t".join([info['url'], info['datetime'], info['sha1hex'], info['mimetype'], info['warc_path'], str(info['warc_csize']), str(info['warc_offset'])])) - diff --git a/postgrest/backfill/petabox_transform.py b/postgrest/backfill/petabox_transform.py deleted file mode 100755 index b638911..0000000 --- a/postgrest/backfill/petabox_transform.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python3 - -import json, sys, os - -for l in sys.stdin.readlines(): - l = l.strip() - if not l: - continue - r = json.loads(l) - if not r['sha1']: - continue - sha1hex = r['sha1'] - for url in r['urls']: - u = url['url'] - if not '//archive.org/' in u: - continue - u = u.split('/') - if u[2] == 'web.archive.org': - continue - #print(u) - assert u[2] == 'archive.org' and u[3] in ('download', 'serve') - item = u[4] - path = '/'.join(u[5:]) - print("\t".join([item, path, sha1hex])) diff --git a/postgrest/sandcrawler_schema.sql b/postgrest/sandcrawler_schema.sql deleted file mode 100644 index fd921ed..0000000 --- a/postgrest/sandcrawler_schema.sql +++ /dev/null @@ -1,59 +0,0 @@ - -CREATE TABLE IF NOT EXISTS cdx ( - url TEXT NOT NULL CHECK (octet_length(url) >= 1), - datetime TEXT NOT NULL CHECK (octet_length(datetime) = 14), - sha1hex TEXT NOT NULL CHECK (octet_length(sha1hex) = 40), - cdx_sha1hex TEXT CHECK (octet_length(cdx_sha1hex) = 40), - mimetype TEXT CHECK (octet_length(mimetype) >= 1), - warc_path TEXT CHECK (octet_length(warc_path) >= 1), - warc_csize BIGINT, - warc_offset BIGINT, - row_created TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, - PRIMARY KEY(url, datetime) -); -CREATE INDEX IF NOT EXISTS cdx_sha1hex_idx ON cdx(sha1hex); -CREATE INDEX IF NOT EXISTS cdx_row_created_idx ON cdx(row_created); - -CREATE TABLE IF NOT EXISTS file_meta ( - sha1hex TEXT PRIMARY KEY CHECK (octet_length(sha1hex) = 40), - sha256hex TEXT CHECK (octet_length(sha256hex) = 64), - md5hex TEXT CHECK (octet_length(md5hex) = 32), - size_bytes BIGINT, - mimetype TEXT CHECK (octet_length(mimetype) >= 1) -); - -CREATE TABLE IF NOT EXISTS fatcat_file ( - sha1hex TEXT PRIMARY KEY CHECK (octet_length(sha1hex) = 40), - file_ident TEXT CHECK (octet_length(file_ident) = 26), - first_release_ident TEXT CHECK (octet_length(first_release_ident) = 26) -); - -CREATE TABLE IF NOT EXISTS petabox ( - item TEXT NOT NULL CHECK (octet_length(item) >= 1), - path TEXT NOT NULL CHECK (octet_length(path) >= 1), - sha1hex TEXT NOT NULL CHECK (octet_length(sha1hex) = 40), - PRIMARY KEY(item, path) -); -CREATE INDEX petabox_sha1hex_idx ON petabox(sha1hex); - -CREATE TABLE IF NOT EXISTS grobid ( - sha1hex TEXT PRIMARY KEY CHECK (octet_length(sha1hex) = 40), - updated TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, - grobid_version TEXT CHECK (octet_length(grobid_version) >= 1), - status_code INT NOT NULL, - status TEXT CHECK (octet_length(status) >= 1), - fatcat_release TEXT CHECK (octet_length(fatcat_release) = 26), - metadata JSONB -); --- CREATE INDEX grobid_fatcat_release_idx ON grobid(fatcat_release); - -CREATE TABLE IF NOT EXISTS shadow ( - shadow_corpus TEXT NOT NULL CHECK (octet_length(shadow_corpus) >= 1), - shadow_id TEXT NOT NULL CHECK (octet_length(shadow_id) >= 1), - sha1hex TEXT NOT NULL CHECK (octet_length(sha1hex) = 40), - doi TEXT CHECK (octet_length(doi) >= 1), - pmid TEXT CHECK (octet_length(pmid) >= 1), - isbn13 TEXT CHECK (octet_length(isbn13) >= 1), - PRIMARY KEY(shadow_corpus, shadow_id) -); -CREATE INDEX shadow_sha1hex_idx ON shadow(sha1hex); |