diff options
-rw-r--r-- | python/refcat/attic.py | 2343 |
1 files changed, 1174 insertions, 1169 deletions
diff --git a/python/refcat/attic.py b/python/refcat/attic.py index 38a5853..9f26882 100644 --- a/python/refcat/attic.py +++ b/python/refcat/attic.py @@ -1,1172 +1,1177 @@ # -# -# class URLList(Refcat): -# """ -# TSV URL extracted, 44368911. -# """ -# def requires(self): -# return URLTabs() -# -# def run(self): -# stats = collections.Counter() -# with self.input().open("rb") as f: -# with self.output().open("w") as output: -# for i, line in enumerate(f, start=1): -# parts = line.decode("utf-8").strip().split("\t") -# if len(parts) != 3: -# stats["no-url"] += 1 -# continue -# urls = extract_urls(parts[2]) -# stats["found-{}".format(len(urls))] += 1 -# for link in urls: -# link = link + "\n" -# output.write(link.encode("utf-8")) -# self.logger.debug(json.dumps(dict(stats))) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class RefsDOI(Refcat): -# """ -# TSV with (ident, doi, full doc). -# """ -# def requires(self): -# return RefsWithUnstructured() -# -# def run(self): -# """ -# Note: we want the full JSON document, so we use jq tostring, which -# escapes "too much", hence we need to clean up with sed, unfortunately. -# """ -# # XXX: skate-doi could be an awk function, too. -# # XXX: jq tostring might escape too much -# output = shellout(r""" -# zstdcat -T0 {input} | -# LC_ALL=C tr -d '\t' | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.doi != null) | [.release_ident, .biblio.doi, (.|tostring)] | @tsv'" | -# LC_ALL=C sed 's/\\\\/\\/g' | -# LC_ALL=C awk -F $'\t' -v OFS='\t' '$2=tolower($2)' | -# skate-to-doi -B -S -f 2 | -# LC_ALL=C sort -S 30% --parallel 6 -T {tmpdir} -k2,2 | -# zstd -c -T0 > {output} -# """, -# tmpdir=self.tmpdir, -# n=self.n, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class RefsPMID(Refcat): -# """ -# List of PMID, 74M refs seem to have one. -# """ -# def requires(self): -# return RefsWithUnstructured() -# -# def run(self): -# output = shellout(r""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.pmid != null and .biblio.doi == null) | [.release_ident, .biblio.pmid, (.|tostring)] | @tsv'" | -# LC_ALL=C sed 's/\\\\/\\/g' | -# LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | -# zstd -T0 -c > {output} -# """, -# tmpdir=self.tmpdir, -# n=self.n, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class RefsPMCID(Refcat): -# """ -# List of PMCID. -# """ -# def requires(self): -# return RefsWithUnstructured() -# -# def run(self): -# output = shellout(r""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.pmcid != null and .biblio.doi == null) | [.release_ident, .biblio.pmcid, (.|tostring)] | @tsv'" | -# LC_ALL=C sed 's/\\\\/\\/g' | -# LC_ALL=C sed -e 's@PMC@@g' | -# LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | -# zstd -T0 -c > {output} -# """, -# tmpdir=self.tmpdir, -# n=self.n, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class RefsArxiv(Refcat): -# """ -# List of arxiv ids from refs. -# """ -# def requires(self): -# return RefsWithUnstructured() -# -# def run(self): -# output = shellout(r""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.arxiv_id != null and .biblio.doi == null) | [.release_ident, .biblio.arxiv_id, (.|tostring)] | @tsv'" | -# LC_ALL=C sed 's/\\\\/\\/g' | -# LC_ALL=C sort -S 30% -k2,2 -T {tmpdir} | -# zstd -T0 -c > {output} -# """, -# tmpdir=self.tmpdir, -# n=self.n, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class RefsTitles(Refcat): -# """ -# Extract titles. -# -# Contains many artifacts, e.g.: ! Accurate! and! efficient! insertional! -# RNA!editing!in!isolated!Physarum!mitochondria.!RNA* -# """ -# def requires(self): -# return RefsWithUnstructured() -# -# def run(self): -# output = shellout(r""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.title != null and .biblio.doi == null) | -# [.release_ident, (.biblio.title | ltrimstr(\" \") | rtrimstr(\" \") | gsub(\"\\n\"; \" \"))] | @tsv'" | -# zstd -c -T0 > {output} -# """, -# input=self.input().path, -# n=self.n) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class RefsTitlesLower(Refcat): -# """ -# Unique lowercase titles; 223m46.443s. -# """ -# def requires(self): -# return RefsTitles() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# tr '[:upper:]' '[:lower:]' | -# LC_ALL=C sort -k2 | -# zstd -T0 -c > {output} -# """, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class FatcatDOI(Refcat): -# """ -# List of DOIs, lowercase on the fly. -# """ -# def requires(self): -# return ReleaseExportReduced() -# -# def run(self): -# output = shellout(r""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.doi != null) | [.ident, .ext_ids.doi, (.|tostring)] | @tsv'" | -# LC_ALL=C sed 's/\\\\/\\/g' | -# LC_ALL=C awk -F $'\t' -v OFS='\t' '$2=tolower($2)' | -# LC_ALL=C sort -S 25% --parallel 6 -k2,2 -T {tmpdir} | -# zstd -c -T0 > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path, -# n=self.n) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class FatcatPMID(Refcat): -# """ -# List of PMID. -# """ -# def requires(self): -# return ReleaseExportReduced() -# -# def run(self): -# output = shellout(r""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.pmid != null) | [.ident, .ext_ids.pmid, (.|tostring)] | @tsv'" | -# LC_ALL=C sed 's/\\\\/\\/g' | -# LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | -# zstd -c -T0 > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path, -# n=self.n) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class FatcatPMCID(Refcat): -# """ -# List of PMCID. -# """ -# def requires(self): -# return ReleaseExportReduced() -# -# def run(self): -# output = shellout(r""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.pmcid != null) | [.ident, .ext_ids.pmcid, (.|tostring)] | @tsv'" | -# LC_ALL=C sed 's/\\\\/\\/g' | -# LC_ALL=C sed -e 's@PMC@@g' | -# LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | -# zstd -c -T0 > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path, -# n=self.n) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class FatcatArxiv(Refcat): -# """ -# List of arxiv ids. -# """ -# def requires(self): -# return ReleaseExportReduced() -# -# def run(self): -# output = shellout(r""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.extra.arxiv.base_id != null) | [.ident, .extra.arxiv.base_id, (.|tostring)] | @tsv'" | -# LC_ALL=C sed 's/\\\\/\\/g' | -# LC_ALL=C sort -S 30% -k2,2 -T {tmpdir} | -# zstd -c -T0 > {output}""", -# tmpdir=self.tmpdir, -# input=self.input().path, -# n=self.n) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class FatcatTitles(Refcat): -# """ -# Get a list of non-normalized, sorted titles; ~104min. -# """ -# def requires(self): -# return ReleaseExportReduced() -# -# def run(self): -# output = shellout(r""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.title != null and .biblio.doi == null) | -# [.ident, (.title | ltrimstr(\" \") | rtrimstr(\" \") | gsub(\"\\n\"; \" \"))] | @tsv'" | -# zstd -c -T0 > {output} -# """, -# input=self.input().path, -# n=self.n) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class FatcatTitlesLower(Refcat): -# """ -# Lowercase titles. -# """ -# def requires(self): -# return FatcatTitles() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# tr '[:upper:]' '[:lower:]' | -# LC_ALL=C sort -k2 | -# zstd -T0 -c > {output} -# """, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class FatcatSortedKeys(Refcat): -# """ -# Derive key and sort; key derivation (150M docs) took 39min; total 61min. -# """ -# def requires(self): -# return ReleaseExportReduced() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# skate-derive-key -b 50000 -verbose -f tsand | -# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | -# zstd -T0 -c > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class CommonDOI(Refcat): -# """ -# DOI that appear in the catalog and in the refs. -# """ -# def requires(self): -# return { -# "fatcat": FatcatDOI(), -# "refs": RefsDOI(), -# } -# -# def run(self): -# f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat").path) -# f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs").path) -# output = shellout(""" LC_ALL=C comm {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) -# luigi.LocalTarget(output).move(self.output().path) -# os.remove(f1) -# os.remove(f2) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class CommonTitles(Refcat): -# def requires(self): -# return { -# "fatcat": FatcatTitles(), -# "refs": RefsTitles(), -# } -# -# def run(self): -# f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat")) -# f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs")) -# output = shellout(""" LC_ALL=C comm -12 {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) -# luigi.LocalTarget(output).move(self.output().path) -# os.remove(f1) -# os.remove(f2) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class CommonTitlesLower(Refcat): -# def requires(self): -# return { -# "fatcat": FatcatTitlesLower(), -# "refs": RefsTitlesLower(), -# } -# -# def run(self): -# f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat").path) -# f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs").path) -# output = shellout(""" comm -12 {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) -# luigi.LocalTarget(output).move(self.output().path) -# os.remove(f1) -# os.remove(f2) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class RefsFatcatDOIJoin(Refcat): -# """ -# Join fatcat and refs DOI lists. -# -# Output will be like: -# -# ---- DOI -------------- ------ Fatcat ----------- -------- Refs ------------- -# -# 10.1001/2012.jama.10158 m7eoa3hbivcq5kgzzlepbifbna paygwq34z5hsnm5ypnwp2kz6wq -# 10.1001/2012.jama.10159 xsw5qtrv3jg7pjoj67e3kijtwq 4ug6jvnedbau3nnkhuqegepw2q -# 10.1001/2012.jama.10161 7m7yv5xkkjakxh3wuncqoctphe yllvkrxtgnhnfcyxwbj3swhegu -# 10.1001/2012.jama.10368 dw2djv2qdzecncwmh4o7esg4ie ghgshdzpbranbcwsr4xsh3yfhy -# -# To count the number of citations per DOI, count occurences on the second -# column. -# -# """ -# def requires(self): -# return { -# "fatcat": FatcatDOI(), -# "refs": RefsDOI(), -# } -# -# def run(self): -# output = shellout(""" -# LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | -# zstd -T0 -c > {output} -# """, -# fatcat=self.input().get("fatcat").path, -# refs=self.input().get("refs").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) -# -# -# class RefsFatcatPMIDJoin(Refcat): -# """ -# Join fatcat and refs PMID lists. -# """ -# def requires(self): -# return { -# "fatcat": FatcatPMID(), -# "refs": RefsPMID(), -# } -# -# def run(self): -# output = shellout(""" -# LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | -# zstd -c -T0 > {output} -# """, -# fatcat=self.input().get("fatcat").path, -# refs=self.input().get("refs").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) -# -# -# class RefsFatcatPMCIDJoin(Refcat): -# """ -# Join fatcat and refs PMCID lists. -# """ -# def requires(self): -# return { -# "fatcat": FatcatPMCID(), -# "refs": RefsPMCID(), -# } -# -# def run(self): -# output = shellout(""" -# LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | -# zstd -c -T0 > {output} -# """, -# fatcat=self.input().get("fatcat").path, -# refs=self.input().get("refs").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) -# -# -# class RefsFatcatArxivJoin(Refcat): -# """ -# Join fatcat, refs on arxiv (base) id. -# """ -# def requires(self): -# return { -# "fatcat": FatcatArxiv(), -# "refs": RefsArxiv(), -# } -# -# def run(self): -# # TODO: We want a zippy join here (e.g. to generate biblioref docs). -# output = shellout(""" -# LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | -# zstd -c -T0 > {output} -# """, -# fatcat=self.input().get("fatcat").path, -# refs=self.input().get("refs").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) -# -# -# class RefsFatcatTitleLowerJoin(Refcat): -# """ -# Join fatcat and refs titles. -# -# Output will be textfile (title, fatcat ident, refs ident). XXX: need to -# filter out too common titles first. -# """ -# def requires(self): -# return { -# "fatcat": FatcatTitlesLower(), -# "refs": RefsTitlesLower(), -# } -# -# def run(self): -# output = shellout(""" -# LC_ALL=C join -1 2 -2 2 {fatcat} {refs} > {output} -# """, -# fatcat=self.input().get("fatcat").path, -# refs=self.input().get("refs").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv")) -# -# -# class RefsFatcatGroupJoin(Refcat): -# """ -# Concat joins. -# -# 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum nncja4imynb4rajadrlbnoklxy -# 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum noimcv5xdzd6hfqu2mebcrzr34 -# 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum nqzg5lgdxvbhniy2hajlqd3aqi -# ... -# """ -# def requires(self): -# return [RefsFatcatDOIJoin(), RefsFatcatPMIDJoin(), RefsFatcatArxivJoin(), RefsFatcatPMCIDJoin()] -# -# def run(self): -# _, tmpf = tempfile.mkstemp(prefix="refcat-") -# for target in self.input(): -# shellout("""cat {file} >> {output}""", file=target.path, output=tmpf) -# luigi.LocalTarget(tmpf).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst")) -# -# -# class RefsFatcatRanked(Refcat): -# """ -# Inbound count, ident; 32m34.142s. -# -# 15175 ui64apmob5gnrfwe7pwgk7egju -# 15167 cejzj3ddszcdrmij7np36am5fa -# 15165 2b2ok43pirduva7ai3745k5xa4 -# 15158 cn4c33ctb5g5fax3touxjdmfle -# 15155 rrlbmbro4rhwri3zawz3uhp5va -# 15138 o62kjogy4zdyrlvy7cu7rlcs3m -# """ -# def requires(self): -# return RefsFatcatGroupJoin() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {file} | -# LC_ALL=C sort -k2,3 -u | -# LC_ALL=C cut -d ' ' -f 2 | -# LC_ALL=C uniq -c | -# LC_ALL=C sort -nr > {output} -# """, -# file=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv")) -# -# -# # +# dBBBBBb dBBBBBBP dBBBBBBP dBP dBBBP +# BB +# dBP BB dBP dBP dBP dBP +# dBP BB dBP dBP dBP dBP +# dBBBBBBB dBP dBP dBP dBBBBP + +class URLList(Refcat): + """ + TSV URL extracted, 44368911. + """ + def requires(self): + return URLTabs() + + def run(self): + stats = collections.Counter() + with self.input().open("rb") as f: + with self.output().open("w") as output: + for i, line in enumerate(f, start=1): + parts = line.decode("utf-8").strip().split("\t") + if len(parts) != 3: + stats["no-url"] += 1 + continue + urls = extract_urls(parts[2]) + stats["found-{}".format(len(urls))] += 1 + for link in urls: + link = link + "\n" + output.write(link.encode("utf-8")) + self.logger.debug(json.dumps(dict(stats))) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class RefsDOI(Refcat): + """ + TSV with (ident, doi, full doc). + """ + def requires(self): + return RefsWithUnstructured() + + def run(self): + """ + Note: we want the full JSON document, so we use jq tostring, which + escapes "too much", hence we need to clean up with sed, unfortunately. + """ + # XXX: skate-doi could be an awk function, too. + # XXX: jq tostring might escape too much + output = shellout(r""" + zstdcat -T0 {input} | + LC_ALL=C tr -d '\t' | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.doi != null) | [.release_ident, .biblio.doi, (.|tostring)] | @tsv'" | + LC_ALL=C sed 's/\\\\/\\/g' | + LC_ALL=C awk -F $'\t' -v OFS='\t' '$2=tolower($2)' | + skate-to-doi -B -S -f 2 | + LC_ALL=C sort -S 30% --parallel 6 -T {tmpdir} -k2,2 | + zstd -c -T0 > {output} + """, + tmpdir=self.tmpdir, + n=self.n, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class RefsPMID(Refcat): + """ + List of PMID, 74M refs seem to have one. + """ + def requires(self): + return RefsWithUnstructured() + + def run(self): + output = shellout(r""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.pmid != null and .biblio.doi == null) | [.release_ident, .biblio.pmid, (.|tostring)] | @tsv'" | + LC_ALL=C sed 's/\\\\/\\/g' | + LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | + zstd -T0 -c > {output} + """, + tmpdir=self.tmpdir, + n=self.n, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class RefsPMCID(Refcat): + """ + List of PMCID. + """ + def requires(self): + return RefsWithUnstructured() + + def run(self): + output = shellout(r""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.pmcid != null and .biblio.doi == null) | [.release_ident, .biblio.pmcid, (.|tostring)] | @tsv'" | + LC_ALL=C sed 's/\\\\/\\/g' | + LC_ALL=C sed -e 's@PMC@@g' | + LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | + zstd -T0 -c > {output} + """, + tmpdir=self.tmpdir, + n=self.n, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class RefsArxiv(Refcat): + """ + List of arxiv ids from refs. + """ + def requires(self): + return RefsWithUnstructured() + + def run(self): + output = shellout(r""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.arxiv_id != null and .biblio.doi == null) | [.release_ident, .biblio.arxiv_id, (.|tostring)] | @tsv'" | + LC_ALL=C sed 's/\\\\/\\/g' | + LC_ALL=C sort -S 30% -k2,2 -T {tmpdir} | + zstd -T0 -c > {output} + """, + tmpdir=self.tmpdir, + n=self.n, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class RefsTitles(Refcat): + """ + Extract titles. + + Contains many artifacts, e.g.: ! Accurate! and! efficient! insertional! + RNA!editing!in!isolated!Physarum!mitochondria.!RNA* + """ + def requires(self): + return RefsWithUnstructured() + + def run(self): + output = shellout(r""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.biblio.title != null and .biblio.doi == null) | + [.release_ident, (.biblio.title | ltrimstr(\" \") | rtrimstr(\" \") | gsub(\"\\n\"; \" \"))] | @tsv'" | + zstd -c -T0 > {output} + """, + input=self.input().path, + n=self.n) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class RefsTitlesLower(Refcat): + """ + Unique lowercase titles; 223m46.443s. + """ + def requires(self): + return RefsTitles() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + tr '[:upper:]' '[:lower:]' | + LC_ALL=C sort -k2 | + zstd -T0 -c > {output} + """, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class FatcatDOI(Refcat): + """ + List of DOIs, lowercase on the fly. + """ + def requires(self): + return ReleaseExportReduced() + + def run(self): + output = shellout(r""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.doi != null) | [.ident, .ext_ids.doi, (.|tostring)] | @tsv'" | + LC_ALL=C sed 's/\\\\/\\/g' | + LC_ALL=C awk -F $'\t' -v OFS='\t' '$2=tolower($2)' | + LC_ALL=C sort -S 25% --parallel 6 -k2,2 -T {tmpdir} | + zstd -c -T0 > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path, + n=self.n) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class FatcatPMID(Refcat): + """ + List of PMID. + """ + def requires(self): + return ReleaseExportReduced() + + def run(self): + output = shellout(r""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.pmid != null) | [.ident, .ext_ids.pmid, (.|tostring)] | @tsv'" | + LC_ALL=C sed 's/\\\\/\\/g' | + LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | + zstd -c -T0 > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path, + n=self.n) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class FatcatPMCID(Refcat): + """ + List of PMCID. + """ + def requires(self): + return ReleaseExportReduced() + + def run(self): + output = shellout(r""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.pmcid != null) | [.ident, .ext_ids.pmcid, (.|tostring)] | @tsv'" | + LC_ALL=C sed 's/\\\\/\\/g' | + LC_ALL=C sed -e 's@PMC@@g' | + LC_ALL=C sort -S 30% -T {tmpdir} -k2,2 | + zstd -c -T0 > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path, + n=self.n) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class FatcatArxiv(Refcat): + """ + List of arxiv ids. + """ + def requires(self): + return ReleaseExportReduced() + + def run(self): + output = shellout(r""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.extra.arxiv.base_id != null) | [.ident, .extra.arxiv.base_id, (.|tostring)] | @tsv'" | + LC_ALL=C sed 's/\\\\/\\/g' | + LC_ALL=C sort -S 30% -k2,2 -T {tmpdir} | + zstd -c -T0 > {output}""", + tmpdir=self.tmpdir, + input=self.input().path, + n=self.n) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class FatcatTitles(Refcat): + """ + Get a list of non-normalized, sorted titles; ~104min. + """ + def requires(self): + return ReleaseExportReduced() + + def run(self): + output = shellout(r""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.title != null and .biblio.doi == null) | + [.ident, (.title | ltrimstr(\" \") | rtrimstr(\" \") | gsub(\"\\n\"; \" \"))] | @tsv'" | + zstd -c -T0 > {output} + """, + input=self.input().path, + n=self.n) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class FatcatTitlesLower(Refcat): + """ + Lowercase titles. + """ + def requires(self): + return FatcatTitles() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + tr '[:upper:]' '[:lower:]' | + LC_ALL=C sort -k2 | + zstd -T0 -c > {output} + """, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class FatcatSortedKeys(Refcat): + """ + Derive key and sort; key derivation (150M docs) took 39min; total 61min. + """ + def requires(self): + return ReleaseExportReduced() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + skate-derive-key -b 50000 -verbose -f tsand | + LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | + zstd -T0 -c > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class CommonDOI(Refcat): + """ + DOI that appear in the catalog and in the refs. + """ + def requires(self): + return { + "fatcat": FatcatDOI(), + "refs": RefsDOI(), + } + + def run(self): + f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat").path) + f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs").path) + output = shellout(""" LC_ALL=C comm {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) + luigi.LocalTarget(output).move(self.output().path) + os.remove(f1) + os.remove(f2) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class CommonTitles(Refcat): + def requires(self): + return { + "fatcat": FatcatTitles(), + "refs": RefsTitles(), + } + + def run(self): + f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat")) + f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs")) + output = shellout(""" LC_ALL=C comm -12 {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) + luigi.LocalTarget(output).move(self.output().path) + os.remove(f1) + os.remove(f2) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class CommonTitlesLower(Refcat): + def requires(self): + return { + "fatcat": FatcatTitlesLower(), + "refs": RefsTitlesLower(), + } + + def run(self): + f1 = shellout("zstdcat -T0 {fatcat} | cut -f2 > {output}", fatcat=self.input().get("fatcat").path) + f2 = shellout("zstdcat -T0 {refs} | cut -f2 > {output}", refs=self.input().get("refs").path) + output = shellout(""" comm -12 {f1} {f2} | zstd -c > {output}""", f1=f1, f2=f2) + luigi.LocalTarget(output).move(self.output().path) + os.remove(f1) + os.remove(f2) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class RefsFatcatDOIJoin(Refcat): + """ + Join fatcat and refs DOI lists. + + Output will be like: + + ---- DOI -------------- ------ Fatcat ----------- -------- Refs ------------- + + 10.1001/2012.jama.10158 m7eoa3hbivcq5kgzzlepbifbna paygwq34z5hsnm5ypnwp2kz6wq + 10.1001/2012.jama.10159 xsw5qtrv3jg7pjoj67e3kijtwq 4ug6jvnedbau3nnkhuqegepw2q + 10.1001/2012.jama.10161 7m7yv5xkkjakxh3wuncqoctphe yllvkrxtgnhnfcyxwbj3swhegu + 10.1001/2012.jama.10368 dw2djv2qdzecncwmh4o7esg4ie ghgshdzpbranbcwsr4xsh3yfhy + + To count the number of citations per DOI, count occurences on the second + column. + + """ + def requires(self): + return { + "fatcat": FatcatDOI(), + "refs": RefsDOI(), + } + + def run(self): + output = shellout(""" + LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | + zstd -T0 -c > {output} + """, + fatcat=self.input().get("fatcat").path, + refs=self.input().get("refs").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst")) + + +class RefsFatcatPMIDJoin(Refcat): + """ + Join fatcat and refs PMID lists. + """ + def requires(self): + return { + "fatcat": FatcatPMID(), + "refs": RefsPMID(), + } + + def run(self): + output = shellout(""" + LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | + zstd -c -T0 > {output} + """, + fatcat=self.input().get("fatcat").path, + refs=self.input().get("refs").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst")) + + +class RefsFatcatPMCIDJoin(Refcat): + """ + Join fatcat and refs PMCID lists. + """ + def requires(self): + return { + "fatcat": FatcatPMCID(), + "refs": RefsPMCID(), + } + + def run(self): + output = shellout(""" + LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | + zstd -c -T0 > {output} + """, + fatcat=self.input().get("fatcat").path, + refs=self.input().get("refs").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst")) + + +class RefsFatcatArxivJoin(Refcat): + """ + Join fatcat, refs on arxiv (base) id. + """ + def requires(self): + return { + "fatcat": FatcatArxiv(), + "refs": RefsArxiv(), + } + + def run(self): + # TODO: We want a zippy join here (e.g. to generate biblioref docs). + output = shellout(""" + LC_ALL=C join -1 2 -2 2 <(zstdcat -T0 {fatcat}) <(zstdcat -T0 {refs}) | + zstd -c -T0 > {output} + """, + fatcat=self.input().get("fatcat").path, + refs=self.input().get("refs").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst")) + + +class RefsFatcatTitleLowerJoin(Refcat): + """ + Join fatcat and refs titles. + + Output will be textfile (title, fatcat ident, refs ident). XXX: need to + filter out too common titles first. + """ + def requires(self): + return { + "fatcat": FatcatTitlesLower(), + "refs": RefsTitlesLower(), + } + + def run(self): + output = shellout(""" + LC_ALL=C join -1 2 -2 2 {fatcat} {refs} > {output} + """, + fatcat=self.input().get("fatcat").path, + refs=self.input().get("refs").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv")) + + +class RefsFatcatGroupJoin(Refcat): + """ + Concat joins. + + 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum nncja4imynb4rajadrlbnoklxy + 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum noimcv5xdzd6hfqu2mebcrzr34 + 10.1001/2012.jama.11274 of7donzkmrbiddbyrr4guqbzum nqzg5lgdxvbhniy2hajlqd3aqi + ... + """ + def requires(self): + return [RefsFatcatDOIJoin(), RefsFatcatPMIDJoin(), RefsFatcatArxivJoin(), RefsFatcatPMCIDJoin()] + + def run(self): + _, tmpf = tempfile.mkstemp(prefix="refcat-") + for target in self.input(): + shellout("""cat {file} >> {output}""", file=target.path, output=tmpf) + luigi.LocalTarget(tmpf).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst")) + + +class RefsFatcatRanked(Refcat): + """ + Inbound count, ident; 32m34.142s. + + 15175 ui64apmob5gnrfwe7pwgk7egju + 15167 cejzj3ddszcdrmij7np36am5fa + 15165 2b2ok43pirduva7ai3745k5xa4 + 15158 cn4c33ctb5g5fax3touxjdmfle + 15155 rrlbmbro4rhwri3zawz3uhp5va + 15138 o62kjogy4zdyrlvy7cu7rlcs3m + """ + def requires(self): + return RefsFatcatGroupJoin() + + def run(self): + output = shellout(""" + zstdcat -T0 {file} | + LC_ALL=C sort -k2,3 -u | + LC_ALL=C cut -d ' ' -f 2 | + LC_ALL=C uniq -c | + LC_ALL=C sort -nr > {output} + """, + file=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv")) + + +# +# +# # TODO: merge refs docs and release docs, maybe add an source label, then +# # cluster; run verify and report on the number of similar records; generate a list of common titles # # -# # # TODO: merge refs docs and release docs, maybe add an source label, then -# # # cluster; run verify and report on the number of similar records; generate a list of common titles -# # # -# # # TODO: find non-matched items and check for any pattern +# # TODO: find non-matched items and check for any pattern +# +# +class RefsCounter(Refcat): + """ + Key counts, see: ref_counter.py. + """ + def requires(self): + return RefsWithUnstructured() + + def run(self): + counts = collections.Counter() + with self.input().open("r") as f: + for i, line in enumerate(f): + obj = json.loads(line) + counts['total'] += 1 + for k in obj.keys(): + if k == 'biblio': + continue + elif k == 'ref_source': + counts["source_" + obj[k]] += 1 + elif obj.get(k): + counts["has_" + k] += 1 + biblio = obj.get('biblio') + if not biblio: + continue + for k in biblio.keys(): + if biblio.get(k): + counts["has_" + k] += 1 + if biblio.get('doi') or biblio.get('pmcid') or biblio.get('pmid') or biblio.get('arxiv_id'): + counts['has_any_extid'] += 1 + if biblio.get('container_name') and biblio.get('volume') and biblio.get('issue') and biblio.get('pages'): + counts['has_container_volume_issue_pages'] += 1 + if biblio.get('title') and biblio.get('contrib_raw_names') and biblio.get('year'): + counts['has_title_contrib_year'] += 1 + if biblio.get('container_name') and biblio.get('contrib_raw_names') and biblio.get('year'): + counts['has_contrib_container_year'] += 1 + if biblio.get('title') and biblio.get('container_name') and biblio.get('year'): + counts['has_title_container_year'] += 1 + + if i % 1000000 == 0: + print(json.dumps(counts, indent=4, sort_keys=True), file=sys.stderr) + + with self.output().open("w") as output: + json.dump(counts, output) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json")) + + +class RefsKeyStats(Refcat): + """ + How many titles, DOI, etc. do we have in refs? + """ + def requires(self): + return RefsWithUnstructured() + + def run(self): + stats = { + "total": 0, + "no_biblio": 0, + "stats": collections.Counter(), + } + with self.input().open("r") as f: + for i, line in enumerate(f): + stats["total"] += 1 + doc = json.loads(line) + if "biblio" not in doc: + stats["no_biblio"] += 1 + continue + biblio = doc["biblio"] + key = "|".join(sorted(biblio.keys())) + stats["stats"][key] += 1 + if i % 1000000 == 0: + print(json.dumps(stats, indent=4, sort_keys=True), file=sys.stderr) + + with self.output().open("w") as output: + json.dumps(stats, output) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json")) + + +class RefsToRelease(Refcat): + """ + Convert a refs doc into a minimalistic release entity. Requires "skate" + tools - XXX: polish. + """ + def requires(self): + return RefsWithUnstructured() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + skate-conv -f ref -w 24 -b 100000 | + zstd -T0 -c > {output} + """, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class RefsSortedKeys(Refcat): + """ + Derive key and sort; 1.8B json docs, took: 255min; 122k/s; key extration + almost 3h (might be faster with rust); 90G compressed. + + Keys based on title will have many empty keys; e.g. "2021-02-20", + 838,057,412 docs have no key. + """ + def requires(self): + return RefsToRelease() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + skate-derive-key -skip-empty-keys -b 50000 -verbose -f tsand | + LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | + zstd -T0 -c > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class RefsReleasesMerged(Refcat): + """ + Merge release and refs (in release form). + + wc: 1579687186 53137849922 913692185284 + """ + def requires(self): + return { + "release": ReleaseExportReduced(), + "refs": RefsToRelease(), + } + + def run(self): + _, f = tempfile.mkstemp(prefix="refcat-") + for k, v in self.input().items(): + shellout("cat {input} >> {output}", input=v.path, output=f) + luigi.LocalTarget(f).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class RefsTitleFrequency(Refcat): + """ + Dig into common titles. + """ + tmpdir = luigi.Parameter(default="/fast/tmp", description="set tempdir", significant=False) + + def requires(self): + return RefsTitlesLower() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + LC_ALL=C cut -f2 | + LC_ALL=C sort -T {tmpdir} -S20% --compress-program pzstd --parallel 6 | + LC_ALL=C uniq -c | + LC_ALL=C sort -nr | + zstd -c9 > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +# # XXX: After RefsReleasesMerged, we want to cluster. +# # python -m fuzzycat cluster -t tsandcrawler < data/re.json > cluster.json.zst # # -# # -# class RefsCounter(Refcat): -# """ -# Key counts, see: ref_counter.py. -# """ -# def requires(self): -# return RefsWithUnstructured() -# -# def run(self): -# counts = collections.Counter() -# with self.input().open("r") as f: -# for i, line in enumerate(f): -# obj = json.loads(line) -# counts['total'] += 1 -# for k in obj.keys(): -# if k == 'biblio': -# continue -# elif k == 'ref_source': -# counts["source_" + obj[k]] += 1 -# elif obj.get(k): -# counts["has_" + k] += 1 -# biblio = obj.get('biblio') -# if not biblio: -# continue -# for k in biblio.keys(): -# if biblio.get(k): -# counts["has_" + k] += 1 -# if biblio.get('doi') or biblio.get('pmcid') or biblio.get('pmid') or biblio.get('arxiv_id'): -# counts['has_any_extid'] += 1 -# if biblio.get('container_name') and biblio.get('volume') and biblio.get('issue') and biblio.get('pages'): -# counts['has_container_volume_issue_pages'] += 1 -# if biblio.get('title') and biblio.get('contrib_raw_names') and biblio.get('year'): -# counts['has_title_contrib_year'] += 1 -# if biblio.get('container_name') and biblio.get('contrib_raw_names') and biblio.get('year'): -# counts['has_contrib_container_year'] += 1 -# if biblio.get('title') and biblio.get('container_name') and biblio.get('year'): -# counts['has_title_container_year'] += 1 -# -# if i % 1000000 == 0: -# print(json.dumps(counts, indent=4, sort_keys=True), file=sys.stderr) -# -# with self.output().open("w") as output: -# json.dump(counts, output) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json")) -# -# -# class RefsKeyStats(Refcat): -# """ -# How many titles, DOI, etc. do we have in refs? -# """ -# def requires(self): -# return RefsWithUnstructured() -# -# def run(self): -# stats = { -# "total": 0, -# "no_biblio": 0, -# "stats": collections.Counter(), -# } -# with self.input().open("r") as f: -# for i, line in enumerate(f): -# stats["total"] += 1 -# doc = json.loads(line) -# if "biblio" not in doc: -# stats["no_biblio"] += 1 -# continue -# biblio = doc["biblio"] -# key = "|".join(sorted(biblio.keys())) -# stats["stats"][key] += 1 -# if i % 1000000 == 0: -# print(json.dumps(stats, indent=4, sort_keys=True), file=sys.stderr) -# -# with self.output().open("w") as output: -# json.dumps(stats, output) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json")) -# -# -# class RefsToRelease(Refcat): -# """ -# Convert a refs doc into a minimalistic release entity. Requires "skate" -# tools - XXX: polish. -# """ -# def requires(self): -# return RefsWithUnstructured() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# skate-conv -f ref -w 24 -b 100000 | -# zstd -T0 -c > {output} -# """, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class RefsSortedKeys(Refcat): -# """ -# Derive key and sort; 1.8B json docs, took: 255min; 122k/s; key extration -# almost 3h (might be faster with rust); 90G compressed. -# -# Keys based on title will have many empty keys; e.g. "2021-02-20", -# 838,057,412 docs have no key. -# """ -# def requires(self): -# return RefsToRelease() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# skate-derive-key -skip-empty-keys -b 50000 -verbose -f tsand | -# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | -# zstd -T0 -c > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class RefsReleasesMerged(Refcat): -# """ -# Merge release and refs (in release form). -# -# wc: 1579687186 53137849922 913692185284 -# """ -# def requires(self): -# return { -# "release": ReleaseExportReduced(), -# "refs": RefsToRelease(), -# } -# -# def run(self): -# _, f = tempfile.mkstemp(prefix="refcat-") -# for k, v in self.input().items(): -# shellout("cat {input} >> {output}", input=v.path, output=f) -# luigi.LocalTarget(f).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class RefsTitleFrequency(Refcat): -# """ -# Dig into common titles. -# """ -# tmpdir = luigi.Parameter(default="/fast/tmp", description="set tempdir", significant=False) -# -# def requires(self): -# return RefsTitlesLower() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# LC_ALL=C cut -f2 | -# LC_ALL=C sort -T {tmpdir} -S20% --compress-program pzstd --parallel 6 | -# LC_ALL=C uniq -c | -# LC_ALL=C sort -nr | -# zstd -c9 > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# # # XXX: After RefsReleasesMerged, we want to cluster. -# # # python -m fuzzycat cluster -t tsandcrawler < data/re.json > cluster.json.zst -# # # -# # # Note: First run with no compression filled the disk, add zstd to fuzzycat. -# -# -# class RefsFatcatSortedKeys(Refcat): -# """ -# Extract keys and sort. -# """ -# def requires(self): -# return RefsReleasesMerged() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# skate-derive-key -skip-empty-keys -b 50000 -verbose -f tsand | -# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | -# zstd -T0 -c > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class RefsFatcatClusters(Refcat): -# """ -# Group by clusters. Full set will be ~90GB compressed, about 40M clusters -# (already filtered, so 2+ docs only, with at least on ref and one release, etc). -# """ -# def requires(self): -# return RefsFatcatSortedKeys() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# skate-cluster -both | -# zstd -T0 -c9 > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# # ==== new style zippy biblioref generation -# -# -# class BiblioRefFromFuzzyClusters(Refcat): -# """ -# Use "bref" mode to generate a biblioref document from verified clusters. -# """ -# def requires(self): -# return RefsFatcatClusters() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# skate-verify -m bref > {output} -# """, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class BiblioRefZippyDOI(Refcat): -# """ -# Generate proposed biblioref docs from two sorted key files, sorted by DOI. -# """ -# def requires(self): -# return { -# "refs": RefsDOI(), -# "releases": FatcatDOI(), -# } -# -# def run(self): -# output = shellout(r""" -# skate-verify -m exact -r doi -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | -# zstd -c -T0 > {output} -# """, -# releases=self.input().get("releases").path, -# refs=self.input().get("refs").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class BiblioRefZippyArxiv(Refcat): -# """ -# Generate proposed biblioref docs from two sorted key files, sorted by DOI. -# """ -# def requires(self): -# return { -# "refs": RefsArxiv(), -# "releases": FatcatArxiv(), -# } -# -# def run(self): -# output = shellout(r""" -# skate-verify -m exact -r arxiv -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | -# zstd -c -T0 > {output} -# """, -# releases=self.input().get("releases").path, -# refs=self.input().get("refs").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class BiblioRefZippyPMID(Refcat): -# """ -# Generate proposed biblioref docs from two sorted key files, sorted by DOI. -# """ -# def requires(self): -# return { -# "refs": RefsPMID(), -# "releases": FatcatPMID(), -# } -# -# def run(self): -# output = shellout(r""" -# skate-verify -m exact -r pmid -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | -# zstd -c -T0 > {output} -# """, -# releases=self.input().get("releases").path, -# refs=self.input().get("refs").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class BiblioRefZippyPMCID(Refcat): -# """ -# Generate proposed biblioref docs from two sorted key files, sorted by DOI. -# """ -# def requires(self): -# return { -# "refs": RefsPMCID(), -# "releases": FatcatPMCID(), -# } -# -# def run(self): -# output = shellout(r""" -# skate-verify -m exact -r pmcid -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | -# zstd -c -T0 > {output} -# """, -# releases=self.input().get("releases").path, -# refs=self.input().get("refs").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class BiblioRefV2(Refcat): -# """ -# A v1 set of biblioref schema docs. -# """ -# def requires(self): -# return [BiblioRefZippyDOI(), BiblioRefZippyArxiv(), BiblioRefZippyPMID(), BiblioRefZippyPMCID(), BiblioRefFromFuzzyClusters()] -# -# def run(self): -# _, tmpf = tempfile.mkstemp(prefix="refcat-") -# for target in self.input(): -# shellout(""" -# zstdcat -T0 {input} | -# skate-bref-id | -# zstd -T0 >> {output} -# """, -# input=target.path, -# output=tmpf) -# luigi.LocalTarget(tmpf).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# # ==== V3 related -# -# -# # ==== RG title match example -# -# -# class RGSitemapToRelease(Refcat): -# """ -# Turn sitemap data to skeleton release. -# """ -# def run(self): -# link = "https://archive.org/download/rg_sitemap_2021_02_23/rg_sitemap_2021_02_23.ndj.zst" -# output = shellout(""" -# curl -sL {link} | -# zstdcat -T0 | -# parallel --block 10M -j 16 --pipe "jq -rc '{{\"title\": .title, \"extra\": {{\"rg\": {{\"sitemap\": true}}}}}}'" | -# zstd -T0 -c > {output} -# """, -# link=link) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class RGSitemapFatcatMerged(Refcat): -# """ -# A minimal combined fatcat and RG dataset. -# """ -# def requires(self): -# return [RGSitemapToRelease(), ReleaseExportTitleOnly()] -# -# def run(self): -# _, tmpf = tempfile.mkstemp(prefix="refcat-") -# for target in self.input(): -# shellout("""cat {file} >> {output}""", file=target.path, output=tmpf) -# luigi.LocalTarget(tmpf).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class RGSitemapFatcatSortedKeys(Refcat): -# """ -# Extract keys and sort. -# """ -# def requires(self): -# return RGSitemapFatcatMerged() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# skate-derive-key -b 50000 -verbose -f tsand | -# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | -# zstd -T0 -c > {output}""", -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# # ==== MAG -# -# -# class MAGDOI(Refcat): -# """ -# List of MAG DOI. -# """ -# def requires(self): -# return MAGPapers() -# -# def run(self): -# output = shellout(""" -# unpigz -c {input} | -# cut -f3 | -# grep -v ^$ | -# zstd -T0 -c > {output} -# """, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# # ==== WikipediaCitations -# -# -# class BiblioRefWikiDOISortedKeys(Refcat): -# """ -# Sorted DOI keys from wikipedia. -# """ -# def requires(self): -# return WikipediaCitationsMinimalDataset() -# -# def run(self): -# output = shellout(""" -# cat {input} | -# skate-wikipedia-doi | -# LC_ALL=C sort -S 10% -k2,2 | -# zstd -T0 -c > {output} -# """, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -# -# -# class BiblioRefWiki(Refcat): -# def requires(self): -# return { -# "wiki": BiblioRefWikiDOISortedKeys(), -# "releases": FatcatDOI(), -# } -# -# def run(self): -# output = shellout(r""" -# skate-verify -m wiki -r doi -R <(zstdcat -T0 {releases}) -W <(zstdcat -T0 {wiki}) | -# zstd -c -T0 > {output} -# """, -# releases=self.input().get("releases").path, -# wiki=self.input().get("wiki").path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# # ==== Prepare unmatched -# -# -# class BiblioRefSortedIdent(Refcat): -# def requires(self): -# return BiblioRefV2() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# skate-derive-key -b 50000 -verbose -F source_release_ident | -# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | -# zstd -T0 -c > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class RefsSortedIdent(Refcat): -# def requires(self): -# return RefsWithUnstructured() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# skate-derive-key -b 50000 -verbose -F release_ident | -# LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | -# zstd -T0 -c > {output} -# """, -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# # OL -# -# -# class WithISBN(Refcat): -# """ -# Keeps converted refs with isbn. -# """ -# def requires(self): -# return RefsToRelease() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.isbn != null)'" | -# zstd -T0 -c > {output} -# """, -# n=self.n, -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) -# -# -# class OpenLibraryWorks(Refcat): -# """ -# Extract just the works. -# """ -# def requires(self): -# return OpenLibraryDump() -# -# def run(self): -# output = shellout(""" -# zstdcat -T0 {input} | -# parallel -j {n} --block 10M --pipe "jq -rc 'select(.type == \\"work\\")'" | -# zstd -T0 -c > {output} -# """, -# n=self.n, -# tmpdir=self.tmpdir, -# input=self.input().path) -# luigi.LocalTarget(output).move(self.output().path) -# -# def output(self): -# return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) +# # Note: First run with no compression filled the disk, add zstd to fuzzycat. + + +class RefsFatcatSortedKeys(Refcat): + """ + Extract keys and sort. + """ + def requires(self): + return RefsReleasesMerged() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + skate-derive-key -skip-empty-keys -b 50000 -verbose -f tsand | + LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | + zstd -T0 -c > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class RefsFatcatClusters(Refcat): + """ + Group by clusters. Full set will be ~90GB compressed, about 40M clusters + (already filtered, so 2+ docs only, with at least on ref and one release, etc). + """ + def requires(self): + return RefsFatcatSortedKeys() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + skate-cluster -both | + zstd -T0 -c9 > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +# ==== new style zippy biblioref generation + + +class BiblioRefFromFuzzyClusters(Refcat): + """ + Use "bref" mode to generate a biblioref document from verified clusters. + """ + def requires(self): + return RefsFatcatClusters() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + skate-verify -m bref > {output} + """, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class BiblioRefZippyDOI(Refcat): + """ + Generate proposed biblioref docs from two sorted key files, sorted by DOI. + """ + def requires(self): + return { + "refs": RefsDOI(), + "releases": FatcatDOI(), + } + + def run(self): + output = shellout(r""" + skate-verify -m exact -r doi -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | + zstd -c -T0 > {output} + """, + releases=self.input().get("releases").path, + refs=self.input().get("refs").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class BiblioRefZippyArxiv(Refcat): + """ + Generate proposed biblioref docs from two sorted key files, sorted by DOI. + """ + def requires(self): + return { + "refs": RefsArxiv(), + "releases": FatcatArxiv(), + } + + def run(self): + output = shellout(r""" + skate-verify -m exact -r arxiv -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | + zstd -c -T0 > {output} + """, + releases=self.input().get("releases").path, + refs=self.input().get("refs").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class BiblioRefZippyPMID(Refcat): + """ + Generate proposed biblioref docs from two sorted key files, sorted by DOI. + """ + def requires(self): + return { + "refs": RefsPMID(), + "releases": FatcatPMID(), + } + + def run(self): + output = shellout(r""" + skate-verify -m exact -r pmid -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | + zstd -c -T0 > {output} + """, + releases=self.input().get("releases").path, + refs=self.input().get("refs").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class BiblioRefZippyPMCID(Refcat): + """ + Generate proposed biblioref docs from two sorted key files, sorted by DOI. + """ + def requires(self): + return { + "refs": RefsPMCID(), + "releases": FatcatPMCID(), + } + + def run(self): + output = shellout(r""" + skate-verify -m exact -r pmcid -R <(zstdcat -T0 {releases}) -F <(zstdcat -T0 {refs}) | + zstd -c -T0 > {output} + """, + releases=self.input().get("releases").path, + refs=self.input().get("refs").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class BiblioRefV2(Refcat): + """ + A v1 set of biblioref schema docs. + """ + def requires(self): + return [BiblioRefZippyDOI(), BiblioRefZippyArxiv(), BiblioRefZippyPMID(), BiblioRefZippyPMCID(), BiblioRefFromFuzzyClusters()] + + def run(self): + _, tmpf = tempfile.mkstemp(prefix="refcat-") + for target in self.input(): + shellout(""" + zstdcat -T0 {input} | + skate-bref-id | + zstd -T0 >> {output} + """, + input=target.path, + output=tmpf) + luigi.LocalTarget(tmpf).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +# ==== V3 related + + +# ==== RG title match example + + +class RGSitemapToRelease(Refcat): + """ + Turn sitemap data to skeleton release. + """ + def run(self): + link = "https://archive.org/download/rg_sitemap_2021_02_23/rg_sitemap_2021_02_23.ndj.zst" + output = shellout(""" + curl -sL {link} | + zstdcat -T0 | + parallel --block 10M -j 16 --pipe "jq -rc '{{\"title\": .title, \"extra\": {{\"rg\": {{\"sitemap\": true}}}}}}'" | + zstd -T0 -c > {output} + """, + link=link) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class RGSitemapFatcatMerged(Refcat): + """ + A minimal combined fatcat and RG dataset. + """ + def requires(self): + return [RGSitemapToRelease(), ReleaseExportTitleOnly()] + + def run(self): + _, tmpf = tempfile.mkstemp(prefix="refcat-") + for target in self.input(): + shellout("""cat {file} >> {output}""", file=target.path, output=tmpf) + luigi.LocalTarget(tmpf).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class RGSitemapFatcatSortedKeys(Refcat): + """ + Extract keys and sort. + """ + def requires(self): + return RGSitemapFatcatMerged() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + skate-derive-key -b 50000 -verbose -f tsand | + LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | + zstd -T0 -c > {output}""", + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +# ==== MAG + + +class MAGDOI(Refcat): + """ + List of MAG DOI. + """ + def requires(self): + return MAGPapers() + + def run(self): + output = shellout(""" + unpigz -c {input} | + cut -f3 | + grep -v ^$ | + zstd -T0 -c > {output} + """, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +# ==== WikipediaCitations + + +class BiblioRefWikiDOISortedKeys(Refcat): + """ + Sorted DOI keys from wikipedia. + """ + def requires(self): + return WikipediaCitationsMinimalDataset() + + def run(self): + output = shellout(""" + cat {input} | + skate-wikipedia-doi | + LC_ALL=C sort -S 10% -k2,2 | + zstd -T0 -c > {output} + """, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + + +class BiblioRefWiki(Refcat): + def requires(self): + return { + "wiki": BiblioRefWikiDOISortedKeys(), + "releases": FatcatDOI(), + } + + def run(self): + output = shellout(r""" + skate-verify -m wiki -r doi -R <(zstdcat -T0 {releases}) -W <(zstdcat -T0 {wiki}) | + zstd -c -T0 > {output} + """, + releases=self.input().get("releases").path, + wiki=self.input().get("wiki").path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +# ==== Prepare unmatched + + +class BiblioRefSortedIdent(Refcat): + def requires(self): + return BiblioRefV2() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + skate-derive-key -b 50000 -verbose -F source_release_ident | + LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | + zstd -T0 -c > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class RefsSortedIdent(Refcat): + def requires(self): + return RefsWithUnstructured() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + skate-derive-key -b 50000 -verbose -F release_ident | + LC_ALL=C sort -k2,2 -S 35% --parallel 6 --compress-program pzstd -T {tmpdir} | + zstd -T0 -c > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +# OL + + +class WithISBN(Refcat): + """ + Keeps converted refs with isbn. + """ + def requires(self): + return RefsToRelease() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.ext_ids.isbn != null)'" | + zstd -T0 -c > {output} + """, + n=self.n, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) + + +class OpenLibraryWorks(Refcat): + """ + Extract just the works. + """ + def requires(self): + return OpenLibraryDump() + + def run(self): + output = shellout(""" + zstdcat -T0 {input} | + parallel -j {n} --block 10M --pipe "jq -rc 'select(.type == \\"work\\")'" | + zstd -T0 -c > {output} + """, + n=self.n, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="json.zst"), format=Zstd) |