Diffstat (limited to 'python/fatcat_tools/importers')

 python/fatcat_tools/importers/__init__.py  |  2
 python/fatcat_tools/importers/arabesque.py |  3
 python/fatcat_tools/importers/common.py    | 45
 python/fatcat_tools/importers/ingest.py    | 85
 python/fatcat_tools/importers/matched.py   |  3
 5 files changed, 118 insertions(+), 20 deletions(-)
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 025a111c..bb9c5b17 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -26,4 +26,4 @@ from .orcid import OrcidImporter
 from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE
 from .wayback_static import auto_wayback_static
 from .cdl_dash_dat import auto_cdl_dash_dat
-from .ingest import IngestFileResultImporter
+from .ingest import IngestFileResultImporter, SavePaperNowFileImporter
diff --git a/python/fatcat_tools/importers/arabesque.py b/python/fatcat_tools/importers/arabesque.py
index 7017c56c..acfc2b87 100644
--- a/python/fatcat_tools/importers/arabesque.py
+++ b/python/fatcat_tools/importers/arabesque.py
@@ -42,8 +42,7 @@ class ArabesqueMatchImporter(EntityImporter):
 
     def __init__(self, api, extid_type, require_grobid=True, **kwargs):
 
-        eg_desc = kwargs.get('editgroup_description',
-            "Match web crawl files to releases based on identifier/URL seedlist")
+        eg_desc = kwargs.get('editgroup_description', None) or "Match web crawl files to releases based on identifier/URL seedlist"
         eg_extra = kwargs.get('editgroup_extra', dict())
         eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ArabesqueMatchImporter')
         if kwargs.get('crawl_id'):
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 073725ad..d51a5ff4 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -6,6 +6,7 @@ import json
 import ftfy
 import base64
 import sqlite3
+import datetime
 import subprocess
 import unicodedata
 from collections import Counter
@@ -112,7 +113,7 @@ def clean(thing, force_xml=False):
     This function is appropriate to be called on any random, non-markup string,
     such as author names, titles, etc.
 
-    It will try to clean up commong unicode mangles, HTML characters, etc.
+    It will try to clean up common unicode mangles, HTML characters, etc.
 
     This will detect XML/HTML and "do the right thing" (aka, not remove
     entities like '&' if there are tags in the string), unless you pass the
@@ -271,6 +272,11 @@ class EntityImporter:
             if didn't update or insert because of existing)
         self.counts['update'] += 1
             if updated an entity
+
+    Parameters:
+
+        submit_mode: instead of accepting editgroups, only submits them.
+            implementors must write insert_batch appropriately
     """
 
     def __init__(self, api, **kwargs):
@@ -282,6 +288,7 @@ class EntityImporter:
 
         self.api = api
         self.bezerk_mode = kwargs.get('bezerk_mode', False)
+        self.submit_mode = kwargs.get('submit_mode', False)
         self.edit_batch_size = kwargs.get('edit_batch_size', 100)
         self.editgroup_description = kwargs.get('editgroup_description')
         self.editgroup_extra = eg_extra
@@ -330,8 +337,20 @@ class EntityImporter:
         raise NotImplementedError
 
     def finish(self):
+        """
+        Gets called as cleanup at the end of imports, but can also be called at
+        any time to "snip off" current editgroup progress. In other words, safe
+        to call this but then continue pushing records.
+
+        For example, a persistent worker could call this if no new entities
+        have been fed in for more than some time period, to ensure that
+        entities actually get created within a reasonable time frame.
+        """
         if self._edit_count > 0:
-            self.api.accept_editgroup(self._editgroup_id)
+            if self.submit_mode:
+                self.api.submit_editgroup(self._editgroup_id)
+            else:
+                self.api.accept_editgroup(self._editgroup_id)
             self._editgroup_id = None
             self._edit_count = 0
             self._edits_inflight = []
@@ -345,7 +364,10 @@
 
     def get_editgroup_id(self, edits=1):
         if self._edit_count >= self.edit_batch_size:
-            self.api.accept_editgroup(self._editgroup_id)
+            if self.submit_mode:
+                self.api.submit_editgroup(self._editgroup_id)
+            else:
+                self.api.accept_editgroup(self._editgroup_id)
             self._editgroup_id = None
             self._edit_count = 0
             self._edits_inflight = []
@@ -715,22 +737,28 @@ class KafkaJsonPusher(RecordPusher):
 
     def run(self):
         count = 0
+        last_push = datetime.datetime.now()
         while True:
-            # TODO: this is batch-oriented, because underlying importer is
+            # Note: this is batch-oriented, because underlying importer is
             # often batch-oriented, but this doesn't confirm that entire batch
             # has been pushed to fatcat before committing offset. Eg, consider
             # case where there is one update and thousands of creates;
             # update would be lingering in importer, and if importer crashed
-            # never created. Not great.
+            # never created.
+            # This is partially mitigated for the worker case by flushing any
+            # outstanding editgroups every 5 minutes, but there is still that
+            # window when editgroups might be hanging (unsubmitted).
             batch = self.consumer.consume(
                 num_messages=self.consume_batch_size,
                 timeout=self.poll_interval)
             print("... got {} kafka messages ({}sec poll interval)".format(
                 len(batch), self.poll_interval))
             if not batch:
-                # TODO: could have some larger timeout here and
-                # self.importer.finish() if it's been more than, eg, a couple
-                # minutes
+                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+                    # it has been some time, so flush any current editgroup
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
+                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
                 continue
             # first check errors on entire batch...
             for msg in batch:
@@ -743,6 +771,7 @@ class KafkaJsonPusher(RecordPusher):
                 count += 1
                 if count % 500 == 0:
                     print("Import counts: {}".format(self.importer.counts))
+            last_push = datetime.datetime.now()
             for msg in batch:
                 # locally store offsets of processed messages; will be
                 # auto-committed by librdkafka from this "stored" value
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index deb4ef51..c47f0aa7 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -11,8 +11,7 @@ class IngestFileResultImporter(EntityImporter):
 
     def __init__(self, api, require_grobid=True, **kwargs):
 
-        eg_desc = kwargs.pop('editgroup_description',
-            "Files crawled from web using sandcrawler ingest tool")
+        eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled from web using sandcrawler ingest tool"
         eg_extra = kwargs.pop('editgroup_extra', dict())
         eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileResultImporter')
         super().__init__(api,
@@ -21,7 +20,6 @@ class IngestFileResultImporter(EntityImporter):
             **kwargs)
         self.default_link_rel = kwargs.get("default_link_rel", "web")
         assert self.default_link_rel
-        self.default_mimetype = kwargs.get("default_mimetype", None)
         self.do_updates = kwargs.get("do_updates", False)
         self.require_grobid = require_grobid
         if self.require_grobid:
@@ -53,9 +51,14 @@ class IngestFileResultImporter(EntityImporter):
         if row.get('hit') != True:
             self.counts['skip-hit'] += 1
             return False
-        if self.ingest_request_source_whitelist and row['request'].get('ingest_request_source') not in self.ingest_request_source_whitelist:
+        source = row['request'].get('ingest_request_source')
+        if self.ingest_request_source_whitelist and source not in self.ingest_request_source_whitelist:
             self.counts['skip-ingest_request_source'] += 1
             return False
+        if source.startswith('savepapernow'):
+            # never process async savepapernow requests
+            self.counts['skip-savepapernow'] += 1
+            return False
         if not row.get('file_meta'):
             self.counts['skip-file-meta'] += 1
             return False
@@ -123,16 +126,21 @@ class IngestFileResultImporter(EntityImporter):
             sha1=file_meta['sha1hex'],
             sha256=file_meta['sha256hex'],
             size=file_meta['size_bytes'],
-            mimetype=file_meta['mimetype'] or self.default_mimetype,
+            mimetype=file_meta['mimetype'],
             release_ids=[release_ident],
             urls=urls,
         )
         if fatcat and fatcat.get('edit_extra'):
             fe.edit_extra = fatcat['edit_extra']
+        else:
+            fe.edit_extra = dict()
         if request.get('ingest_request_source'):
-            if not fe.edit_extra:
-                fe.edit_extra = dict()
             fe.edit_extra['ingest_request_source'] = request['ingest_request_source']
+        if request.get('link_source') and request.get('link_source_id'):
+            fe.edit_extra['link_source'] = request['link_source']
+            fe.edit_extra['link_source_id'] = request['link_source_id']
+        if not fe.edit_extra:
+            fe.edit_extra = None
         return fe
 
     def try_update(self, fe):
@@ -152,6 +160,12 @@ class IngestFileResultImporter(EntityImporter):
             self.counts['exists'] += 1
             return False
 
+        # check for existing edits-in-progress with same file hash
+        for other in self._entity_queue:
+            if other.sha1 == fe.sha1:
+                self.counts['skip-in-queue'] += 1
+                return False
+
         if not self.do_updates:
             self.counts['skip-update-disabled'] += 1
             return False
@@ -167,3 +181,60 @@ class IngestFileResultImporter(EntityImporter):
                 extra=self.editgroup_extra),
             entity_list=batch))
 
+
+class SavePaperNowFileImporter(IngestFileResultImporter):
+    """
+    This worker ingests from the same feed as IngestFileResultImporter, but
+    only imports files from anonymous save-paper-now requests, and "submits"
+    them for further human review (as opposed to accepting by default).
+    """
+
+    def __init__(self, api, submit_mode=True, **kwargs):
+
+        eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled after a public 'Save Paper Now' request"
+        eg_extra = kwargs.pop('editgroup_extra', dict())
+        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow')
+        kwargs['submit_mode'] = submit_mode
+        kwargs['require_grobid'] = True
+        kwargs['do_updates'] = False
+        super().__init__(api,
+            editgroup_description=eg_desc,
+            editgroup_extra=eg_extra,
+            **kwargs)
+
+    def want(self, row):
+
+        source = row['request'].get('ingest_request_source')
+        if not source.startswith('savepapernow'):
+            self.counts['skip-not-savepapernow'] += 1
+            return False
+        if row.get('hit') != True:
+            self.counts['skip-hit'] += 1
+            return False
+        if not row.get('file_meta'):
+            self.counts['skip-file-meta'] += 1
+            return False
+        if self.require_grobid and row.get('grobid', {}).get('status_code') != 200:
+            self.counts['skip-grobid'] += 1
+            return False
+
+        return True
+
+    def insert_batch(self, batch):
+        """
+        Usually running in submit_mode, so we can't use auto_batch method
+        """
+        if self.submit_mode:
+            eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup(
+                description=self.editgroup_description,
+                extra=self.editgroup_extra))
+            for fe in batch:
+                self.api.create_file(eg.editgroup_id, fe)
+            self.api.update_editgroup(eg.editgroup_id, eg, submit=True)
+        else:
+            self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
+                editgroup=fatcat_openapi_client.Editgroup(
+                    description=self.editgroup_description,
+                    extra=self.editgroup_extra),
+                entity_list=batch))
diff --git a/python/fatcat_tools/importers/matched.py b/python/fatcat_tools/importers/matched.py
index dbb78ec9..180d7ba3 100644
--- a/python/fatcat_tools/importers/matched.py
+++ b/python/fatcat_tools/importers/matched.py
@@ -35,8 +35,7 @@ class MatchedImporter(EntityImporter):
 
     def __init__(self, api, **kwargs):
 
-        eg_desc = kwargs.pop('editgroup_description',
-            "Import of large-scale file-to-release match results. Source of metadata varies.")
+        eg_desc = kwargs.pop('editgroup_description', None) or "Import of large-scale file-to-release match results. Source of metadata varies."
         eg_extra = kwargs.pop('editgroup_extra', dict())
         eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.MatchedImporter')
         super().__init__(api,
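
A note on the repeated editgroup_description change across arabesque.py, ingest.py, and matched.py: dict.get() and dict.pop() only fall back to their default when the key is missing, not when a caller passed an explicit None. The `... or "default"` form covers both cases. A standalone Python illustration (not part of the commit):

    kwargs = {'editgroup_description': None}

    # old pattern: the key exists, so the default is ignored and desc is None
    desc = kwargs.get('editgroup_description', "Some default description")
    assert desc is None

    # new pattern: falls back whenever the stored value is falsy (None or "")
    desc = kwargs.get('editgroup_description', None) or "Some default description"
    assert desc == "Some default description"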
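The practical difference submit_mode makes: submit_editgroup leaves an editgroup pending human review, while accept_editgroup and the auto-batch endpoint merge edits immediately. A condensed sketch of the two insert_batch paths, using only the API calls that appear in the diff above:

    import fatcat_openapi_client

    def insert_batch_sketch(api, importer, batch):
        eg_meta = fatcat_openapi_client.Editgroup(
            description=importer.editgroup_description,
            extra=importer.editgroup_extra)
        if importer.submit_mode:
            # create an editgroup, attach one file edit per entity, then
            # submit it for review rather than accepting (merging) it directly
            eg = api.create_editgroup(eg_meta)
            for fe in batch:
                api.create_file(eg.editgroup_id, fe)
            api.update_editgroup(eg.editgroup_id, eg, submit=True)
        else:
            # single call that creates, populates, and accepts an editgroup
            api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
                editgroup=eg_meta, entity_list=batch))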
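For context, wiring the new importer into a Kafka worker might look like the sketch below; the KafkaJsonPusher constructor arguments and topic name are assumptions for illustration, not the verified signature:

    import fatcat_openapi_client
    from fatcat_tools.importers import SavePaperNowFileImporter, KafkaJsonPusher

    api = fatcat_openapi_client.DefaultApi(fatcat_openapi_client.ApiClient())

    # submit_mode defaults to True, so completed batches are submitted for
    # review; the pusher's 5-minute idle flush also calls importer.finish()
    importer = SavePaperNowFileImporter(api)

    # hypothetical broker and topic arguments, for illustration only
    pusher = KafkaJsonPusher(importer, "localhost:9092",
        "sandcrawler-prod.ingest-file-results", "spn-import")
    pusher.run()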