 -rw-r--r--  python/sandcrawler/ia.py      |  42
 -rw-r--r--  python/sandcrawler/ingest.py  | 355
 -rw-r--r--  python/tests/test_wayback.py  |  52
3 files changed, 267 insertions, 182 deletions
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index ba9b5b9..ac0fef8 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -13,7 +13,7 @@ from http.client import IncompleteRead
 
 from wayback.resourcestore import ResourceStore
 from gwb.loader import CDXLoaderFactory
-from .misc import b32_hex, requests_retry_session
+from .misc import b32_hex, requests_retry_session, gen_file_metadata
 
 
 ResourceResult = namedtuple("ResourceResult", [
@@ -106,15 +106,15 @@ class CdxApiClient:
                 status_code=int(raw[4]),
                 sha1b32=raw[5],
                 sha1hex=b32_hex(raw[5]),
-                warc_csize=raw[8],
-                warc_offset=raw[9],
+                warc_csize=int(raw[8]),
+                warc_offset=int(raw[9]),
                 warc_path=raw[10],
             )
             assert (row.mimetype == "-") or ("-" not in row)
             rows.append(row)
         return rows
 
-    def fetch(self, url, datetime):
+    def fetch(self, url, datetime, filter_status_code=None):
         """
         Fetches a single CDX row by url/datetime. Raises a KeyError if not
         found, because we expect to be looking up a specific full record.
@@ -127,9 +127,10 @@ class CdxApiClient:
             'to': datetime,
             'matchType': 'exact',
             'limit': -1,
-            'fastLatest': True,
             'output': 'json',
         }
+        if filter_status_code:
+            params['filter'] = "statuscode:{}".format(filter_status_code)
         resp = self._query_api(params)
         if not resp:
             raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
@@ -148,9 +149,9 @@ class CdxApiClient:
             'url': url,
             'matchType': 'exact',
             'limit': -25,
-            'fastLatest': True,
             'output': 'json',
             'collapse': 'timestamp:6',
+            'filter': '!mimetype:warc/revisit',
         }
         if max_age_days:
             since = datetime.date.today() - datetime.timedelta(days=max_age_days)
@@ -165,9 +166,11 @@ class CdxApiClient:
                 200
                     mimetype match
-                        most-recent
+                        not-liveweb
+                            most-recent
                     no match
-                        most-recent
+                        not-liveweb
+                            most-recent
                 3xx
                     most-recent
                 4xx
@@ -178,10 +181,11 @@ class CdxApiClient:
             This function will create a tuple that can be used to sort in *reverse* order.
             """
             return (
-                r.status_code == 200,
-                0 - r.status_code,
-                r.mimetype == best_mimetype,
-                r.datetime,
+                int(r.status_code == 200),
+                int(0 - r.status_code),
+                int(r.mimetype == best_mimetype),
+                int('/' in r.warc_path),
+                int(r.datetime),
             )
 
         rows = sorted(rows, key=cdx_sort_key)
@@ -251,7 +255,7 @@ class WaybackClient:
         # whole cluster is down though.
 
         status_code = gwb_record.get_status()[0]
-        location = gwb_record.get_location()[0]
+        location = (gwb_record.get_location() or [None])[0]
 
         body = None
         if status_code == 200:
@@ -280,7 +284,7 @@ class WaybackClient:
 
         return resource.body
 
-    def fetch_replay(self, url, datetime):
+    def fetch_replay_body(self, url, datetime):
         """
         Fetches an HTTP 200 record from wayback via the replay interface
         (web.archive.org) instead of petabox.
@@ -327,8 +331,8 @@ class WaybackClient:
                     body=None,
                     cdx=None,
                 )
-            if cdx.status_code == 200:
-                body = self.fetch_petabox_body(cdx.warc_csize, cdx.warc_offset, cdx_row.warc_path)
+            if cdx_row.status_code == 200:
+                body = self.fetch_petabox_body(cdx_row.warc_csize, cdx_row.warc_offset, cdx_row.warc_path)
                 return ResourceResult(
                     start_url=start_url,
                     hit=True,
@@ -360,7 +364,7 @@ class WaybackClient:
                 return ResourceResult(
                     start_url=start_url,
                     hit=False,
-                    status="terminal-not-success",
+                    status="terminal-bad-status",
                     terminal_url=cdx_row.url,
                     terminal_dt=cdx_row.datetime,
                     terminal_status_code=cdx_row.status_code,
@@ -506,7 +510,7 @@ class SavePageNowClient:
                 status=spn_result.status,
                 terminal_url=spn_result.terminal_url,
                 terminal_dt=spn_result.terminal_dt,
-                terminal_status_code=spn_result.terminal_status_code,
+                terminal_status_code=None,
                 body=None,
                 cdx=None,
             )
@@ -523,7 +527,7 @@ class SavePageNowClient:
             hit=True,
             status="success",
             terminal_url=cdx_row.url,
-            terminal_dt=cdx_row.status_code,
+            terminal_dt=cdx_row.datetime,
             terminal_status_code=cdx_row.status_code,
             body=body,
             cdx=cdx_partial,
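
The sort-key change above coerces every term to an int and adds a "warc_path contains '/'" term, which lines up with the new "not-liveweb" branch in the lookup_best() docstring: captures stored under a petabox item directory are preferred over bare liveweb WARC paths even when the liveweb capture is newer. A standalone sketch of that effect, with made-up WARC paths and a simplified row tuple (the real CDX row namedtuple in ia.py has more fields); this is not code from the commit:

    from collections import namedtuple

    Row = namedtuple("Row", ["status_code", "mimetype", "warc_path", "datetime"])

    def cdx_sort_key(r, best_mimetype="application/pdf"):
        # same ordering terms as the new code above
        return (
            int(r.status_code == 200),
            int(0 - r.status_code),
            int(r.mimetype == best_mimetype),
            int('/' in r.warc_path),   # prefer captures stored under an item directory
            int(r.datetime),
        )

    rows = [
        Row(200, "application/pdf", "liveweb-20191105-example.warc.gz", "20191105120000"),
        Row(200, "application/pdf", "EXAMPLE-crawl/EXAMPLE-00001.warc.gz", "20180812131623"),
    ]
    best = sorted(rows, key=cdx_sort_key, reverse=True)[0]
    # the older capture with a directory-qualified warc_path wins over the newer liveweb one
    assert "/" in best.warc_path

Coercing every term to int also keeps the tuple comparison uniform; the CDX datetime field is otherwise a string.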
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 58d3517..f085fc5 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -4,185 +4,222 @@ import json
 import base64
 import requests
 from http.server import BaseHTTPRequestHandler, HTTPServer
+from collections import namedtuple
 
-from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, SavePageNowRemoteError, CdxApiError
-krom sandcrawler.grobid import GrobidClient
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError
+from sandcrawler.grobid import GrobidClient
 from sandcrawler.misc import gen_file_metadata
 from sandcrawler.html import extract_fulltext_url
 from sandcrawler.workers import SandcrawlerWorker
 
 
 class IngestFileWorker(SandcrawlerWorker):
+    """
+    High level flow is to look in history first, then go to live web if
+    resource not found. Following redirects is treated as "fetching a
+    resource". Current version fetches a single resource; if it isn't a hit
+    but is an HTML 200, treats it as a landing page, tries to extract
+    fulltext link, then fetches that resource.
+
+        process(request) -> response
+            Does all the things!
+
+    Check existing processing (short circuit):
+
+        check_existing_ingest(base_url) -> ingest_file_result or none
+        process_existing(result) -> response
+            try fetching all the rows we want. if any don't exist, fetch the resource itself and call process_hit()
+
+    Fetch resource:
+
+        find_resource(url) -> ResourceResult
+
+    Process resource:
+
+        process_hit(ResourceResult) -> response
+        process_grobid(ResourceResult)
+    """
 
     def __init__(self, sink=None, **kwargs):
         super().__init__()
         self.sink = sink
-        self.spn_client = kwargs.get('spn_client',
-            SavePageNowClient())
-        self.wayback_client = kwargs.get('wayback_client',
-            WaybackClient())
-        self.cdx_client = kwargs.get('cdx_client',
-            CdxApiClient())
-        self.grobid_client = kwargs.get('grobid_client',
-            GrobidClient())
+        self.wayback_client = kwargs.get('wayback_client')
+        if not self.wayback_client:
+            self.wayback_client = WaybackClient()
+        self.spn_client = kwargs.get('spn_client')
+        if not self.spn_client:
+            self.spn_client = SavePageNowClient()
+        self.grobid_client = kwargs.get('grobid_client')
+        if not self.grobid_client:
+            self.grobid_client = GrobidClient()
+
+        self.attempt_existing = False
+
+    def check_existing_ingest(self, base_url):
+        """
+        Check in sandcrawler-db (postgres) to see if we have already ingested
+        this URL (ingest file result table).
+        Returns existing row *if* found *and* we should use it, otherwise None.
 
-    def get_cdx_and_body(self, url):
-        """
-        Returns a CDX dict and body as a tuple.
-
-        If there isn't an existing wayback capture, take one now. Raises an
-        exception if can't capture, or if CDX API not available.
-
-        Raises an exception if can't find/fetch.
-
-        TODO:
-        - doesn't handle redirects (at CDX layer). could allow 3xx status codes and follow recursively
+        Looks at existing ingest results and makes a decision based on, eg,
+        status and timestamp.
         """
-
-        WAYBACK_ENDPOINT = "https://web.archive.org/web/"
-
-        cdx = self.cdx_client.lookup_latest(url, follow_redirects=True)
-        if not cdx:
-            cdx_list = self.spn_client.save_url_now_v2(url)
-            for cdx_url in cdx_list:
-                if 'pdf.sciencedirectassets.com' in cdx_url and '.pdf' in cdx_url:
-                    cdx = self.cdx_client.lookup_latest(cdx_url)
-                    break
-                if 'osapublishing.org' in cdx_url and 'abstract.cfm' in cdx_url:
-                    cdx = self.cdx_client.lookup_latest(cdx_url)
-                    break
-                if 'pubs.acs.org' in cdx_url and '/doi/pdf/' in cdx_url:
-                    cdx = self.cdx_client.lookup_latest(cdx_url)
-                    break
-                if 'ieeexplore.ieee.org' in cdx_url and '.pdf' in cdx_url and 'arnumber=' in cdx_url:
-                    cdx = self.cdx_client.lookup_latest(cdx_url)
-                    break
-                if 'hrmars.com' in cdx_url and 'journals/papers' in cdx_url:
-                    cdx = self.cdx_client.lookup_latest(cdx_url)
-                    break
-            if not cdx:
-                # extraction didn't work as expected; fetch whatever SPN2 got
-                cdx = self.cdx_client.lookup_latest(url, follow_redirects=True)
-            if not cdx:
-                print("{}".format(cdx_list), file=sys.stderr)
-                raise SavePageNowError("Failed to find terminal capture from SPNv2")
+        if not self.attempt_existing:
+            return None
+        raise NotImplementedError
 
-        try:
-            resp = requests.get(WAYBACK_ENDPOINT + cdx['datetime'] + "id_/" + cdx['url'])
-        except requests.exceptions.TooManyRedirects as e:
-            raise WaybackError("Redirect loop fetching from wayback (dt: {}, url: {})".format(cdx['datetime'], cdx['url']))
-        if resp.status_code != cdx['http_status']:
-            raise WaybackError("Got unexpected wayback status (expected {} from CDX, got {})".format(cdx['http_status'], resp.status_code))
-        body = resp.content
-        return (cdx, body)
+        # this "return True" is just here to make pylint happy
+        return True
 
-    def process(self, request):
+    def find_resource(self, url, best_mimetype=None):
+        """
+        Looks in wayback for a resource starting at the URL, following any
+        redirects. If a hit isn't found, try crawling with SPN.
+        """
+        resource = self.wayback_client.lookup_resource(url, best_mimetype)
+        if not resource or not resource.hit:
+            resource = self.spn_client.crawl_resource(url, self.wayback_client)
+        print("[FETCH] {}  url:{}".format(
+                resource.status,
+                url),
+            file=sys.stderr)
+        return resource
+
+    def process_existing(self, request, result_row):
+        """
+        If we have an existing ingest file result, do any database fetches or
+        additional processing necessary to return a result.
         """
-        1. check sandcrawler-db for base_url
-            -> if found, populate terminal+wayback fields
-        2. check CDX for base_url (only 200, past year)
-            -> if found, populate terminal+wayback fields
-        3. if we have wayback, fetch that. otherwise do recursive SPN crawl
-            -> populate terminal+wayback
-        4. calculate file_meta
-            -> populate file_meta
-        5. check sandcrawler-db for GROBID XML
-        6. run GROBID if we didn't already
-            -> push results to minio+sandcrawler-db
-        7. decide if this was a hit
-
-        In all cases, print JSON status, and maybe push to sandcrawler-db
+        result = {
+            'hit': result_row.hit,
+            'status': result_row.status,
+            'request': request,
+        }
+        # TODO: fetch file_meta
+        # TODO: fetch grobid
+        return result
+
+    def process_hit(self, resource, file_meta):
         """
+        Run all the necessary processing for a new/fresh ingest hit.
+        """
+        return {
+            'grobid': self.process_grobid(resource),
+        }
+
+    def process_grobid(self, resource):
+        """
+        Submits to resource body to GROBID for processing.
+
+        TODO: By default checks sandcrawler-db for an existing row first, then
+        decide if we should re-process
+
+        TODO: Code to push to Kafka might also go here?
+        """
+        result = self.grobid_client.process_fulltext(resource.body)
+        if result['status'] == "success":
+            metadata = self.grobid_client.metadata(result)
+            if metadata:
+                result.update(metadata)
+        result.pop('tei_xml', None)
+        return result
+
+    def process(self, request):
+
+        # for now, only pdf ingest is implemented
+        assert request.get('ingest_type') == "pdf"
+        ingest_type = request.get('ingest_type')
+        base_url = request['base_url']
+
+        best_mimetype = None
+        if ingest_type == "pdf":
+            best_mimetype = "application/pdf"
+
+        existing = self.check_existing_ingest(base_url)
+        if existing:
+            return self.process_existing(request, existing)
+
+        result = dict(request=request, hit=False)
+
+        try:
+            # first hop
+            resource = self.find_resource(base_url, best_mimetype)
+            print("[{}] fetch status: {} url: {}".format(
+                    resource.status,
+                    ingest_type,
+                    base_url),
+                file=sys.stderr)
+            if not resource.hit:
+                result['status'] = resource.status
+                return result
+            file_meta = gen_file_metadata(resource.body)
+
+            if "html" in file_meta['mimetype']:
+                # got landing page, try another hop
+                fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
+
+                result['html'] = fulltext_url
+                if not fulltext_url or not 'pdf_url' in fulltext_url:
+                    result['status'] = 'no-pdf-link'
+                    return result
+                print("\tlanding page URL extracted ({}): {}".format(
+                        fulltext_url.get('technique'),
+                        fulltext_url['pdf_url'],
+                    ),
+                    file=sys.stderr)
+                resource = self.find_resource(fulltext_url['pdf_url'], best_mimetype)
+                if not resource.hit:
+                    result['status'] = resource.status
+                    return result
+                file_meta = gen_file_metadata(resource.body)
+        except SavePageNowError as e:
+            result['status'] = 'spn-error'
+            result['error_message'] = str(e)
+            return result
+        except PetaboxError as e:
+            result['status'] = 'petabox-error'
+            result['error_message'] = str(e)
+            return result
+        except CdxApiError as e:
+            result['status'] = 'cdx-error'
+            result['error_message'] = str(e)
+            return result
+        except WaybackError as e:
+            result['status'] = 'wayback-error'
+            result['error_message'] = str(e)
+            return result
+
+        if resource.terminal_dt:
+            result['terminal'] = {
+                "terminal_url": resource.terminal_url,
+                "terminal_dt": resource.terminal_dt,
+                "terminal_status_code": resource.terminal_status_code,
+            }
+
+        # must be a hit if we got this far
+        assert resource.hit == True
+        assert resource.terminal_status_code == 200
+
+        result['file_meta'] = file_meta
+
+        # other failure cases
+        if not resource.body or file_meta['size_bytes'] == 0:
+            result['status'] = 'null-body'
+            return result
+
+        if not (resource.hit and file_meta['mimetype'] == "application/pdf"):
+            result['status'] = "wrong-mimetype"  # formerly: "other-mimetype"
+            return result
+
+        info = self.process_hit(resource, file_meta)
+        result.update(info)
+
+        result['status'] = "success"
+        result['hit'] = True
+        return result
-        response = dict(request=request, hit=False)
-        url = request['base_url']
-        while url:
-            try:
-                (cdx_dict, body) = self.get_cdx_and_body(url)
-            except SavePageNowRemoteError as e:
-                response['status'] = 'spn-remote-error'
-                response['error_message'] = str(e)
-                return response
-            except SavePageNowError as e:
-                response['status'] = 'spn-error'
-                response['error_message'] = str(e)
-                return response
-            except CdxApiError as e:
-                response['status'] = 'cdx-error'
-                response['error_message'] = str(e)
-                return response
-            except WaybackError as e:
-                response['status'] = 'wayback-error'
-                response['error_message'] = str(e)
-                return response
-            print("CDX hit: {}".format(cdx_dict), file=sys.stderr)
-
-            response['cdx'] = cdx_dict
-            # TODO: populate terminal
-            response['terminal'] = dict(url=cdx_dict['url'], http_status=cdx_dict['http_status'])
-            if not body:
-                response['status'] = 'null-body'
-                return response
-            if cdx_dict['http_status'] != 200:
-                response['status'] = 'terminal-bad-status'
-                return response
-            file_meta = gen_file_metadata(body)
-            mimetype = cdx_dict['mimetype']
-            if mimetype in ('warc/revisit', 'binary/octet-stream', 'application/octet-stream', 'application/x-download', 'application/force-download'):
-                mimetype = file_meta['mimetype']
-                response['file_meta'] = file_meta
-            if 'html' in mimetype:
-                page_metadata = extract_fulltext_url(response['cdx']['url'], body)
-                if page_metadata and page_metadata.get('pdf_url'):
-                    next_url = page_metadata.get('pdf_url')
-                    if next_url == url:
-                        response['status'] = 'link-loop'
-                        return response
-                    url = next_url
-                    continue
-                elif page_metadata and page_metadata.get('next_url'):
-                    next_url = page_metadata.get('next_url')
-                    if next_url == url:
-                        response['status'] = 'link-loop'
-                        return response
-                    url = next_url
-                    continue
-                else:
-                    response['terminal']['html'] = page_metadata
-                    response['status'] = 'no-pdf-link'
-                return response
-            elif 'pdf' in mimetype:
-                break
-            else:
-                response['status'] = 'other-mimetype'
-                return response
-
-        response['file_meta'] = file_meta
-
-        # if we got here, we have a PDF
-        sha1hex = response['file_meta']['sha1hex']
-
-        # do GROBID
-        response['grobid'] = self.grobid_client.process_fulltext(body)
-        #print("GROBID status: {}".format(response['grobid']['status']), file=sys.stderr)
-
-        # TODO: optionally publish to Kafka here, but continue on failure (but
-        # send a sentry exception?)
-
-        # parse metadata, but drop fulltext from ingest response
-        if response['grobid']['status'] == 'success':
-            grobid_metadata = self.grobid_client.metadata(response['grobid'])
-            if grobid_metadata:
-                response['grobid'].update(grobid_metadata)
-            response['grobid'].pop('tei_xml')
-
-        # Ok, now what?
-        #print("GOT TO END", file=sys.stderr)
-        response['status'] = "success"
-        response['hit'] = True
-        return response
 
 
 class IngestFileRequestHandler(BaseHTTPRequestHandler):
 
     def do_POST(self):
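
For orientation, a hypothetical driver sketch (not part of this commit; the request URL is made up) showing the request/response shape the refactored process() works with. Clients can be injected via the wayback_client / spn_client / grobid_client kwargs; otherwise defaults are constructed as in __init__ above:

    from sandcrawler.ingest import IngestFileWorker

    worker = IngestFileWorker()   # or IngestFileWorker(wayback_client=..., spn_client=..., grobid_client=...)

    request = {
        "ingest_type": "pdf",     # only pdf ingest is implemented in this version
        "base_url": "https://example.com/article/landing-page",   # made-up URL
    }
    result = worker.process(request)

    if result["hit"]:
        # success path: terminal capture info, file metadata, GROBID output
        print(result["status"])                             # "success"
        print(result.get("terminal", {}).get("terminal_url"))
        print(result["file_meta"]["sha1hex"])
        print(result["grobid"]["status"])
    else:
        # e.g. "no-pdf-link", "wrong-mimetype", "null-body", "spn-error", "wayback-error"
        print(result["status"])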
diff --git a/python/tests/test_wayback.py b/python/tests/test_wayback.py
index 8d15d70..2aafe7c 100644
--- a/python/tests/test_wayback.py
+++ b/python/tests/test_wayback.py
@@ -54,8 +54,8 @@ def test_cdx_fetch(cdx_client):
     assert resp.datetime == CDX_DT
     assert resp.url == CDX_TARGET
     assert resp.sha1b32 == "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR"
-    assert resp.warc_csize == "8445"
-    assert resp.warc_offset == "108062304"
+    assert resp.warc_csize == 8445
+    assert resp.warc_offset == 108062304
     assert resp.warc_path == "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
 
 @responses.activate
@@ -96,7 +96,17 @@ def test_cdx_lookup_best(cdx_client):
     assert resp.warc_path == CDX_SINGLE_HIT[1][-1]
 
 WARC_TARGET = "http://fatcat.wiki/"
-WARC_BODY = b"<html>some stuff</html>"
+WARC_BODY = b"""
+<html>
+  <head>
+      <meta name="citation_pdf_url" content="http://www.example.com/content/271/20/11761.full.pdf">
+  </head>
+  <body>
+    <h1>my big article here</h1>
+    blah
+  </body>
+</html>
+"""
 
 @pytest.fixture
 def wayback_client(cdx_client, mocker):
@@ -116,7 +126,30 @@ def wayback_client(cdx_client, mocker):
 
     return client
 
-def test_wayback_fetch(wayback_client, mocker):
+@pytest.fixture
+def wayback_client_pdf(cdx_client, mocker):
+
+    with open('tests/files/dummy.pdf', 'rb') as f:
+        pdf_bytes = f.read()
+
+    client = WaybackClient(
+        cdx_client=cdx_client,
+        petabox_webdata_secret="dummy-petabox-secret",
+    )
+    # mock out the wayback store with mock stuff
+    client.rstore = mocker.Mock()
+    resource = mocker.Mock()
+    client.rstore.load_resource = mocker.MagicMock(return_value=resource)
+    resource.get_status = mocker.MagicMock(return_value=[200])
+    resource.get_location = mocker.MagicMock(return_value=[WARC_TARGET])
+    body = mocker.Mock()
+    resource.open_raw_content = mocker.MagicMock(return_value=body)
+    body.read = mocker.MagicMock(return_value=pdf_bytes)
+
+    return client
+
+@responses.activate
+def test_wayback_fetch(wayback_client):
     resp = wayback_client.fetch_petabox(123, 456789, "here/there.warc.gz")
     assert resp.body == WARC_BODY
     assert resp.location == WARC_TARGET
@@ -124,3 +157,14 @@ def test_wayback_fetch(wayback_client, mocker):
 
     resp = wayback_client.fetch_petabox_body(123, 456789, "here/there.warc.gz")
     assert resp == WARC_BODY
+@responses.activate
+def test_lookup_resource_success(wayback_client):
+
+    responses.add(responses.GET,
+        'http://dummy-cdx/cdx',
+        status=200,
+        body=json.dumps(CDX_MULTI_HIT))
+
+    resp = wayback_client.lookup_resource(CDX_TARGET)
+
+    assert resp.hit == True
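
One spot the new `(gwb_record.get_location() or [None])[0]` guard in ia.py could use direct coverage is a 200 record with no Location header, which previously would have raised IndexError. A hypothetical follow-up test (not part of this commit), written in the style of the fixtures above and assuming the existing cdx_client fixture, the WARC_BODY constant from this module, and pytest-mock's mocker:

    import pytest
    from sandcrawler.ia import WaybackClient

    @pytest.fixture
    def wayback_client_no_location(cdx_client, mocker):
        client = WaybackClient(
            cdx_client=cdx_client,
            petabox_webdata_secret="dummy-petabox-secret",
        )
        # mock out the wayback store, as in the wayback_client fixture above
        client.rstore = mocker.Mock()
        resource = mocker.Mock()
        client.rstore.load_resource = mocker.MagicMock(return_value=resource)
        resource.get_status = mocker.MagicMock(return_value=[200])
        resource.get_location = mocker.MagicMock(return_value=[])   # no Location header
        body = mocker.Mock()
        resource.open_raw_content = mocker.MagicMock(return_value=body)
        body.read = mocker.MagicMock(return_value=WARC_BODY)
        return client

    def test_wayback_fetch_no_location(wayback_client_no_location):
        resp = wayback_client_no_location.fetch_petabox(123, 456789, "here/there.warc.gz")
        assert resp.location is None
        assert resp.body == WARC_BODY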
