""" cdx - read raw CDX, filter - push to SQL table ingest-file-result - read JSON format (batch) - cdx SQL push batch (on conflict skip) - file_meta SQL push batch (on conflict update) - ingest request push batch (on conflict skip) - ingest result push batch (on conflict update) grobid - reads JSON format (batch) - grobid2json - minio push (one-by-one) - grobid SQL push batch (on conflict update) - file_meta SQL push batch (on conflict update) """ import os from typing import Optional, AnyStr import xml.etree.ElementTree from sandcrawler.workers import SandcrawlerWorker from sandcrawler.db import SandcrawlerPostgresClient from sandcrawler.minio import SandcrawlerMinioClient from sandcrawler.grobid import GrobidClient from sandcrawler.pdfextract import PdfExtractResult from sandcrawler.ingest_html import HtmlMetaRow class PersistCdxWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) self.cur = self.db.conn.cursor() def process(self, record, key=None): """ Only do batches (as transactions) """ raise NotImplementedError def push_batch(self, batch): self.counts['total'] += len(batch) # filter to full CDX lines, no liveweb cdx_batch = [r for r in batch if r.get('warc_path') and ("/" in r['warc_path'])] resp = self.db.insert_cdx(self.cur, cdx_batch) if len(cdx_batch) < len(batch): self.counts['skip'] += len(batch) - len(cdx_batch) self.counts['insert-cdx'] += resp[0] self.counts['update-cdx'] += resp[1] self.db.commit() return [] class PersistIngestFileResultWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) self.cur = self.db.conn.cursor() def process(self, record, key=None): """ Only do batches (as transactions) """ raise NotImplementedError def request_to_row(self, raw): """ Converts ingest-request JSON schema (eg, from Kafka) to SQL ingest_request schema if there is a problem with conversion, return None """ # backwards compat hacks; transform request to look like current schema if raw.get('ingest_type') == 'file': raw['ingest_type'] = 'pdf' if (not raw.get('link_source') and raw.get('base_url') and raw.get('ext_ids', {}).get('doi') and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])): # set link_source(_id) for old ingest requests raw['link_source'] = 'doi' raw['link_source_id'] = raw['ext_ids']['doi'] if (not raw.get('link_source') and raw.get('ingest_request_source', '').startswith('savepapernow') and raw.get('fatcat', {}).get('release_ident')): # set link_source(_id) for old ingest requests raw['link_source'] = 'spn' raw['link_source_id'] = raw['fatcat']['release_ident'] for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'): if not k in raw: self.counts['skip-request-fields'] += 1 return None if raw['ingest_type'] not in ('pdf', 'xml', 'html'): self.counts['skip-ingest-type'] += 1 return None request = { 'ingest_type': raw['ingest_type'], 'base_url': raw['base_url'], 'link_source': raw['link_source'], 'link_source_id': raw['link_source_id'], 'ingest_request_source': raw.get('ingest_request_source'), 'request': {}, } # extra/optional fields if raw.get('release_stage'): request['release_stage'] = raw['release_stage'] if raw.get('fatcat', {}).get('release_ident'): request['request']['release_ident'] = raw['fatcat']['release_ident'] for k in ('ext_ids', 'edit_extra', 'rel'): if raw.get(k): request['request'][k] = raw[k] # if this dict is empty, trim it to save DB space if not request['request']: request['request'] = None return request def file_result_to_row(self, raw: dict) -> Optional[dict]: """ Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema if there is a problem with conversion, return None and set skip count """ for k in ('request', 'hit', 'status'): if not k in raw: self.counts['skip-result-fields'] += 1 return None if not 'base_url' in raw['request']: self.counts['skip-result-fields'] += 1 return None ingest_type = raw['request'].get('ingest_type') if ingest_type == 'file': ingest_type = 'pdf' if ingest_type not in ('pdf', 'xml', 'html'): self.counts['skip-ingest-type'] += 1 return None if raw['status'] in ("existing", ): self.counts['skip-existing'] += 1 return None result = { 'ingest_type': ingest_type, 'base_url': raw['request']['base_url'], 'hit': raw['hit'], 'status': raw['status'], } terminal = raw.get('terminal') if terminal: result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url') result['terminal_dt'] = terminal.get('terminal_dt') result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code') if result['terminal_status_code']: result['terminal_status_code'] = int(result['terminal_status_code']) result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') if len(result['terminal_url']) > 2048: # postgresql13 doesn't like extremely large URLs in b-tree index self.counts['skip-huge-url'] += 1 return None return result def result_to_html_meta(self, record: dict) -> Optional[HtmlMetaRow]: html_body = record.get('html_body') file_meta = record.get('file_meta') if not (file_meta and html_body): return None return HtmlMetaRow( sha1hex=file_meta["sha1hex"], status=record.get('status'), scope=record.get('scope'), has_teixml=bool(html_body and html_body['status'] == 'success'), has_thumbnail=False, # TODO word_count=(html_body and html_body.get('word_count')) or None, biblio=record.get('html_biblio'), resources=record.get('html_resources'), ) def push_batch(self, batch): self.counts['total'] += len(batch) if not batch: return [] results = [self.file_result_to_row(raw) for raw in batch] results = [r for r in results if r] requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')] requests = [r for r in requests if r] if requests: resp = self.db.insert_ingest_request(self.cur, requests) self.counts['insert-requests'] += resp[0] self.counts['update-requests'] += resp[1] if results: resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update") self.counts['insert-results'] += resp[0] self.counts['update-results'] += resp[1] # these schemas match, so can just pass through cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')] revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')] cdx_batch.extend(revisit_cdx_batch) # filter to full CDX lines, with full warc_paths (not liveweb) cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])] if cdx_batch: resp = self.db.insert_cdx(self.cur, cdx_batch) self.counts['insert-cdx'] += resp[0] self.counts['update-cdx'] += resp[1] file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')] if file_meta_batch: resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="nothing") self.counts['insert-file_meta'] += resp[0] self.counts['update-file_meta'] += resp[1] html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')] if html_meta_batch: resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update") self.counts['insert-html_meta'] += resp[0] self.counts['update-html_meta'] += resp[1] self.db.commit() return [] class PersistIngestRequestWorker(PersistIngestFileResultWorker): def __init__(self, db_url, **kwargs): super().__init__(db_url=db_url) def process(self, record, key=None): """ Only do batches (as transactions) """ raise NotImplementedError def push_batch(self, batch): self.counts['total'] += len(batch) if not batch: return [] requests = [self.request_to_row(raw) for raw in batch] requests = [r for r in requests if r] if requests: resp = self.db.insert_ingest_request(self.cur, requests) self.counts['insert-requests'] += resp[0] self.counts['update-requests'] += resp[1] self.db.commit() return [] class PersistGrobidWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.grobid = GrobidClient() self.s3 = SandcrawlerMinioClient( host_url=kwargs.get('s3_url', 'localhost:9000'), access_key=kwargs['s3_access_key'], secret_key=kwargs['s3_secret_key'], default_bucket=kwargs['s3_bucket'], ) self.s3_only = kwargs.get('s3_only', False) self.db_only = kwargs.get('db_only', False) assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed" if not self.s3_only: self.db = SandcrawlerPostgresClient(db_url) self.cur = self.db.conn.cursor() else: self.db = None self.cur = None def process(self, record, key=None): """ Only do batches (as transactions) """ raise NotImplementedError def push_batch(self, batch): self.counts['total'] += len(batch) # filter out bad "missing status_code" timeout rows missing = [r for r in batch if not r.get('status_code')] if missing: self.counts['skip-missing-status'] += len(missing) batch = [r for r in batch if r.get('status_code')] for r in batch: if r['status_code'] != 200 or not r.get('tei_xml'): self.counts['s3-skip-status'] += 1 if r.get('error_msg'): r['metadata'] = {'error_msg': r['error_msg'][:500]} continue assert len(r['key']) == 40 if not self.db_only: resp = self.s3.put_blob( folder="grobid", blob=r['tei_xml'], sha1hex=r['key'], extension=".tei.xml", ) self.counts['s3-put'] += 1 # enhance with teixml2json metadata, if available try: metadata = self.grobid.metadata(r) except xml.etree.ElementTree.ParseError as xml_e: r['status'] = 'bad-grobid-xml' r['metadata'] = {'error_msg': str(xml_e)[:1024]} continue if not metadata: continue for k in ('fatcat_release', 'grobid_version'): r[k] = metadata.pop(k, None) if r.get('fatcat_release'): r['fatcat_release'] = r['fatcat_release'].replace('release_', '') if metadata.get('grobid_timestamp'): r['updated'] = metadata['grobid_timestamp'] r['metadata'] = metadata if not self.s3_only: resp = self.db.insert_grobid(self.cur, batch, on_conflict="update") self.counts['insert-grobid'] += resp[0] self.counts['update-grobid'] += resp[1] file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')] resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update") self.counts['insert-file-meta'] += resp[0] self.counts['update-file-meta'] += resp[1] self.db.commit() return [] class PersistGrobidDiskWorker(SandcrawlerWorker): """ Writes blobs out to disk. This could be refactored into a "Sink" type with an even thinner wrapper. """ def __init__(self, output_dir): super().__init__() self.output_dir = output_dir def _blob_path(self, sha1hex, extension=".tei.xml"): obj_path = "{}/{}/{}{}".format( sha1hex[0:2], sha1hex[2:4], sha1hex, extension, ) return obj_path def process(self, record, key=None): if record.get('status_code') != 200 or not record.get('tei_xml'): return False assert(len(record['key'])) == 40 p = "{}/{}".format(self.output_dir, self._blob_path(record['key'])) os.makedirs(os.path.dirname(p), exist_ok=True) with open(p, 'w') as f: f.write(record.pop('tei_xml')) self.counts['written'] += 1 return record class PersistPdfTrioWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) self.cur = self.db.conn.cursor() def process(self, record, key=None): """ Only do batches (as transactions) """ raise NotImplementedError def push_batch(self, batch): self.counts['total'] += len(batch) batch = [r for r in batch if 'pdf_trio' in r and r['pdf_trio'].get('status_code')] for r in batch: # copy key (sha1hex) into sub-object r['pdf_trio']['key'] = r['key'] pdftrio_batch = [r['pdf_trio'] for r in batch] resp = self.db.insert_pdftrio(self.cur, pdftrio_batch, on_conflict="update") self.counts['insert-pdftrio'] += resp[0] self.counts['update-pdftrio'] += resp[1] file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')] resp = self.db.insert_file_meta(self.cur, file_meta_batch) self.counts['insert-file-meta'] += resp[0] self.counts['update-file-meta'] += resp[1] self.db.commit() return [] class PersistPdfTextWorker(SandcrawlerWorker): """ Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table. Should keep batch sizes small. """ def __init__(self, db_url, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( host_url=kwargs.get('s3_url', 'localhost:9000'), access_key=kwargs['s3_access_key'], secret_key=kwargs['s3_secret_key'], default_bucket=kwargs['s3_bucket'], ) self.s3_only = kwargs.get('s3_only', False) self.db_only = kwargs.get('db_only', False) assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed" if not self.s3_only: self.db = SandcrawlerPostgresClient(db_url) self.cur = self.db.conn.cursor() else: self.db = None self.cur = None def process(self, record, key=None): """ Only do batches (as transactions) """ raise NotImplementedError def push_batch(self, batch): self.counts['total'] += len(batch) parsed_batch = [] for r in batch: parsed_batch.append(PdfExtractResult.from_pdftext_dict(r)) for r in parsed_batch: if r.status != 'success' or not r.text: self.counts['s3-skip-status'] += 1 if r.error_msg: r.metadata = {'error_msg': r.error_msg[:500]} continue assert len(r.sha1hex) == 40 if not self.db_only: resp = self.s3.put_blob( folder="text", blob=r.text, sha1hex=r.sha1hex, extension=".txt", ) self.counts['s3-put'] += 1 if not self.s3_only: resp = self.db.insert_pdf_meta(self.cur, parsed_batch, on_conflict="update") self.counts['insert-pdf-meta'] += resp[0] self.counts['update-pdf-meta'] += resp[1] file_meta_batch = [r.file_meta for r in parsed_batch if r.file_meta] resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update") self.counts['insert-file-meta'] += resp[0] self.counts['update-file-meta'] += resp[1] self.db.commit() return [] class PersistThumbnailWorker(SandcrawlerWorker): """ Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table. This worker *must* be used with raw kakfa mode; thumbnails are *not* wrapped in JSON like most sandcrawler kafka messages. """ def __init__(self, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( host_url=kwargs.get('s3_url', 'localhost:9000'), access_key=kwargs['s3_access_key'], secret_key=kwargs['s3_secret_key'], default_bucket=kwargs['s3_bucket'], ) self.s3_extension = kwargs.get('s3_extension', ".jpg") self.s3_folder = kwargs.get('s3_folder', "pdf") def process(self, blob: bytes, key: Optional[str] = None): """ Processing raw messages, not decoded JSON objects """ if isinstance(key, bytes): key = key.decode('utf-8') assert key is not None and len(key) == 40 and isinstance(key, str) assert isinstance(blob, bytes) assert len(blob) >= 50 resp = self.s3.put_blob( folder=self.s3_folder, blob=blob, sha1hex=key, extension=self.s3_extension, ) self.counts['s3-put'] += 1 class GenericPersistDocWorker(SandcrawlerWorker): """ Pushes blobs from Kafka to S3. Objects are assumed to be JSON-wrapped strings. """ def __init__(self, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( host_url=kwargs.get('s3_url', 'localhost:9000'), access_key=kwargs['s3_access_key'], secret_key=kwargs['s3_secret_key'], default_bucket=kwargs['s3_bucket'], ) self.s3_extension = kwargs.get('s3_extension', ".unknown") self.s3_folder = kwargs.get('s3_folder', "unknown") self.doc_key = "unknown" def process(self, record: dict, key: Optional[AnyStr] = None) -> None: if record.get('status') != 'success' or not record.get(self.doc_key): return assert key is not None if isinstance(key, bytes): key_str = key.decode('utf-8') elif isinstance(key, str): key_str = key assert len(key_str) == 40 if 'sha1hex' in record: assert key_str == record['sha1hex'] resp = self.s3.put_blob( folder=self.s3_folder, blob=record[self.doc_key].encode('utf-8'), sha1hex=key_str, extension=self.s3_extension, ) self.counts['s3-put'] += 1 class PersistXmlDocWorker(GenericPersistDocWorker): """ Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to sandcrawler database (SQL). """ def __init__(self, **kwargs): super().__init__(**kwargs) self.s3_extension = kwargs.get('s3_extension', ".jats.xml") self.s3_folder = kwargs.get('s3_folder', "xml_doc") self.doc_key = "jats_xml" class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): """ Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to sandcrawler database (SQL). """ def __init__(self, **kwargs): super().__init__(**kwargs) self.s3_extension = kwargs.get('s3_extension', ".tei.xml") self.s3_folder = kwargs.get('s3_folder', "html_body") self.doc_key = "tei_xml"