#!/usr/bin/env python3 """ Set of luigi tasks to derive a citation graph. $ refcat.pyz ____ __ ________ / __/________ _/ /_ / ___/ _ \/ /_/ ___/ __ `/ __/ / / / __/ __/ /__/ /_/ / /_ /_/ \___/_/ \___/\__,_/\__/ Command line entry point for running various data tasks. $ refcat.pyz [COMMAND | TASK] [OPTIONS] Commands: ls, ll, deps, tasks, files, config, cat, completion To run a task, e.g. URLList: $ refcat.pyz URLList To run a subcommand, e.g. to show task dependencies: $ refcat.pyz deps URLList To install completion run: $ source <(refcat.pyz completion) VERSION 0.1.0 SETTINGS /home/tir/.config/refcat/settings.ini BASE /bigger/.cache TMPDIR /bigger/tmp BrefZipArxiv Refs BrefZipDOI RefsArxiv BrefZipFuzzy RefsDOI BrefZipPMCID RefsMapped BrefZipPMID RefsPMCID FatcatArxiv RefsPMID FatcatDOI RefsToRelease FatcatMapped RefsWithUnstructured FatcatPMCID RefsWithoutIdentifiers FatcatPMID ReleaseExportExpanded MAGPapers ReleaseExportReduced OpenLibraryAuthorMapping URLList OpenLibraryAuthors URLTabs OpenLibraryDump UnmatchedMapped OpenLibraryEditions UnmatchedOpenLibraryMatchTable OpenLibraryEditionsByWork UnmatchedRefs OpenLibraryEditionsMapped UnmatchedRefsToRelease OpenLibraryEditionsToRelease UnmatchedResolveJournalNames OpenLibraryWorks UnmatchedResolveJournalNamesMapped OpenLibraryWorksSorted WikipediaCitationsMinimalDataset Refcat ------------------------------------------------------------------------ Overview -------- * raw input "tasks" as luigi.ExternalTask * derivationss Note: We mostly use some shell pipelines with UNIX and custom tools (see: skate); we may get rid of this "python layer" altogether, if we converged on what to build. The most common pattern is "map-reduce", e.g. derive a key from docs, combine the results from e.g. two such key extractions and apply some reduction, e.g. output schema generation. Various schema -------------- * release (fatcat database export) * ref (one document per reference) * OL editions (open library editions) * OL authors (open library authors) * wiki (a particular wikipedia reference export) * biblioref (or bref, the schema we store the citation graph in, ATM) Some operations, e.g. "fuzzy verification" require both compared documents to be release entities. This means, that we need to convert different formats into the release format at some point. Mappers ------- For catalog (fatcat) and refs, we extract ids: * doi * pmid * pmcid * arxiv We run fuzzy title matching and verification. Here, we need to convert refs to releases to be able to run verify (could implement a verification for various schemas, too -- but release seems complete enough). For OL we need to fuse authors into the editions dataset first. Reducers -------- Exact mode for ids: * doi * pmid * pmcid * arxiv For fuzzy matching, we use "fuzzy" mode (and keep only exact and strong matches). Config ------ Config (e.g. raw input data) taken from $HOME/.config/refcat/settings.ini. TODO ---- * [ ] partial (hold) Prepared resolver for journal abbreviations; most entries have some journal name, so use journal name or issn (extra step) to group candidates per journal. Journals may on average have 1K publications (few have 100K+); then for each candidate ref find most likely match in the releases of a journal. Also, many partial records do have more information in unstructured; parse this out first. * [x] OL fuzzy Beside 200K links via ISBN, about 10M links via title. Many "year" mismatches, which might indicate different editions (debug this later). * [ ] unmatched (in a final pass) We can match by id and key, e.g. extract id and key, sort and merge (id, key) from graph, and if not available use raw input. > QA things * [x] find duplicates and clean them up * [x] generate stats on match types TODO: Unmatched --------------- * raw refs may contain duplicates (e.g. "crossref" and "grobid") * refs should appear in order as they are found in the paper; can we guarantee that? Idea was that "source release ident + ref index" should allow completeness and order. "crossref" and "grobid" order may vary. In any way, we may want the raw ref blob sorted by (source) release ident - it's already sorted by work ident. We do have a work ident for all brefs as well, so we need to sort the combined bref blob by work id. bref blob raw ref blob work_id work_id For each work_id we want to know, for what entries we found some ID somewhere. For all others, we want to include them from the raw ref; need to convert from ref to bref on the fly. Comparison by e.g. identifiers or title. Make sure it's kind of unique. We should end up with 1. """ import argparse import collections import datetime import json import logging import multiprocessing import os import sys import tempfile import grobid_tei_xml import luigi import requests from refcat.base import BaseTask, Zstd, shellout from refcat.settings import settings # Directory structure will be like `base/tag/task/file.ext`, and we use an # isodate (e.g. 2021-01-01) as a convention for tag. That way we can keep old # pipeline results around, if needed. # # We also carry the date as a parameter in all tasks (we should probably get # rid of it, it is not needed). In order to match the dates, we use the # following date_from_tag parsing with fallback. try: date_from_tag = datetime.datetime.strptime(settings.TAG, "%Y-%m-%d").date() except ValueError: date_from_tag = datetime.date.today() # Raw inputs are luigi.ExternalTask instances. We can use settings.ini entries # to configure paths for raw inputs. class Refcat(BaseTask): """ A base tasks for all refcat related tasks. """ BASE = settings.BASE TAG = settings.TAG # e.g. "2021-07-28", but can be anything; TODO: converge on a pattern or simplify! date = luigi.DateParameter(default=date_from_tag, description="a versioning help, will be part of filename") tmpdir = luigi.Parameter(default=settings.TMPDIR, description="set tempdir", significant=False) n = luigi.IntParameter(default=multiprocessing.cpu_count(), significant=False) @property def logger(self): """ Return the logger. Module logging uses singleton internally, so no worries. """ return logging.getLogger('refcat') class Refs(luigi.ExternalTask, Refcat): """ Compressed (zstd) references, as of 01/2021 containing ~1.8B docs; this might increase in a next version. This comes from a custom derivation from an "heavy intermediate" format in a scholar pipeline. As of 07/2021, we have 2,507,793,772 raw refs. """ def output(self): return luigi.LocalTarget(path=settings.REFS_FILE, format=Zstd) class ReleaseExportExpanded(luigi.ExternalTask, Refcat): """ Fatcat database release export, zstd version, from e.g. https://archive.org/details/fatcat_snapshots_and_exports """ def output(self): return luigi.LocalTarget(path=settings.RELEASE_EXPORT_EXPANDED_FILE, format=Zstd) class WikipediaCitationsMinimalDataset(luigi.ExternalTask, Refcat): """ From https://archive.org/details/wikipedia_citations_2020-07-14 (Wikipedia Citations: A comprehensive dataset of citations with identifiers extracted from English Wikipedia); http://doi.org/10.5281/zenodo.3940692. Dataset contains parquet, but we want JSON here: $ parquet-tools cat --json minimal_dataset.parquet > minimal_dataset.json Contains (07/2021) around 29276667 rows. Rough id type distribution: 2160819 ISBN 1442176 DOI 825970 PMID 353425 ISSN 279369 PMC 185742 OCLC 181375 BIBCODE 110921 JSTOR 47601 ARXIV 15202 LCCN 12878 MR 8270 ASIN 6293 OL 3790 SSRN 3013 ZBL The minimal version looks like this: { "type_of_citation": "citation", "page_title": "List of R1a frequency by population", "Title": "High-resolution phylogenetic analysis ...", "ID_list": "{PMID=15944443, DOI=10.1093/molbev/msi185}" } An updated version: wikipedia-citations-enwiki-20211201, with better ID extraction. """ def output(self): return luigi.LocalTarget(path=os.path.join(settings.WIKIPEDIA_CITATIONS, "minimal_dataset.json")) class WikipediaCitations20211201(luigi.ExternalTask, Refcat): """ Update wikipedia citations dataset: https://archive.org/details/wikipedia-citations-enwiki-20211201 Generated with https://github.com/dissemin/wikiciteparser. * DOI, PMID, PMCID, arxiv-id, webarchive (prefix version) * other identifier exact * does not have some title cases Example line: { "revision_id": 991003499, "refs": [ { "Authors": [ { "first": "Liévin", "last": "Ndayizeye" }, { "first": "Benoît", "last": "Nzigidahera" }, { "first": "Abdelaziz Elamin", "last": "Gesmallah" } ], "CitationClass": "journal", "Date": "2019-03-27", "ID_list": { "DOI": "10.1007/s42690-019-00013-w", "ISSN": "1742-7592" }, "Issue": "2", "Pages": "125-130", "Periodical": "International Journal of Tropical Insect Science", "PublisherName": "Springer Science and Business Media LLC", "Title": "Current distribution of Bactrocera latifrons Hendel in the different agro-ecological zones of Burundi", "Volume": "39" } ], "site_name": "enwiki", "page_title": "List of Bactrocera species" } """ def output(self): return luigi.LocalTarget(path=os.path.join(settings.WIKIPEDIA_CITATIONS_20211201, "enwiki-20211201-pages-articles.citations.json")) class OpenLibraryEditions(luigi.ExternalTask, Refcat): """ Editions file (converted to zstd) https://openlibrary.org/developers/dumps. """ def output(self): return luigi.LocalTarget(path=settings.OL_DUMP_EDITIONS, format=Zstd) class OpenLibraryAuthors(luigi.ExternalTask, Refcat): """ Author dump (converted to zstd), from https://openlibrary.org/developers/dumps. """ def output(self): return luigi.LocalTarget(path=settings.OL_DUMP_AUTHORS, format=Zstd) class MAGPapers(luigi.ExternalTask, Refcat): """ Microsoft Academic dump as archived, e.g. https://archive.org/details/mag-2020-06-25 - we want this mainly for comparisons. """ def output(self): return luigi.LocalTarget(path=os.path.join(settings.MAG, "Papers.txt.gz"), format=Zstd) class OpenCitations(luigi.ExternalTask, Refcat): """ OpenCitations distributes a zip file containing zip files containing files with doi-doi lines. We prepare the raw file to have a single zstd compressed file to work with. Raw data looks like: oci,citing,cited,creation,timespan,journal_sc,author_sc 02003080406360106010101060909370200010237070005020502-02001000106361937231430122422370200000837000737000200,10.3846/16111699.2012.705252,10.1016/j.neucom.2008.07.020,2012-10-04,P3Y0M,no,no 02003080406360106010101060909370200010237070005020502-0200308040636010601016301060909370200000837093701080963010908,10.3846/16111699.2012.705252,10.3846/1611-1699.2008.9.189-198,2012-10-04,P4Y0M4D,yes,no 02003080406360106010101060909370200010237070005020502-02001000106361937102818141224370200000737000237000003,10.3846/16111699.2012.705252,10.1016/j.asieco.2007.02.003,2012-10-04,P5Y6M,no,no 02003080406360106010101060909370200010237070005020502-02003080406360106010101060909370200010137050505030808,10.3846/16111699.2012.705252,10.3846/16111699.2011.555388,2012-10-04,P1Y5M22D,yes,no ... Combine, e.g. via: $ find . -name "*.csv" -exec cat {} + | grep -v '^oci,' | zstd -c -T0 > coci.csv.zst """ def output(self): return luigi.LocalTarget(path=settings.COCI, format=Zstd) # ----8< Derivations # # Augmentation and reductions of raw data # --------------------------------------- # class RefsWithUnstructured(Refcat): """ Augment refs with data from biblio.unstructured - do this first, so we can use it in all subsequent steps. Do some basic cleanup. """ def requires(self): return Refs() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-cleanup -c ref | skate-from-unstructured | zstd -T0 -c > {output} """, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class ReleaseExportReduced(Refcat): """ Reduce fatcat exported dataset size, stripping some heavy fields (110min). """ def requires(self): return ReleaseExportExpanded() def run(self): output = shellout(""" zstdcat -T0 {input} | parallel --block 10M -j 16 --pipe "jq -rc 'del(.files) | del(.refs) | del(.container.extra)'" | zstd -T0 > {output} """, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class ReleaseIdentDOIList(Refcat): """ Create TSV (ident, doi). """ def requires(self): return ReleaseExportExpanded() def run(self): output = shellout(""" zstdcat -T0 {input} | parallel --block 10M -j 20 --pipe "jq -rc '[.ident, .ext_ids.doi] | @tsv' | LC_ALL=C grep -F '.'" > {output} """, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv")) class ReleaseIdentDOIMapping(Refcat): """ Create a mapping database from release ident to DOI. 21min. """ def requires(self): return ReleaseIdentDOIList() def run(self): output = shellout("""tabby -C -o {output} {input}""", input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="db")) class BrefWithDOI(Refcat): """ Take the paper matches and add source and target DOI. 1660min. """ def requires(self): return { "bref": Bref(), "mapping": ReleaseIdentDOIMapping(), } def run(self): output = shellout(""" zstdcat {bref} | tabby -A -db {mapping} -m source_release_ident:source_doi -m target_release_ident:target_doi | zstd -c -T0 > {output} """, bref=self.input().get("bref").path, mapping=self.input().get("mapping").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class BrefDOITable(Refcat): """ Extract a table with source ident, target ident, source doi, target doi. """ def requires(self): return BrefWithDOI() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -B -m bidt | zstd -c -T0 > {output} """, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst")) class UnmatchedRefs(Refcat): """ File with not yet considered refs (e.g. no title, doi, ...); around 260,749,705. Note that this is a lower bound, since docs with titles may not be matched as well. Note, that this data contains refs, which have more information, just hidden in "unstructured" field. TODO: Parse all unparsed field data. """ def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.doi == null and .biblio.title == null and .biblio.pmid == null and .biblio.pmcid == null and .biblio.arxiv_id == null)'" | zstd -T0 -c > {output}""", n=self.n, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class RefsWithoutIdentifiers(Refcat): """ All references, which do not have an identifier. """ def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.doi == null and .biblio.pmid == null and .biblio.pmcid == null and .biblio.arxiv_id == null)'" | zstd -T0 -c > {output}""", n=self.n, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) # # Generate URL list for CDX lookup # -------------------------------- # class URLTabs(Refcat): """ Extract (work ident, release ident, url, doc) from refs (519m45.710s, about 55k docs/s); sorted by url. """ def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ru -skip-on-empty 3 | LC_ALL=C sort -T {tmpdir} -k3,3 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class URLTabsCleaned(Refcat): """ URLTabs, cleaned, sorted by url. Notes: https://is.gd/C7upZq """ def requires(self): return URLTabs() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-cleanup -c url -allow http,https -X -B -S -f 3 | LC_ALL=C sort -T {tmpdir} -k3,3 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class URLList(Refcat): """ List of mostly cleaned, unique URLs from refs. """ def requires(self): return URLTabsCleaned() def run(self): output = shellout(""" zstdcat -T0 {input} | cut -f 3 | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) # # Mapping tasks # ------------- # class RefsDOI(Refcat): """ Sorted (doi, doc) tuples from refs. 225m48.755s """ def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x biblio.doi -skip-on-empty 1 | skate-cleanup -S -c doi -f 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class RefsPMID(Refcat): """ Sorted (pmid, doc) tuples from refs; PMID is an integer, https://www.ncbi.nlm.nih.gov/pmc/pmctopmid/ """ def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x biblio.pmid -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class RefsPMCID(Refcat): """ Sorted (pmcid, doc) tuples from refs, e.g. PMC2860560, https://www.ncbi.nlm.nih.gov/pmc/pmctopmid/ """ def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x biblio.pmcid -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class RefsArxiv(Refcat): """ Sorted (arxiv, doc) tuples from refs, e.g. 1802.3912, ... """ def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x biblio.arxiv_id -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) # # Generate (key, doc) from fatcat # ------------------------------- # class FatcatDOI(Refcat): """ DOI from fatcat. """ def requires(self): return ReleaseExportReduced() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x ext_ids.doi -skip-on-empty 1 | skate-cleanup -S -c doi -f 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class FatcatPMID(Refcat): """ PMID from fatcat. """ def requires(self): return ReleaseExportReduced() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x ext_ids.pmid -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class FatcatPMCID(Refcat): """ PMCID from fatcat. """ def requires(self): return ReleaseExportReduced() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x ext_ids.pmcid -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class FatcatArxiv(Refcat): """ Arxiv from fatcat. """ def requires(self): return ReleaseExportReduced() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x extra.arxiv.base_id -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) # # Key extraction for fuzzy matching # --------------------------------- # class FatcatMapped(Refcat): """ Fatcat mapped "tsand". """ mapper = luigi.Parameter(default="ts", description="mapper short name") def requires(self): return ReleaseExportReduced() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m {mapper} -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, mapper=self.mapper, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class RefsToRelease(Refcat): """ Convert refs to release, since fuzzy verification works on release entities currently. """ def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-conv -f ref -w 24 -b 100000 | zstd -T0 -c > {output} """, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class RefsMapped(Refcat): """ Apply mapper on refs. 281min (about 100k/s). """ mapper = luigi.Parameter(default="ts", description="mapper short name") def requires(self): return RefsToRelease() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m {mapper} -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, mapper=self.mapper, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) # # Biblioref generation from identifier matches # -------------------------------------------- # class BrefZipDOI(Refcat): """ Run skate-reduce from two files. """ def requires(self): return { "refs": RefsDOI(), "fatcat": FatcatDOI(), } def run(self): output = shellout(r""" skate-reduce -m exact -r doi -F <(zstdcat -T0 {refs}) -L <(zstdcat -T0 {fatcat}) | zstd -c -T0 > {output} """, refs=self.input().get("refs").path, fatcat=self.input().get("fatcat").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class BrefZipPMID(Refcat): """ Run skate-reduce from two files. """ def requires(self): return { "refs": RefsPMID(), "fatcat": FatcatPMID(), } def run(self): output = shellout(r""" skate-reduce -m exact -r pmid -F <(zstdcat -T0 {refs}) -L <(zstdcat -T0 {fatcat}) | zstd -c -T0 > {output} """, refs=self.input().get("refs").path, fatcat=self.input().get("fatcat").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class BrefZipPMCID(Refcat): """ Run skate-reduce from two files. """ def requires(self): return { "refs": RefsPMCID(), "fatcat": FatcatPMCID(), } def run(self): output = shellout(r""" skate-reduce -m exact -r pmcid -F <(zstdcat -T0 {refs}) -L <(zstdcat -T0 {fatcat}) | zstd -c -T0 > {output} """, refs=self.input().get("refs").path, fatcat=self.input().get("fatcat").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class BrefZipArxiv(Refcat): """ Run skate-reduce from two files. """ def requires(self): return { "refs": RefsArxiv(), "fatcat": FatcatArxiv(), } def run(self): output = shellout(r""" skate-reduce -m exact -r arxiv -F <(zstdcat -T0 {refs}) -L <(zstdcat -T0 {fatcat}) | zstd -c -T0 > {output} """, refs=self.input().get("refs").path, fatcat=self.input().get("fatcat").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) # # Biblioref generation from fuzzy matching # ---------------------------------------- # class BrefZipFuzzy(Refcat): """ Run skate-reduce from two files, fuzzy mode; 1039m55.350s, skate-reduce not parallelized yet. """ mapper = luigi.Parameter(default="ts", description="mapper short name") def requires(self): return { "refs": RefsMapped(mapper=self.mapper), "fatcat": FatcatMapped(mapper=self.mapper), } def run(self): output = shellout(r""" skate-reduce -m fuzzy -F <(zstdcat -T0 {refs}) -L <(zstdcat -T0 {fatcat}) | zstd -c -T0 > {output} """, refs=self.input().get("refs").path, fatcat=self.input().get("fatcat").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) # # Open Library Fuzzy matching (OL editions -> release, key extraction) # -------------------------------------------------------------------- # class OpenLibraryAuthorMapping(Refcat): """ Create an OL author id to author name TSV mapping. Output like: /authors/OL1000002A Īfilīn Farīd Jūrj Yārid /authors/OL1000025A Khālid ibn Aḥmad Sulaymān /authors/OL1000435A Muḥammad Shawqī ibn Ibrāhīm Makkī /authors/OL1000449A Fāris Mūsá Muṭṭalib Mashāqbah """ def requires(self): return OpenLibraryAuthors() def run(self): output = shellout(""" zstdcat -T0 {input} | LC_ALL=C cut -f 5 | jq -rc '[.key, .name]|@tsv' | zstd -T0 > {output} """, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class OpenLibraryEditionsToRelease(Refcat): """ Turn Open Library editions into release entities with author mapping. """ def requires(self): return { "oled": OpenLibraryEditions(), "map": OpenLibraryAuthorMapping(), } def run(self): output = shellout(""" zstdcat -T0 {input} | cut -f5 | skate-conv -B -f oled -Xa <(zstdcat -T0 {map}) | zstd -T0 -c > {output} """, input=self.input().get("oled").path, map=self.input().get("map").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class OpenLibraryEditionsMapped(Refcat): """ A mapped open library editions set. """ mapper = luigi.Parameter(default="ts", description="mapper short name") def requires(self): return OpenLibraryEditionsToRelease() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m {mapper} -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, mapper=self.mapper, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class BrefOpenLibraryZipISBN(Refcat): """ Run skate-reduce from two files. """ def requires(self): return { "refs": RefsMapped(mapper="isbn"), "ol": OpenLibraryReleaseMapped(mapper="isbn"), } def run(self): output = shellout(r""" skate-reduce -m rere -r isbn -F <(zstdcat -T0 {refs}) -L <(zstdcat -T0 {ol}) | zstd -c -T0 > {output} """, refs=self.input().get("refs").path, ol=self.input().get("ol").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) # # Open Library Fuzzy # ------------------ # class UnmatchedMapped(Refcat): """ Map unmatched refs (converted to release schema on the fly) to container names to do approximate matches with OL. 221m55.746s. """ def requires(self): return RefsWithoutIdentifiers() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-conv -f ref | skate-map -m rcns -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -S25% -k1,1 | zstd -T0 -c > {output} """, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class UnmatchedOpenLibraryMatchTable(Refcat): """ Run matching and write tabular results to file. 158m15.856s. Total rows: 139507963, exact/strong matches: 11777185. 93924122 different 33779488 ambiguous 11670030 strong 107155 exact """ def requires(self): return { "unmatched": UnmatchedMapped(), # We could include a bit more here, namely records with titles. "ol": OpenLibraryEditionsMapped(), } def run(self): output = shellout(""" skate-reduce -m oledt -O <(zstdcat -T0 {ol}) -F <(zstdcat -T0 {unmatched}) | zstd -c > {output} """, ol=self.input().get("ol").path, unmatched=self.input().get("unmatched").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class BrefZipOpenLibrary(Refcat): """ Export fuzzy matches to open library targets. 178m23.701s, finds 11777185 matches; but many false negatives. """ def requires(self): return { "unmatched": UnmatchedMapped(), "ol": OpenLibraryEditionsMapped(), } def run(self): output = shellout(""" skate-reduce -m oled -O <(zstdcat -T0 {ol}) -F <(zstdcat -T0 {unmatched}) | zstd -c > {output} """, ol=self.input().get("ol").path, unmatched=self.input().get("unmatched").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class OpenLibraryReleaseMapped(Refcat): """ OL is small compared to the whole ref corpus, 3m28.841s. """ mapper = luigi.Parameter(default="isbn", description="mapper short name") def requires(self): return OpenLibraryEditionsToRelease() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m {mapper} -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, mapper=self.mapper, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) # # Combined Bref File # class Bref(Refcat): """ Combine bref files from various sources. Note that we want to include any dataset which points from fatcat to an external dataset - in order to later fuse the rest of the unmatched entries with the matches. """ def requires(self): return { "doi": BrefZipDOI(), "pmid": BrefZipPMID(), "pmcid": BrefZipPMCID(), "arxiv": BrefZipArxiv(), "fuzzy": BrefZipFuzzy(), "openlibrary-isbn": BrefOpenLibraryZipISBN(), "openlibrary-fuzzy": BrefZipOpenLibrary(), } def run(self): _, tmpf = tempfile.mkstemp() for k, v in self.input().items(): self.logger.debug("adding {}".format(k)) shellout("""cat "{}" >> {}""".format(v.path, tmpf)) luigi.LocalTarget(tmpf).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) # Final Assembly # -------------- # # Currently, "BrefCombined" is the result of the "Bref" matches and the raw # refs. The joined dataset should be directly indexable into elasticsearch in # fatcat_refs schema. class BrefSortedByWorkID(Refcat): """ Sort by work id. Keep only docs that actually have a work id. 237m45.094s. """ def requires(self): return Bref() def run(self): output = shellout(""" zstdcat -T0 {bref} | skate-map -skip-on-empty 1 -B -m ff -x source_work_ident | LC_ALL=C sort -T {tmpdir} -S25% -k1,1 | zstd -c -T0 > {output} """, tmpdir=self.tmpdir, bref=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class RefsByWorkID(Refcat): """ Key raw refs by work id. Since data is already sorted by work id, this can skip the sorting step. 174m13.837s (~170K extractions/s). Seems, ordering is off, e.g. BrefSortedByWorkID starts with "22222dgdnzgxpmeq77nyyuj2x4". """ def requires(self): return Refs() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x work_ident | LC_ALL=C sort -T {tmpdir} -S25% -k1,1 | zstd -c -T0 > {output} """, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class BrefCombined(Refcat): """ TODO: We'll need another final assembly of ref and non-ref matches. Merge the raw references from papers with our biblioref format, such that we include all non-matched items and also consider duplicates. This is basically a reduce step, where we group by work id (since the raw refs were already sorted by work id). Data points: version 2021-05-06 results in 1,323,614,061 docs (77G compressed; about 285G when indexed in ES7); version 2021-07-06 contained 1,865,637,767 docs (116G). Data point: 72G matches, 170G unmatched (compressed); about 3.8B docs (close to 300k docs/s): real 214m12.019s user 1482m46.429s sys 114m9.439s Result file is 116G compressed (9M/s); 1,865,637,767 docs; 797,304,866,555b. """ def requires(self): return { "refs": RefsByWorkID(), "matched": BrefSortedByWorkID(), } def run(self): ts = datetime.datetime.now().strftime("%Y%m%d%H%M%S") logfile = os.path.join(self.tmpdir, "refcat-bref-combined-{}.log.gz".format(ts)) output = shellout(""" skate-reduce -log {logfile} -m unmatched -B <(zstdcat -T0 {matched}) -F <(zstdcat -T0 {refs}) | zstd -c -T0 > {output} """, logfile=logfile, matched=self.input().get("matched").path, refs=self.input().get("refs").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) # Stats from BrefCombined # ======================= # # Calculate stats off the match result and other comparisons. # # TODO: # # [ ] match status and reason freq table # * [ ] [A] minimal source-target ident set (plus status, reason), sort by work ident # * [ ] [B] fatcat db source ident plus ext id sorted by work ident # * [ ] [C] turn [A] and [B] into a DOI to DOI match table (sorted by source doi) -- we only have source ident doi, not target ident doi (!) # * [ ] [D] sort COCI by citing (or cited) # * [ ] [E] compare COCI and "ASC" doi matches (as set ops, only COCI, only "ASC", etc class ExtraMatchedByWorkIdent(Refcat): """ Matched sorted by source work ident. 309m6.832s. """ def requires(self): return BrefCombined() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m bref -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) class ExtraReleaseByWorkIdent(Refcat): """ Fatcat entries by work id. 68m54.340s. """ def requires(self): return ReleaseExportReduced() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m rewo -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) # Extra # ----- # # Tinking with suffix arrays to pluck out journal names from abbreviations, etc. # # TODO: Be more principled, some stats on how many refs we could match this way. class UnmatchedRefsToRelease(Refcat): """ Convert unmatched refs to releases. """ def requires(self): return UnmatchedRefs() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-conv -f ref | zstd -T0 -c > {output} """, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class UnmatchedResolveJournalNames(Refcat): """ Try to resolve journal names so we can match against both abbreviations. Keep only the resolved docs (for now). """ def requires(self): return UnmatchedRefsToRelease() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-resolve-journal-name -R -f 1 -B -A {abbrev} | zstd -T0 -c > {output} """, abbrev=settings.JOURNAL_ABBREVIATIONS, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class UnmatchedResolveJournalNamesMapped(Refcat): """ Take the augmented unmatched refs data and map the container names (abbrev and full). """ def requires(self): return UnmatchedResolveJournalNames() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m vcns -skip-on-empty 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) # Wikipedia related tasks; pages referencing papers we know about, e.g. # Wiki_page -> target_release_ident. # # Using prepared datasets, just using DOI for the moment. # TODO: use more than just DOI. class WikipediaDOI(Refcat): """ Sorted DOI keys from wikipedia. Takes about a minute. """ def requires(self): return WikipediaCitationsMinimalDataset() def run(self): output = shellout(""" skate-wikipedia-doi < {input} | LC_ALL=C sort -T {tmpdir} -S 20% -k1,1 | zstd -T0 -c > {output} """, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class Wikipedia20211201DOI(Refcat): """ Updated wikipedia citations dataset. {"type_of_citation":"cite journal","page_title":"Abortion in Alabama","Title":"Why We Should Stop Using the Term \"Elective Abortion\"","ID_list":"{PMID=30585581, DOI=10.1001/amajethics.2018.1175}"} """ def requires(self): return WikipediaCitations20211201() def run(self): with tempfile.NamedTemporaryFile(delete=False, mode="w") as tf: with self.input().open() as handle: for line in handle: doc = json.loads(line) if not doc["page_title"]: continue for i, ref in enumerate(doc.get("refs", [])): if not "ID_list" in ref: continue if not "DOI" in ref["ID_list"]: continue doi = ref["ID_list"]["DOI"].strip() reduced = doc reduced["refs"] = [] reduced["index"] = i reduced["Title"] = ref.get("Title") fields = [doi, doc["page_title"], json.dumps(reduced)] if not all(fields): continue tf.write("\t".join(fields) + "\n") output = shellout("LC_ALL=C sort -S30% {input} > {output}", input=tf.name) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv")) class BrefZipWikiDOI(Refcat): """ Generate biblioref for inbound wikipedia articles through exact matches. Yields about 1125638 edges. """ def requires(self): return { # "wiki": WikipediaDOI(), "wiki": Wikipedia20211201DOI(), "fatcat": FatcatDOI(), } def run(self): output = shellout(""" skate-reduce -m wiki -W {wiki} -L <(zstdcat -T0 {fatcat}) | zstd -T0 -c > {output} """, wiki=self.input().get("wiki").path, fatcat=self.input().get("fatcat").path) # output = shellout(""" # skate-reduce -m wiki -W <(zstdcat -T0 {wiki}) -L <(zstdcat -T0 {fatcat}) | # zstd -T0 -c > {output} # """, # wiki=self.input().get("wiki").path, # fatcat=self.input().get("fatcat").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) # Grobid reparse via grobid_tei_xml class UnmatchedRefsReparse(Refcat): """ Reparse unmatched refs which have an unstructured field; about 190M/270M unmatched, currently. We have more unmatched - these are only the ones where we do not have a title. """ def requires(self): return UnmatchedRefs() def run(self): with self.output().open("w") as output: with self.input().open() as f: for i, line in enumerate(f): if i % 100000 == 0: self.logger.debug("@{}".format(i)) doc = json.loads(line) if not "biblio" in doc: continue if not "unstructured" in doc["biblio"]: continue unstructured = doc["biblio"]["unstructured"] if len(unstructured) < 5: continue grobid_resp = requests.post( "https://grobid.qa.fatcat.wiki/api/processCitation", data={ 'citations': unstructured, 'consolidateCitations': 0, }, timeout=10.0, ) grobid_resp.raise_for_status() citations = grobid_tei_xml.parse_citations_xml(grobid_resp.text) if len(citations) == 0: continue # self.logger.debug("[parsing] {} sent, {} from grobid, for {}, {}".format( # len(unstructured), len(grobid_resp.text), unstructured, citations)) data = json.dumps(citations[0].to_dict()) output.write(data.encode("utf-8")) output.write(b"\n") def output(self): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) # Wayback related, extract URL, query CDX. # # TODO: Make CDX lookup more, genenic, maybe a separate library or tool or mass # query utility via hadoop streaming or the like. class RefsURL(Refcat): """ Extract (url, doc), sort by url. """ def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | skate-map -m ur -skip-on-empty 1 | skate-cleanup -c url -allow http,https -X -B -S -f 1 | LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class CDXURL(Refcat): """ Stub implementation of ad-hoc CDX. We only consider a subset of documents. """ cache = luigi.Parameter(default="/magna/.cache/skate/cdx", significant=False) limit = luigi.IntParameter(default=80000, significant=False) def requires(self): return RefsURL() def run(self): output = shellout(""" zstdcat -T0 {input} | LC_ALL=C cut -f 1 | LC_ALL=C head -n {limit} | skate-cdx-lookup -q -s 50ms -c {cache} -j -B | skate-map -m cdxu | LC_ALL=C sort -u -T {tmpdir} -k1,1 -S25% | zstd -c -T0 > {output} """, tmpdir=self.tmpdir, limit=self.limit, input=self.input().path, cache=self.cache, ignoremap={141: "todo: root cause"}) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) class BrefZipWayback(Refcat): def requires(self): return { "refs": RefsURL(), "cdx": CDXURL(), } def run(self): output = shellout(""" skate-reduce -m wb -F <(zstdcat -T0 {refs}) -C <(zstdcat -T0 {cdx}) | zstd -c -T0 > {output} """, refs=self.input().get("refs").path, cdx=self.input().get("cdx").path) luigi.LocalTarget(output).move(self.output().path) def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd)