diff options
author | Martin Czygan <martin.czygan@gmail.com> | 2021-05-08 00:16:36 +0200 |
---|---|---|
committer | Martin Czygan <martin.czygan@gmail.com> | 2021-05-08 00:16:36 +0200 |
commit | cf19a24e1e8f96496bcf08554243d5bf6ee50b36 (patch) | |
tree | 16ad221e4204e5a3eaec5b22db2c2f2d92130e92 /python | |
parent | 15756219df1157a84feb5c162a01882d39e4e3a3 (diff) | |
download | refcat-cf19a24e1e8f96496bcf08554243d5bf6ee50b36.tar.gz refcat-cf19a24e1e8f96496bcf08554243d5bf6ee50b36.zip |
start over; move previous tasks to attic
Diffstat (limited to 'python')
-rw-r--r-- | python/refcat/attic.py | 1172 | ||||
-rw-r--r-- | python/refcat/tasks.py | 1208 |
2 files changed, 1194 insertions, 1186 deletions
diff --git a/python/refcat/attic.py b/python/refcat/attic.py new file mode 100644 index 0000000..38a5853 --- /dev/null +++ b/python/refcat/attic.py @@ -0,0 +1,1172 @@ +# +# +# class URLList(Refcat): +# """ +# TSV URL extracted, 44368911. +# """ +# def requires(self): +# return URLTabs() +# +# def run(self): +# stats = collections.Counter() +# with self.input().open("rb") as f: +# with self.output().open("w") as output: +# for i, line in enumerate(f, start=1): +# parts = line.decode("utf-8").strip().split("\t") +# if len(parts) != 3: +# stats["no-url"] += 1 +# continue +# urls = extract_urls(parts[2]) +# stats["found-{}".format(len(urls))] += 1 +# for link in urls: +# link = link + "\n" +# output.write(link.encode("utf-8")) +# self.logger.debug(json.dumps(dict(stats))) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class RefsDOI(Refcat): +# """ +# TSV with (ident, doi, full doc). +# """ +# def requires(self): +# return RefsWithUnstructured() +# +# def run(self): +# """ +# Note: we want the full JSON document, so we use jq tostring, which +# escapes "too much", hence we need to clean up with sed, unfortunately. +# """ +# # XXX: skate-doi could be an awk function, too. +# # XXX: jq tostring might escape too much +# output = shellout(r""" +# zstdcat -T0 {input} | +# LC_ALL=C tr -d '\t' | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.doi != null) | [.release_ident, .biblio.doi, (.|tostring)] | @tsv'" | +# LC_ALL=C sed 's/\\\\/\\/g' | +# LC_ALL=C awk -F $'\t' -v OFS='\t' '$2=tolower($2)' | +# skate-to-doi -B -S -f 2 | +# LC_ALL=C sort -S 30% --parallel 6 -T {tmpdir} -k2,2 | +# zstd -c -T0 > {output} +# """, +# tmpdir=self.tmpdir, +# n=self.n, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class RefsPMID(Refcat): +# """ +# List of PMID, 74M refs seem to have one. +# """ +# def requires(self): +# return RefsWithUnstructured() +# +# def run(self): +# output = shellout(r""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.pmid != null and .biblio.doi == null) | [.release_ident, .biblio.pmid, (.|tostring)] | @tsv'" | +# LC_ALL=C sed 's/\\\\/\\/g' | +# LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | +# zstd -T0 -c > {output} +# """, +# tmpdir=self.tmpdir, +# n=self.n, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class RefsPMCID(Refcat): +# """ +# List of PMCID. +# """ +# def requires(self): +# return RefsWithUnstructured() +# +# def run(self): +# output = shellout(r""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.pmcid != null and .biblio.doi == null) | [.release_ident, .biblio.pmcid, (.|tostring)] | @tsv'" | +# LC_ALL=C sed 's/\\\\/\\/g' | +# LC_ALL=C sed -e 's@PMC@@g' | +# LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | +# zstd -T0 -c > {output} +# """, +# tmpdir=self.tmpdir, +# n=self.n, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class RefsArxiv(Refcat): +# """ +# List of arxiv ids from refs. +# """ +# def requires(self): +# return RefsWithUnstructured() +# +# def run(self): +# output = shellout(r""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.arxiv_id != null and .biblio.doi == null) | [.release_ident, .biblio.arxiv_id, (.|tostring)] | @tsv'" | +# LC_ALL=C sed 's/\\\\/\\/g' | +# LC_ALL=C sort -S 30% -k2,2 -T {tmpdir} | +# zstd -T0 -c > {output} +# """, +# tmpdir=self.tmpdir, +# n=self.n, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class RefsTitles(Refcat): +# """ +# Extract titles. +# +# Contains many artifacts, e.g.: ! Accurate! and! efficient! insertional! +# RNA!editing!in!isolated!Physarum!mitochondria.!RNA* +# """ +# def requires(self): +# return RefsWithUnstructured() +# +# def run(self): +# output = shellout(r""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.title != null and .biblio.doi == null) | +# [.release_ident, (.biblio.title | ltrimstr(\" \") | rtrimstr(\" \") | gsub(\"\\n\"; \" \"))] | @tsv'" | +# zstd -c -T0 > {output} +# """, +# input=self.input().path, +# n=self.n) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class RefsTitlesLower(Refcat): +# """ +# Unique lowercase titles; 223m46.443s. +# """ +# def requires(self): +# return RefsTitles() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# tr '[:upper:]' '[:lower:]' | +# LC_ALL=C sort -k2 | +# zstd -T0 -c > {output} +# """, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class FatcatDOI(Refcat): +# """ +# List of DOIs, lowercase on the fly. +# """ +# def requires(self): +# return ReleaseExportReduced() +# +# def run(self): +# output = shellout(r""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.doi != null) | [.ident, .ext_ids.doi, (.|tostring)] | @tsv'" | +# LC_ALL=C sed 's/\\\\/\\/g' | +# LC_ALL=C awk -F $'\t' -v OFS='\t' '$2=tolower($2)' | +# LC_ALL=C sort -S 25% --parallel 6 -k2,2 -T {tmpdir} | +# zstd -c -T0 > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path, +# n=self.n) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class FatcatPMID(Refcat): +# """ +# List of PMID. +# """ +# def requires(self): +# return ReleaseExportReduced() +# +# def run(self): +# output = shellout(r""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.pmid != null) | [.ident, .ext_ids.pmid, (.|tostring)] | @tsv'" | +# LC_ALL=C sed 's/\\\\/\\/g' | +# LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | +# zstd -c -T0 > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path, +# n=self.n) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class FatcatPMCID(Refcat): +# """ +# List of PMCID. +# """ +# def requires(self): +# return ReleaseExportReduced() +# +# def run(self): +# output = shellout(r""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.pmcid != null) | [.ident, .ext_ids.pmcid, (.|tostring)] | @tsv'" | +# LC_ALL=C sed 's/\\\\/\\/g' | +# LC_ALL=C sed -e 's@PMC@@g' | +# LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | +# zstd -c -T0 > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path, +# n=self.n) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class FatcatArxiv(Refcat): +# """ +# List of arxiv ids. +# """ +# def requires(self): +# return ReleaseExportReduced() +# +# def run(self): +# output = shellout(r""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.extra.arxiv.base_id != null) | [.ident, .extra.arxiv.base_id, (.|tostring)] | @tsv'" | +# LC_ALL=C sed 's/\\\\/\\/g' | +# LC_ALL=C sort -S 30% -k2,2 -T {tmpdir} | +# zstd -c -T0 > {output}""", +# tmpdir=self.tmpdir, +# input=self.input().path, +# n=self.n) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class FatcatTitles(Refcat): +# """ +# Get a list of non-normalized, sorted titles; ~104min. +# """ +# def requires(self): +# return ReleaseExportReduced() +# +# def run(self): +# output = shellout(r""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.title != null and .biblio.doi == null) | +# [.ident, (.title | ltrimstr(\" \") | rtrimstr(\" \") | gsub(\"\\n\"; \" \"))] | @tsv'" | +# zstd -c -T0 > {output} +# """, +# input=self.input().path, +# n=self.n) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class FatcatTitlesLower(Refcat): +# """ +# Lowercase titles. +# """ +# def requires(self): +# return FatcatTitles() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# tr '[:upper:]' '[:lower:]' | +# LC_ALL=C sort -k2 | +# zstd -T0 -c > {output} +# """, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class FatcatSortedKeys(Refcat): +# """ +# Derive key and sort; key derivation (150M docs) took 39min; total 61min. +# """ +# def requires(self): +# return ReleaseExportReduced() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# skate-derive-key -b 50000 -verbose -f tsand | +# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | +# zstd -T0 -c > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class CommonDOI(Refcat): +# """ +# DOI that appear in the catalog and in the refs. +# """ +# def requires(self): +# return { +# "fatcat": FatcatDOI(), +# "refs": RefsDOI(), +# } +# +# def run(self): +# f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat").path) +# f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs").path) +# output = shellout(""" LC_ALL=C comm {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) +# luigi.LocalTarget(output).move(self.output().path) +# os.remove(f1) +# os.remove(f2) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class CommonTitles(Refcat): +# def requires(self): +# return { +# "fatcat": FatcatTitles(), +# "refs": RefsTitles(), +# } +# +# def run(self): +# f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat")) +# f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs")) +# output = shellout(""" LC_ALL=C comm -12 {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) +# luigi.LocalTarget(output).move(self.output().path) +# os.remove(f1) +# os.remove(f2) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class CommonTitlesLower(Refcat): +# def requires(self): +# return { +# "fatcat": FatcatTitlesLower(), +# "refs": RefsTitlesLower(), +# } +# +# def run(self): +# f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat").path) +# f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs").path) +# output = shellout(""" comm -12 {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) +# luigi.LocalTarget(output).move(self.output().path) +# os.remove(f1) +# os.remove(f2) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class RefsFatcatDOIJoin(Refcat): +# """ +# Join fatcat and refs DOI lists. +# +# Output will be like: +# +# ---- DOI -------------- ------ Fatcat ----------- -------- Refs ------------- +# +# 10.1001/2012.jama.10158 m7eoa3hbivcq5kgzzlepbifbna paygwq34z5hsnm5ypnwp2kz6wq +# 10.1001/2012.jama.10159 xsw5qtrv3jg7pjoj67e3kijtwq 4ug6jvnedbau3nnkhuqegepw2q +# 10.1001/2012.jama.10161 7m7yv5xkkjakxh3wuncqoctphe yllvkrxtgnhnfcyxwbj3swhegu +# 10.1001/2012.jama.10368 dw2djv2qdzecncwmh4o7esg4ie ghgshdzpbranbcwsr4xsh3yfhy +# +# To count the number of citations per DOI, count occurences on the second +# column. +# +# """ +# def requires(self): +# return { +# "fatcat": FatcatDOI(), +# "refs": RefsDOI(), +# } +# +# def run(self): +# output = shellout(""" +# LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | +# zstd -T0 -c > {output} +# """, +# fatcat=self.input().get("fatcat").path, +# refs=self.input().get("refs").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) +# +# +# class RefsFatcatPMIDJoin(Refcat): +# """ +# Join fatcat and refs PMID lists. +# """ +# def requires(self): +# return { +# "fatcat": FatcatPMID(), +# "refs": RefsPMID(), +# } +# +# def run(self): +# output = shellout(""" +# LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | +# zstd -c -T0 > {output} +# """, +# fatcat=self.input().get("fatcat").path, +# refs=self.input().get("refs").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) +# +# +# class RefsFatcatPMCIDJoin(Refcat): +# """ +# Join fatcat and refs PMCID lists. +# """ +# def requires(self): +# return { +# "fatcat": FatcatPMCID(), +# "refs": RefsPMCID(), +# } +# +# def run(self): +# output = shellout(""" +# LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | +# zstd -c -T0 > {output} +# """, +# fatcat=self.input().get("fatcat").path, +# refs=self.input().get("refs").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) +# +# +# class RefsFatcatArxivJoin(Refcat): +# """ +# Join fatcat, refs on arxiv (base) id. +# """ +# def requires(self): +# return { +# "fatcat": FatcatArxiv(), +# "refs": RefsArxiv(), +# } +# +# def run(self): +# # TODO: We want a zippy join here (e.g. to generate biblioref docs). +# output = shellout(""" +# LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | +# zstd -c -T0 > {output} +# """, +# fatcat=self.input().get("fatcat").path, +# refs=self.input().get("refs").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) +# +# +# class RefsFatcatTitleLowerJoin(Refcat): +# """ +# Join fatcat and refs titles. +# +# Output will be textfile (title, fatcat ident, refs ident). XXX: need to +# filter out too common titles first. +# """ +# def requires(self): +# return { +# "fatcat": FatcatTitlesLower(), +# "refs": RefsTitlesLower(), +# } +# +# def run(self): +# output = shellout(""" +# LC_ALL=C join -1 2 -2 2 {fatcat} {refs} > {output} +# """, +# fatcat=self.input().get("fatcat").path, +# refs=self.input().get("refs").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv")) +# +# +# class RefsFatcatGroupJoin(Refcat): +# """ +# Concat joins. +# +# 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum nncja4imynb4rajadrlbnoklxy +# 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum noimcv5xdzd6hfqu2mebcrzr34 +# 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum nqzg5lgdxvbhniy2hajlqd3aqi +# ... +# """ +# def requires(self): +# return [RefsFatcatDOIJoin(), RefsFatcatPMIDJoin(), RefsFatcatArxivJoin(), RefsFatcatPMCIDJoin()] +# +# def run(self): +# _, tmpf = tempfile.mkstemp(prefix="refcat-") +# for target in self.input(): +# shellout("""cat {file} >> {output}""", file=target.path, output=tmpf) +# luigi.LocalTarget(tmpf).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) +# +# +# class RefsFatcatRanked(Refcat): +# """ +# Inbound count, ident; 32m34.142s. +# +# 15175 ui64apmob5gnrfwe7pwgk7egju +# 15167 cejzj3ddszcdrmij7np36am5fa +# 15165 2b2ok43pirduva7ai3745k5xa4 +# 15158 cn4c33ctb5g5fax3touxjdmfle +# 15155 rrlbmbro4rhwri3zawz3uhp5va +# 15138 o62kjogy4zdyrlvy7cu7rlcs3m +# """ +# def requires(self): +# return RefsFatcatGroupJoin() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {file} | +# LC_ALL=C sort -k2,3 -u | +# LC_ALL=C cut -d ' ' -f 2 | +# LC_ALL=C uniq -c | +# LC_ALL=C sort -nr > {output} +# """, +# file=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv")) +# +# +# # +# # +# # # TODO: merge refs docs and release docs, maybe add an source label, then +# # # cluster; run verify and report on the number of similar records; generate a list of common titles +# # # +# # # TODO: find non-matched items and check for any pattern +# # +# # +# class RefsCounter(Refcat): +# """ +# Key counts, see: ref_counter.py. +# """ +# def requires(self): +# return RefsWithUnstructured() +# +# def run(self): +# counts = collections.Counter() +# with self.input().open("r") as f: +# for i, line in enumerate(f): +# obj = json.loads(line) +# counts['total'] += 1 +# for k in obj.keys(): +# if k == 'biblio': +# continue +# elif k == 'ref_source': +# counts["source_" + obj[k]] += 1 +# elif obj.get(k): +# counts["has_" + k] += 1 +# biblio = obj.get('biblio') +# if not biblio: +# continue +# for k in biblio.keys(): +# if biblio.get(k): +# counts["has_" + k] += 1 +# if biblio.get('doi') or biblio.get('pmcid') or biblio.get('pmid') or biblio.get('arxiv_id'): +# counts['has_any_extid'] += 1 +# if biblio.get('container_name') and biblio.get('volume') and biblio.get('issue') and biblio.get('pages'): +# counts['has_container_volume_issue_pages'] += 1 +# if biblio.get('title') and biblio.get('contrib_raw_names') and biblio.get('year'): +# counts['has_title_contrib_year'] += 1 +# if biblio.get('container_name') and biblio.get('contrib_raw_names') and biblio.get('year'): +# counts['has_contrib_container_year'] += 1 +# if biblio.get('title') and biblio.get('container_name') and biblio.get('year'): +# counts['has_title_container_year'] += 1 +# +# if i % 1000000 == 0: +# print(json.dumps(counts, indent=4, sort_keys=True), file=sys.stderr) +# +# with self.output().open("w") as output: +# json.dump(counts, output) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json")) +# +# +# class RefsKeyStats(Refcat): +# """ +# How many titles, DOI, etc. do we have in refs? +# """ +# def requires(self): +# return RefsWithUnstructured() +# +# def run(self): +# stats = { +# "total": 0, +# "no_biblio": 0, +# "stats": collections.Counter(), +# } +# with self.input().open("r") as f: +# for i, line in enumerate(f): +# stats["total"] += 1 +# doc = json.loads(line) +# if "biblio" not in doc: +# stats["no_biblio"] += 1 +# continue +# biblio = doc["biblio"] +# key = "|".join(sorted(biblio.keys())) +# stats["stats"][key] += 1 +# if i % 1000000 == 0: +# print(json.dumps(stats, indent=4, sort_keys=True), file=sys.stderr) +# +# with self.output().open("w") as output: +# json.dumps(stats, output) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json")) +# +# +# class RefsToRelease(Refcat): +# """ +# Convert a refs doc into a minimalistic release entity. Requires "skate" +# tools - XXX: polish. +# """ +# def requires(self): +# return RefsWithUnstructured() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# skate-conv -f ref -w 24 -b 100000 | +# zstd -T0 -c > {output} +# """, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class RefsSortedKeys(Refcat): +# """ +# Derive key and sort; 1.8B json docs, took: 255min; 122k/s; key extration +# almost 3h (might be faster with rust); 90G compressed. +# +# Keys based on title will have many empty keys; e.g. "2021-02-20", +# 838,057,412 docs have no key. +# """ +# def requires(self): +# return RefsToRelease() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# skate-derive-key -skip-empty-keys -b 50000 -verbose -f tsand | +# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | +# zstd -T0 -c > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class RefsReleasesMerged(Refcat): +# """ +# Merge release and refs (in release form). +# +# wc: 1579687186 53137849922 913692185284 +# """ +# def requires(self): +# return { +# "release": ReleaseExportReduced(), +# "refs": RefsToRelease(), +# } +# +# def run(self): +# _, f = tempfile.mkstemp(prefix="refcat-") +# for k, v in self.input().items(): +# shellout("cat {input} >> {output}", input=v.path, output=f) +# luigi.LocalTarget(f).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class RefsTitleFrequency(Refcat): +# """ +# Dig into common titles. +# """ +# tmpdir = luigi.Parameter(default="/fast/tmp", description="set tempdir", significant=False) +# +# def requires(self): +# return RefsTitlesLower() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# LC_ALL=C cut -f2 | +# LC_ALL=C sort -T {tmpdir} -S20% --compress-program pzstd --parallel 6 | +# LC_ALL=C uniq -c | +# LC_ALL=C sort -nr | +# zstd -c9 > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# # # XXX: After RefsReleasesMerged, we want to cluster. +# # # python -m fuzzycat cluster -t tsandcrawler < data/re.json > cluster.json.zst +# # # +# # # Note: First run with no compression filled the disk, add zstd to fuzzycat. +# +# +# class RefsFatcatSortedKeys(Refcat): +# """ +# Extract keys and sort. +# """ +# def requires(self): +# return RefsReleasesMerged() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# skate-derive-key -skip-empty-keys -b 50000 -verbose -f tsand | +# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | +# zstd -T0 -c > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class RefsFatcatClusters(Refcat): +# """ +# Group by clusters. Full set will be ~90GB compressed, about 40M clusters +# (already filtered, so 2+ docs only, with at least on ref and one release, etc). +# """ +# def requires(self): +# return RefsFatcatSortedKeys() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# skate-cluster -both | +# zstd -T0 -c9 > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# # ==== new style zippy biblioref generation +# +# +# class BiblioRefFromFuzzyClusters(Refcat): +# """ +# Use "bref" mode to generate a biblioref document from verified clusters. +# """ +# def requires(self): +# return RefsFatcatClusters() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# skate-verify -m bref > {output} +# """, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class BiblioRefZippyDOI(Refcat): +# """ +# Generate proposed biblioref docs from two sorted key files, sorted by DOI. +# """ +# def requires(self): +# return { +# "refs": RefsDOI(), +# "releases": FatcatDOI(), +# } +# +# def run(self): +# output = shellout(r""" +# skate-verify -m exact -r doi -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | +# zstd -c -T0 > {output} +# """, +# releases=self.input().get("releases").path, +# refs=self.input().get("refs").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class BiblioRefZippyArxiv(Refcat): +# """ +# Generate proposed biblioref docs from two sorted key files, sorted by DOI. +# """ +# def requires(self): +# return { +# "refs": RefsArxiv(), +# "releases": FatcatArxiv(), +# } +# +# def run(self): +# output = shellout(r""" +# skate-verify -m exact -r arxiv -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | +# zstd -c -T0 > {output} +# """, +# releases=self.input().get("releases").path, +# refs=self.input().get("refs").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class BiblioRefZippyPMID(Refcat): +# """ +# Generate proposed biblioref docs from two sorted key files, sorted by DOI. +# """ +# def requires(self): +# return { +# "refs": RefsPMID(), +# "releases": FatcatPMID(), +# } +# +# def run(self): +# output = shellout(r""" +# skate-verify -m exact -r pmid -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | +# zstd -c -T0 > {output} +# """, +# releases=self.input().get("releases").path, +# refs=self.input().get("refs").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class BiblioRefZippyPMCID(Refcat): +# """ +# Generate proposed biblioref docs from two sorted key files, sorted by DOI. +# """ +# def requires(self): +# return { +# "refs": RefsPMCID(), +# "releases": FatcatPMCID(), +# } +# +# def run(self): +# output = shellout(r""" +# skate-verify -m exact -r pmcid -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | +# zstd -c -T0 > {output} +# """, +# releases=self.input().get("releases").path, +# refs=self.input().get("refs").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class BiblioRefV2(Refcat): +# """ +# A v1 set of biblioref schema docs. +# """ +# def requires(self): +# return [BiblioRefZippyDOI(), BiblioRefZippyArxiv(), BiblioRefZippyPMID(), BiblioRefZippyPMCID(), BiblioRefFromFuzzyClusters()] +# +# def run(self): +# _, tmpf = tempfile.mkstemp(prefix="refcat-") +# for target in self.input(): +# shellout(""" +# zstdcat -T0 {input} | +# skate-bref-id | +# zstd -T0 >> {output} +# """, +# input=target.path, +# output=tmpf) +# luigi.LocalTarget(tmpf).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# # ==== V3 related +# +# +# # ==== RG title match example +# +# +# class RGSitemapToRelease(Refcat): +# """ +# Turn sitemap data to skeleton release. +# """ +# def run(self): +# link = "https://archive.org/download/rg_sitemap_2021_02_23/rg_sitemap_2021_02_23.ndj.zst" +# output = shellout(""" +# curl -sL {link} | +# zstdcat -T0 | +# parallel --block 10M -j 16 --pipe "jq -rc '{{\"title\": .title, \"extra\": {{\"rg\": {{\"sitemap\": true}}}}}}'" | +# zstd -T0 -c > {output} +# """, +# link=link) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class RGSitemapFatcatMerged(Refcat): +# """ +# A minimal combined fatcat and RG dataset. +# """ +# def requires(self): +# return [RGSitemapToRelease(), ReleaseExportTitleOnly()] +# +# def run(self): +# _, tmpf = tempfile.mkstemp(prefix="refcat-") +# for target in self.input(): +# shellout("""cat {file} >> {output}""", file=target.path, output=tmpf) +# luigi.LocalTarget(tmpf).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class RGSitemapFatcatSortedKeys(Refcat): +# """ +# Extract keys and sort. +# """ +# def requires(self): +# return RGSitemapFatcatMerged() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# skate-derive-key -b 50000 -verbose -f tsand | +# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | +# zstd -T0 -c > {output}""", +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# # ==== MAG +# +# +# class MAGDOI(Refcat): +# """ +# List of MAG DOI. +# """ +# def requires(self): +# return MAGPapers() +# +# def run(self): +# output = shellout(""" +# unpigz -c {input} | +# cut -f3 | +# grep -v ^$ | +# zstd -T0 -c > {output} +# """, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# # ==== WikipediaCitations +# +# +# class BiblioRefWikiDOISortedKeys(Refcat): +# """ +# Sorted DOI keys from wikipedia. +# """ +# def requires(self): +# return WikipediaCitationsMinimalDataset() +# +# def run(self): +# output = shellout(""" +# cat {input} | +# skate-wikipedia-doi | +# LC_ALL=C sort -S 10% -k2,2 | +# zstd -T0 -c > {output} +# """, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) +# +# +# class BiblioRefWiki(Refcat): +# def requires(self): +# return { +# "wiki": BiblioRefWikiDOISortedKeys(), +# "releases": FatcatDOI(), +# } +# +# def run(self): +# output = shellout(r""" +# skate-verify -m wiki -r doi -R <(zstdcat -T0 {releases}) -W <(zstdcat -T0 {wiki}) | +# zstd -c -T0 > {output} +# """, +# releases=self.input().get("releases").path, +# wiki=self.input().get("wiki").path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# # ==== Prepare unmatched +# +# +# class BiblioRefSortedIdent(Refcat): +# def requires(self): +# return BiblioRefV2() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# skate-derive-key -b 50000 -verbose -F source_release_ident | +# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | +# zstd -T0 -c > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class RefsSortedIdent(Refcat): +# def requires(self): +# return RefsWithUnstructured() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# skate-derive-key -b 50000 -verbose -F release_ident | +# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | +# zstd -T0 -c > {output} +# """, +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# # OL +# +# +# class WithISBN(Refcat): +# """ +# Keeps converted refs with isbn. +# """ +# def requires(self): +# return RefsToRelease() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.isbn != null)'" | +# zstd -T0 -c > {output} +# """, +# n=self.n, +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# +# +# class OpenLibraryWorks(Refcat): +# """ +# Extract just the works. +# """ +# def requires(self): +# return OpenLibraryDump() +# +# def run(self): +# output = shellout(""" +# zstdcat -T0 {input} | +# parallel -j {n} --block 10M --pipe "jq -rc 'select(.type == \\"work\\")'" | +# zstd -T0 -c > {output} +# """, +# n=self.n, +# tmpdir=self.tmpdir, +# input=self.input().path) +# luigi.LocalTarget(output).move(self.output().path) +# +# def output(self): +# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) diff --git a/python/refcat/tasks.py b/python/refcat/tasks.py index 4c6723c..c3e32ee 100644 --- a/python/refcat/tasks.py +++ b/python/refcat/tasks.py @@ -6,7 +6,7 @@ reference data. Reference data can come from metadata or ML extraction Rationale is that we successively want to build a derived dataset (citation graph and various reports) and we expect the data to be messy and partial. We -want to cache intermediate results for quicker feature development. +want to cache intermediate results for a uniform structure of outputs. Notes. @@ -137,10 +137,10 @@ class Refcat(BaseTask): A base tasks for all refcat related tasks. """ BASE = settings.BASE - TAG = 'refcat' + TAG = '2021-05-06' date = luigi.DateParameter(default=datetime.date(2021, 5, 6), description="a versioning help, change this manually") - tmpdir = luigi.Parameter(default="/magna/tmp", description="set tempdir", significant=False) + tmpdir = luigi.Parameter(default=settings.TMPDIR, description="set tempdir", significant=False) n = luigi.IntParameter(default=multiprocessing.cpu_count(), significant=False) @property @@ -207,7 +207,7 @@ class OpenLibraryDump(luigi.ExternalTask, Refcat): class RefsWithUnstructured(Refcat): """ - Augment refs with data from biblio.unstructured. Do this first, so we can use it + Augment refs with data from biblio.unstructured - do this first, so we can use it all subsequent steps. """ def requires(self): @@ -236,8 +236,9 @@ class ReleaseExportReduced(Refcat): def run(self): output = shellout(""" zstdcat -T0 {input} | - parallel --block 10M -j 16 --pipe "jq -rc 'del(.files) | del(.refs) | del(.container.extra)'" | - zstd -T0 -c9 > {output} + parallel --block 10M -j 16 --pipe + "jq -rc 'del(.files) | del(.refs) | del(.container.extra)'" | + zstd -T0 > {output} """, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) @@ -246,990 +247,6 @@ class ReleaseExportReduced(Refcat): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - -class URLTabs(Refcat): - """ - Tabular URLs, note: URL can contain artifacts from parsing. - - Performance data point: - - real 70m6.309s - user 757m4.317s - sys 85m54.710s - """ - def requires(self): - return RefsWithUnstructured() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - parallel -j {n} --block 100M --pipe "jq -rc '[.work_ident, .release_ident, .biblio.url?] | @tsv'" | - zstd -T0 -c > {output} - """, - n=self.n, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class URLList(Refcat): - """ - TSV URL extracted, 44368911. - """ - def requires(self): - return URLTabs() - - def run(self): - stats = collections.Counter() - with self.input().open("rb") as f: - with self.output().open("w") as output: - for i, line in enumerate(f, start=1): - parts = line.decode("utf-8").strip().split("\t") - if len(parts) != 3: - stats["no-url"] += 1 - continue - urls = extract_urls(parts[2]) - stats["found-{}".format(len(urls))] += 1 - for link in urls: - link = link + "\n" - output.write(link.encode("utf-8")) - self.logger.debug(json.dumps(dict(stats))) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class RefsDOI(Refcat): - """ - TSV with (ident, doi, full doc). - """ - def requires(self): - return RefsWithUnstructured() - - def run(self): - """ - Note: we want the full JSON document, so we use jq tostring, which - escapes "too much", hence we need to clean up with sed, unfortunately. - """ - # XXX: skate-doi could be an awk function, too. - # XXX: jq tostring might escape too much - output = shellout(r""" - zstdcat -T0 {input} | - LC_ALL=C tr -d '\t' | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.doi != null) | [.release_ident, .biblio.doi, (.|tostring)] | @tsv'" | - LC_ALL=C sed 's/\\\\/\\/g' | - LC_ALL=C awk -F $'\t' -v OFS='\t' '$2=tolower($2)' | - skate-to-doi -B -S -f 2 | - LC_ALL=C sort -S 30% --parallel 6 -T {tmpdir} -k2,2 | - zstd -c -T0 > {output} - """, - tmpdir=self.tmpdir, - n=self.n, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class RefsPMID(Refcat): - """ - List of PMID, 74M refs seem to have one. - """ - def requires(self): - return RefsWithUnstructured() - - def run(self): - output = shellout(r""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.pmid != null and .biblio.doi == null) | [.release_ident, .biblio.pmid, (.|tostring)] | @tsv'" | - LC_ALL=C sed 's/\\\\/\\/g' | - LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | - zstd -T0 -c > {output} - """, - tmpdir=self.tmpdir, - n=self.n, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class RefsPMCID(Refcat): - """ - List of PMCID. - """ - def requires(self): - return RefsWithUnstructured() - - def run(self): - output = shellout(r""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.pmcid != null and .biblio.doi == null) | [.release_ident, .biblio.pmcid, (.|tostring)] | @tsv'" | - LC_ALL=C sed 's/\\\\/\\/g' | - LC_ALL=C sed -e 's@PMC@@g' | - LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | - zstd -T0 -c > {output} - """, - tmpdir=self.tmpdir, - n=self.n, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class RefsArxiv(Refcat): - """ - List of arxiv ids from refs. - """ - def requires(self): - return RefsWithUnstructured() - - def run(self): - output = shellout(r""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.arxiv_id != null and .biblio.doi == null) | [.release_ident, .biblio.arxiv_id, (.|tostring)] | @tsv'" | - LC_ALL=C sed 's/\\\\/\\/g' | - LC_ALL=C sort -S 30% -k2,2 -T {tmpdir} | - zstd -T0 -c > {output} - """, - tmpdir=self.tmpdir, - n=self.n, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class RefsTitles(Refcat): - """ - Extract titles. - - Contains many artifacts, e.g.: ! Accurate! and! efficient! insertional! - RNA!editing!in!isolated!Physarum!mitochondria.!RNA* - """ - def requires(self): - return RefsWithUnstructured() - - def run(self): - output = shellout(r""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.title != null and .biblio.doi == null) | - [.release_ident, (.biblio.title | ltrimstr(\" \") | rtrimstr(\" \") | gsub(\"\\n\"; \" \"))] | @tsv'" | - zstd -c -T0 > {output} - """, - input=self.input().path, - n=self.n) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class RefsTitlesLower(Refcat): - """ - Unique lowercase titles; 223m46.443s. - """ - def requires(self): - return RefsTitles() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - tr '[:upper:]' '[:lower:]' | - LC_ALL=C sort -k2 | - zstd -T0 -c > {output} - """, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class FatcatDOI(Refcat): - """ - List of DOIs, lowercase on the fly. - """ - def requires(self): - return ReleaseExportReduced() - - def run(self): - output = shellout(r""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.doi != null) | [.ident, .ext_ids.doi, (.|tostring)] | @tsv'" | - LC_ALL=C sed 's/\\\\/\\/g' | - LC_ALL=C awk -F $'\t' -v OFS='\t' '$2=tolower($2)' | - LC_ALL=C sort -S 25% --parallel 6 -k2,2 -T {tmpdir} | - zstd -c -T0 > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path, - n=self.n) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class FatcatPMID(Refcat): - """ - List of PMID. - """ - def requires(self): - return ReleaseExportReduced() - - def run(self): - output = shellout(r""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.pmid != null) | [.ident, .ext_ids.pmid, (.|tostring)] | @tsv'" | - LC_ALL=C sed 's/\\\\/\\/g' | - LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | - zstd -c -T0 > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path, - n=self.n) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class FatcatPMCID(Refcat): - """ - List of PMCID. - """ - def requires(self): - return ReleaseExportReduced() - - def run(self): - output = shellout(r""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.pmcid != null) | [.ident, .ext_ids.pmcid, (.|tostring)] | @tsv'" | - LC_ALL=C sed 's/\\\\/\\/g' | - LC_ALL=C sed -e 's@PMC@@g' | - LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | - zstd -c -T0 > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path, - n=self.n) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class FatcatArxiv(Refcat): - """ - List of arxiv ids. - """ - def requires(self): - return ReleaseExportReduced() - - def run(self): - output = shellout(r""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.extra.arxiv.base_id != null) | [.ident, .extra.arxiv.base_id, (.|tostring)] | @tsv'" | - LC_ALL=C sed 's/\\\\/\\/g' | - LC_ALL=C sort -S 30% -k2,2 -T {tmpdir} | - zstd -c -T0 > {output}""", - tmpdir=self.tmpdir, - input=self.input().path, - n=self.n) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class FatcatTitles(Refcat): - """ - Get a list of non-normalized, sorted titles; ~104min. - """ - def requires(self): - return ReleaseExportReduced() - - def run(self): - output = shellout(r""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.title != null and .biblio.doi == null) | - [.ident, (.title | ltrimstr(\" \") | rtrimstr(\" \") | gsub(\"\\n\"; \" \"))] | @tsv'" | - zstd -c -T0 > {output} - """, - input=self.input().path, - n=self.n) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class FatcatTitlesLower(Refcat): - """ - Lowercase titles. - """ - def requires(self): - return FatcatTitles() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - tr '[:upper:]' '[:lower:]' | - LC_ALL=C sort -k2 | - zstd -T0 -c > {output} - """, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class FatcatSortedKeys(Refcat): - """ - Derive key and sort; key derivation (150M docs) took 39min; total 61min. - """ - def requires(self): - return ReleaseExportReduced() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - skate-derive-key -b 50000 -verbose -f tsand | - LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | - zstd -T0 -c > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class CommonDOI(Refcat): - """ - DOI that appear in the catalog and in the refs. - """ - def requires(self): - return { - "fatcat": FatcatDOI(), - "refs": RefsDOI(), - } - - def run(self): - f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat").path) - f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs").path) - output = shellout(""" LC_ALL=C comm {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) - luigi.LocalTarget(output).move(self.output().path) - os.remove(f1) - os.remove(f2) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class CommonTitles(Refcat): - def requires(self): - return { - "fatcat": FatcatTitles(), - "refs": RefsTitles(), - } - - def run(self): - f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat")) - f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs")) - output = shellout(""" LC_ALL=C comm -12 {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) - luigi.LocalTarget(output).move(self.output().path) - os.remove(f1) - os.remove(f2) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class CommonTitlesLower(Refcat): - def requires(self): - return { - "fatcat": FatcatTitlesLower(), - "refs": RefsTitlesLower(), - } - - def run(self): - f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat").path) - f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs").path) - output = shellout(""" comm -12 {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) - luigi.LocalTarget(output).move(self.output().path) - os.remove(f1) - os.remove(f2) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class RefsFatcatDOIJoin(Refcat): - """ - Join fatcat and refs DOI lists. - - Output will be like: - - ---- DOI -------------- ------ Fatcat ----------- -------- Refs ------------- - - 10.1001/2012.jama.10158 m7eoa3hbivcq5kgzzlepbifbna paygwq34z5hsnm5ypnwp2kz6wq - 10.1001/2012.jama.10159 xsw5qtrv3jg7pjoj67e3kijtwq 4ug6jvnedbau3nnkhuqegepw2q - 10.1001/2012.jama.10161 7m7yv5xkkjakxh3wuncqoctphe yllvkrxtgnhnfcyxwbj3swhegu - 10.1001/2012.jama.10368 dw2djv2qdzecncwmh4o7esg4ie ghgshdzpbranbcwsr4xsh3yfhy - - To count the number of citations per DOI, count occurences on the second - column. - - """ - def requires(self): - return { - "fatcat": FatcatDOI(), - "refs": RefsDOI(), - } - - def run(self): - output = shellout(""" - LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | - zstd -T0 -c > {output} - """, - fatcat=self.input().get("fatcat").path, - refs=self.input().get("refs").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst")) - - -class RefsFatcatPMIDJoin(Refcat): - """ - Join fatcat and refs PMID lists. - """ - def requires(self): - return { - "fatcat": FatcatPMID(), - "refs": RefsPMID(), - } - - def run(self): - output = shellout(""" - LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | - zstd -c -T0 > {output} - """, - fatcat=self.input().get("fatcat").path, - refs=self.input().get("refs").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst")) - - -class RefsFatcatPMCIDJoin(Refcat): - """ - Join fatcat and refs PMCID lists. - """ - def requires(self): - return { - "fatcat": FatcatPMCID(), - "refs": RefsPMCID(), - } - - def run(self): - output = shellout(""" - LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | - zstd -c -T0 > {output} - """, - fatcat=self.input().get("fatcat").path, - refs=self.input().get("refs").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst")) - - -class RefsFatcatArxivJoin(Refcat): - """ - Join fatcat, refs on arxiv (base) id. - """ - def requires(self): - return { - "fatcat": FatcatArxiv(), - "refs": RefsArxiv(), - } - - def run(self): - # TODO: We want a zippy join here (e.g. to generate biblioref docs). - output = shellout(""" - LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | - zstd -c -T0 > {output} - """, - fatcat=self.input().get("fatcat").path, - refs=self.input().get("refs").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst")) - - -class RefsFatcatTitleLowerJoin(Refcat): - """ - Join fatcat and refs titles. - - Output will be textfile (title, fatcat ident, refs ident). XXX: need to - filter out too common titles first. - """ - def requires(self): - return { - "fatcat": FatcatTitlesLower(), - "refs": RefsTitlesLower(), - } - - def run(self): - output = shellout(""" - LC_ALL=C join -1 2 -2 2 {fatcat} {refs} > {output} - """, - fatcat=self.input().get("fatcat").path, - refs=self.input().get("refs").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv")) - - -class RefsFatcatGroupJoin(Refcat): - """ - Concat joins. - - 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum nncja4imynb4rajadrlbnoklxy - 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum noimcv5xdzd6hfqu2mebcrzr34 - 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum nqzg5lgdxvbhniy2hajlqd3aqi - ... - """ - def requires(self): - return [RefsFatcatDOIJoin(), RefsFatcatPMIDJoin(), RefsFatcatArxivJoin(), RefsFatcatPMCIDJoin()] - - def run(self): - _, tmpf = tempfile.mkstemp(prefix="refcat-") - for target in self.input(): - shellout("""cat {file} >> {output}""", file=target.path, output=tmpf) - luigi.LocalTarget(tmpf).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst")) - - -class RefsFatcatRanked(Refcat): - """ - Inbound count, ident; 32m34.142s. - - 15175 ui64apmob5gnrfwe7pwgk7egju - 15167 cejzj3ddszcdrmij7np36am5fa - 15165 2b2ok43pirduva7ai3745k5xa4 - 15158 cn4c33ctb5g5fax3touxjdmfle - 15155 rrlbmbro4rhwri3zawz3uhp5va - 15138 o62kjogy4zdyrlvy7cu7rlcs3m - """ - def requires(self): - return RefsFatcatGroupJoin() - - def run(self): - output = shellout(""" - zstdcat -T0 {file} | - LC_ALL=C sort -k2,3 -u | - LC_ALL=C cut -d ' ' -f 2 | - LC_ALL=C uniq -c | - LC_ALL=C sort -nr > {output} - """, - file=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv")) - - -# -# -# # TODO: merge refs docs and release docs, maybe add an source label, then -# # cluster; run verify and report on the number of similar records; generate a list of common titles -# # -# # TODO: find non-matched items and check for any pattern -# -# -class RefsCounter(Refcat): - """ - Key counts, see: ref_counter.py. - """ - def requires(self): - return RefsWithUnstructured() - - def run(self): - counts = collections.Counter() - with self.input().open("r") as f: - for i, line in enumerate(f): - obj = json.loads(line) - counts['total'] += 1 - for k in obj.keys(): - if k == 'biblio': - continue - elif k == 'ref_source': - counts["source_" + obj[k]] += 1 - elif obj.get(k): - counts["has_" + k] += 1 - biblio = obj.get('biblio') - if not biblio: - continue - for k in biblio.keys(): - if biblio.get(k): - counts["has_" + k] += 1 - if biblio.get('doi') or biblio.get('pmcid') or biblio.get('pmid') or biblio.get('arxiv_id'): - counts['has_any_extid'] += 1 - if biblio.get('container_name') and biblio.get('volume') and biblio.get('issue') and biblio.get('pages'): - counts['has_container_volume_issue_pages'] += 1 - if biblio.get('title') and biblio.get('contrib_raw_names') and biblio.get('year'): - counts['has_title_contrib_year'] += 1 - if biblio.get('container_name') and biblio.get('contrib_raw_names') and biblio.get('year'): - counts['has_contrib_container_year'] += 1 - if biblio.get('title') and biblio.get('container_name') and biblio.get('year'): - counts['has_title_container_year'] += 1 - - if i % 1000000 == 0: - print(json.dumps(counts, indent=4, sort_keys=True), file=sys.stderr) - - with self.output().open("w") as output: - json.dump(counts, output) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json")) - - -class RefsKeyStats(Refcat): - """ - How many titles, DOI, etc. do we have in refs? - """ - def requires(self): - return RefsWithUnstructured() - - def run(self): - stats = { - "total": 0, - "no_biblio": 0, - "stats": collections.Counter(), - } - with self.input().open("r") as f: - for i, line in enumerate(f): - stats["total"] += 1 - doc = json.loads(line) - if "biblio" not in doc: - stats["no_biblio"] += 1 - continue - biblio = doc["biblio"] - key = "|".join(sorted(biblio.keys())) - stats["stats"][key] += 1 - if i % 1000000 == 0: - print(json.dumps(stats, indent=4, sort_keys=True), file=sys.stderr) - - with self.output().open("w") as output: - json.dumps(stats, output) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json")) - - -class RefsToRelease(Refcat): - """ - Convert a refs doc into a minimalistic release entity. Requires "skate" - tools - XXX: polish. - """ - def requires(self): - return RefsWithUnstructured() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - skate-conv -f ref -w 24 -b 100000 | - zstd -T0 -c > {output} - """, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class RefsSortedKeys(Refcat): - """ - Derive key and sort; 1.8B json docs, took: 255min; 122k/s; key extration - almost 3h (might be faster with rust); 90G compressed. - - Keys based on title will have many empty keys; e.g. "2021-02-20", - 838,057,412 docs have no key. - """ - def requires(self): - return RefsToRelease() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - skate-derive-key -skip-empty-keys -b 50000 -verbose -f tsand | - LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | - zstd -T0 -c > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class RefsReleasesMerged(Refcat): - """ - Merge release and refs (in release form). - - wc: 1579687186 53137849922 913692185284 - """ - def requires(self): - return { - "release": ReleaseExportReduced(), - "refs": RefsToRelease(), - } - - def run(self): - _, f = tempfile.mkstemp(prefix="refcat-") - for k, v in self.input().items(): - shellout("cat {input} >> {output}", input=v.path, output=f) - luigi.LocalTarget(f).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class RefsTitleFrequency(Refcat): - """ - Dig into common titles. - """ - tmpdir = luigi.Parameter(default="/fast/tmp", description="set tempdir", significant=False) - - def requires(self): - return RefsTitlesLower() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - LC_ALL=C cut -f2 | - LC_ALL=C sort -T {tmpdir} -S20% --compress-program pzstd --parallel 6 | - LC_ALL=C uniq -c | - LC_ALL=C sort -nr | - zstd -c9 > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -# # XXX: After RefsReleasesMerged, we want to cluster. -# # python -m fuzzycat cluster -t tsandcrawler < data/re.json > cluster.json.zst -# # -# # Note: First run with no compression filled the disk, add zstd to fuzzycat. - - -class RefsFatcatSortedKeys(Refcat): - """ - Extract keys and sort. - """ - def requires(self): - return RefsReleasesMerged() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - skate-derive-key -skip-empty-keys -b 50000 -verbose -f tsand | - LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | - zstd -T0 -c > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class RefsFatcatClusters(Refcat): - """ - Group by clusters. Full set will be ~90GB compressed, about 40M clusters - (already filtered, so 2+ docs only, with at least on ref and one release, etc). - """ - def requires(self): - return RefsFatcatSortedKeys() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - skate-cluster -both | - zstd -T0 -c9 > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -# ==== new style zippy biblioref generation - - -class BiblioRefFromFuzzyClusters(Refcat): - """ - Use "bref" mode to generate a biblioref document from verified clusters. - """ - def requires(self): - return RefsFatcatClusters() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - skate-verify -m bref > {output} - """, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class BiblioRefZippyDOI(Refcat): - """ - Generate proposed biblioref docs from two sorted key files, sorted by DOI. - """ - def requires(self): - return { - "refs": RefsDOI(), - "releases": FatcatDOI(), - } - - def run(self): - output = shellout(r""" - skate-verify -m exact -r doi -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | - zstd -c -T0 > {output} - """, - releases=self.input().get("releases").path, - refs=self.input().get("refs").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class BiblioRefZippyArxiv(Refcat): - """ - Generate proposed biblioref docs from two sorted key files, sorted by DOI. - """ - def requires(self): - return { - "refs": RefsArxiv(), - "releases": FatcatArxiv(), - } - - def run(self): - output = shellout(r""" - skate-verify -m exact -r arxiv -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | - zstd -c -T0 > {output} - """, - releases=self.input().get("releases").path, - refs=self.input().get("refs").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class BiblioRefZippyPMID(Refcat): - """ - Generate proposed biblioref docs from two sorted key files, sorted by DOI. - """ - def requires(self): - return { - "refs": RefsPMID(), - "releases": FatcatPMID(), - } - - def run(self): - output = shellout(r""" - skate-verify -m exact -r pmid -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | - zstd -c -T0 > {output} - """, - releases=self.input().get("releases").path, - refs=self.input().get("refs").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class BiblioRefZippyPMCID(Refcat): - """ - Generate proposed biblioref docs from two sorted key files, sorted by DOI. - """ - def requires(self): - return { - "refs": RefsPMCID(), - "releases": FatcatPMCID(), - } - - def run(self): - output = shellout(r""" - skate-verify -m exact -r pmcid -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | - zstd -c -T0 > {output} - """, - releases=self.input().get("releases").path, - refs=self.input().get("refs").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class BiblioRefV2(Refcat): - """ - A v1 set of biblioref schema docs. - """ - def requires(self): - return [BiblioRefZippyDOI(), BiblioRefZippyArxiv(), BiblioRefZippyPMID(), BiblioRefZippyPMCID(), BiblioRefFromFuzzyClusters()] - - def run(self): - _, tmpf = tempfile.mkstemp(prefix="refcat-") - for target in self.input(): - shellout(""" - zstdcat -T0 {input} | - skate-bref-id | - zstd -T0 >> {output} - """, - input=target.path, - output=tmpf) - luigi.LocalTarget(tmpf).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -# ==== V3 related - - class UnmatchedRefs(Refcat): """ File with not yet considered refs (e.g. no title, doi, ...) @@ -1240,7 +257,12 @@ class UnmatchedRefs(Refcat): def run(self): output = shellout(""" zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.doi == null and .biblio.title == null and .biblio.pmid == null and .biblio.pmcid == null and .biblio.arxiv_id == null)'" | + parallel -j {n} --block 10M --pipe + "jq -rc 'select(.biblio.doi == null and + .biblio.title == null and + .biblio.pmid == null and + .biblio.pmcid == null and + .biblio.arxiv_id == null)'" | zstd -T0 -c > {output}""", n=self.n, input=self.input().path) @@ -1250,217 +272,31 @@ class UnmatchedRefs(Refcat): return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# ==== RG title match example - - -class RGSitemapToRelease(Refcat): - """ - Turn sitemap data to skeleton release. - """ - def run(self): - link = "https://archive.org/download/rg_sitemap_2021_02_23/rg_sitemap_2021_02_23.ndj.zst" - output = shellout(""" - curl -sL {link} | - zstdcat -T0 | - parallel --block 10M -j 16 --pipe "jq -rc '{{\"title\": .title, \"extra\": {{\"rg\": {{\"sitemap\": true}}}}}}'" | - zstd -T0 -c > {output} - """, - link=link) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class RGSitemapFatcatMerged(Refcat): - """ - A minimal combined fatcat and RG dataset. - """ - def requires(self): - return [RGSitemapToRelease(), ReleaseExportTitleOnly()] - - def run(self): - _, tmpf = tempfile.mkstemp(prefix="refcat-") - for target in self.input(): - shellout("""cat {file} >> {output}""", file=target.path, output=tmpf) - luigi.LocalTarget(tmpf).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class RGSitemapFatcatSortedKeys(Refcat): - """ - Extract keys and sort. - """ - def requires(self): - return RGSitemapFatcatMerged() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - skate-derive-key -b 50000 -verbose -f tsand | - LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | - zstd -T0 -c > {output}""", - tmpdir=self.tmpdir, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -# ==== MAG - - -class MAGDOI(Refcat): - """ - List of MAG DOI. +class URLTabs(Refcat): """ - def requires(self): - return MAGPapers() - - def run(self): - output = shellout(""" - unpigz -c {input} | - cut -f3 | - grep -v ^$ | - zstd -T0 -c > {output} - """, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -# ==== WikipediaCitations + Tabular URLs, note: URL can contain artifacts from parsing. + Performance data point: -class BiblioRefWikiDOISortedKeys(Refcat): - """ - Sorted DOI keys from wikipedia. + real 70m6.309s + user 757m4.317s + sys 85m54.710s """ def requires(self): - return WikipediaCitationsMinimalDataset() - - def run(self): - output = shellout(""" - cat {input} | - skate-wikipedia-doi | - LC_ALL=C sort -S 10% -k2,2 | - zstd -T0 -c > {output} - """, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) - - -class BiblioRefWiki(Refcat): - def requires(self): - return { - "wiki": BiblioRefWikiDOISortedKeys(), - "releases": FatcatDOI(), - } - - def run(self): - output = shellout(r""" - skate-verify -m wiki -r doi -R <(zstdcat -T0 {releases}) -W <(zstdcat -T0 {wiki}) | - zstd -c -T0 > {output} - """, - releases=self.input().get("releases").path, - wiki=self.input().get("wiki").path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -# ==== Prepare unmatched - - -class BiblioRefSortedIdent(Refcat): - def requires(self): - return BiblioRefV2() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - skate-derive-key -b 50000 -verbose -F source_release_ident | - LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | - zstd -T0 -c > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -class RefsSortedIdent(Refcat): - def requires(self): return RefsWithUnstructured() def run(self): output = shellout(""" zstdcat -T0 {input} | - skate-derive-key -b 50000 -verbose -F release_ident | - LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | - zstd -T0 -c > {output} - """, - tmpdir=self.tmpdir, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - - -# OL - - -class WithISBN(Refcat): - """ - Keeps converted refs with isbn. - """ - def requires(self): - return RefsToRelease() - - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.isbn != null)'" | + skate-map -m ru | + LC_ALL=C sort -k3,3 -S25% --parallel 4 | zstd -T0 -c > {output} """, n=self.n, - tmpdir=self.tmpdir, input=self.input().path) luigi.LocalTarget(output).move(self.output().path) def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) - + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -class OpenLibraryWorks(Refcat): - """ - Extract just the works. - """ - def requires(self): - return OpenLibraryDump() - def run(self): - output = shellout(""" - zstdcat -T0 {input} | - parallel -j {n} --block 10M --pipe "jq -rc 'select(.type == \\"work\\")'" | - zstd -T0 -c > {output} - """, - n=self.n, - tmpdir=self.tmpdir, - input=self.input().path) - luigi.LocalTarget(output).move(self.output().path) - - def output(self): - return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) |