 fatcat_scholar/sim_pipeline.py | 65
 notes/indexing_pipeline.md     |  8
 2 files changed, 73 insertions(+), 0 deletions(-)
diff --git a/fatcat_scholar/sim_pipeline.py b/fatcat_scholar/sim_pipeline.py
index 926d943..0277619 100644
--- a/fatcat_scholar/sim_pipeline.py
+++ b/fatcat_scholar/sim_pipeline.py
@@ -38,6 +38,8 @@ def truncate_issue_meta(full: Dict[str, Any]) -> Dict[str, Any]:
     Same as truncate_pub_meta() but for issue item metadata
     """
     full.pop("files")
+    full.pop("histograms", None)
+    full.pop("rotations", None)
     return full
 
 
@@ -132,6 +134,11 @@ class SimPipeline:
         return pages
 
     def run_issue_db(self, limit: int = None) -> None:
+        """
+        Legacy/Deprecated code path
+
+        Runs DB iteration and fetching in single thread/process
+        """
         count = 0
         self.issue_db.db.row_factory = sqlite3.Row
         cur = self.issue_db.db.cursor()
@@ -165,6 +172,45 @@ class SimPipeline:
             if limit is not None and count >= limit:
                 break
 
+    def run_print_issues(self, max_release_count: int = 5) -> None:
+        """
+        Iterates over issues, printing as TSV to stdout
+
+        Intended to be used with GNU/parallel for coarse parallelism.
+        """
+        self.issue_db.db.row_factory = sqlite3.Row
+        cur = self.issue_db.db.cursor()
+        for row in cur.execute(
+            f"SELECT * FROM sim_issue LEFT JOIN sim_pub ON sim_issue.sim_pubid = sim_pub.sim_pubid WHERE sim_issue.release_count < {max_release_count}"
+        ):
+            # filter out "contents" and "index" items
+            # TODO: more filters; also redundant with IssueDB code?
+            if row["issue_item"].endswith("_contents") or row["issue_item"].endswith(
+                "_index"
+            ):
+                continue
+            print(f"{row['issue_item']}\t{row['pub_collection']}")
+
+    def run_fetch_issue(self, issue_item: str, pub_collection: str) -> None:
+        """
+        Fetches SIM issue.
+
+        TODO: more advanced retries?
+        """
+        try:
+            full_issue = self.fetch_sim_issue(issue_item, pub_collection)
+        except requests.exceptions.ConnectionError as e:
+            print(str(e), file=sys.stderr)
+            return
+        except requests.exceptions.ReadTimeout as e:
+            print(str(e), file=sys.stderr)
+            return
+        if not full_issue:
+            return
+        pages = self.full_issue_to_pages(full_issue)
+        for bundle in pages:
+            print(bundle.json(exclude_none=True, sort_keys=True))
+
 
 def main() -> None:
     """
@@ -189,6 +235,19 @@ def main() -> None:
     sub.set_defaults(func="run_issue_db")
     sub.add_argument("--limit", help="maximum number of pages to index", type=int)
 
+    sub = subparsers.add_parser(
+        "run_print_issues",
+        help="dumps issues as simple TSV rows (for parallel processing)",
+    )
+    sub.set_defaults(func="run_print_issues")
+
+    sub = subparsers.add_parser(
+        "run_fetch_issue", help="fetches pages for given issue item"
+    )
+    sub.add_argument("issue_item", type=str)
+    sub.add_argument("pub_collection", type=str)
+    sub.set_defaults(func="run_fetch_issue")
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         parser.print_help(file=sys.stderr)
@@ -198,6 +257,12 @@ def main() -> None:
 
     if args.func == "run_issue_db":
         sp.run_issue_db(limit=args.limit)
+    elif args.func == "run_print_issues":
+        sp.run_print_issues()
+    elif args.func == "run_fetch_issue":
+        sp.run_fetch_issue(
+            issue_item=args.issue_item, pub_collection=args.pub_collection
+        )
     else:
         func = getattr(sp, args.func)
         func()
diff --git a/notes/indexing_pipeline.md b/notes/indexing_pipeline.md
index f891d27..ce4d687 100644
--- a/notes/indexing_pipeline.md
+++ b/notes/indexing_pipeline.md
@@ -46,3 +46,11 @@ Transform and index both into local elasticsearch:
 
     => 132635 docs in 2m18.787824205s at 955.667 docs/s with 4 workers
 
+## Iterated
+
+    # in pipenv shell
+    python -m fatcat_scholar.sim_pipeline run_print_issues \
+        | parallel -j8 --colsep "\t" python -m fatcat_scholar.sim_pipeline run_fetch_issue {1} {2} \
+        | pv -l \
+        | gzip \
+        > data/sim_intermediate.json.gz

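Separately, `run_print_issues` interpolates `max_release_count` into its SQL with an f-string. Since the value is an int-typed argument this works, but sqlite3's parameter binding avoids string interpolation entirely. A standalone sketch of the same query with a `?` placeholder (`print_issue_rows` and `db_path` are illustrative names, not part of this commit; the `_contents`/`_index` filtering is omitted for brevity):

    import sqlite3

    def print_issue_rows(db_path: str, max_release_count: int = 5) -> None:
        # Same SELECT as run_print_issues, but with a bound parameter
        # instead of f-string interpolation
        db = sqlite3.connect(db_path)
        db.row_factory = sqlite3.Row
        cur = db.cursor()
        cur.execute(
            "SELECT * FROM sim_issue "
            "LEFT JOIN sim_pub ON sim_issue.sim_pubid = sim_pub.sim_pubid "
            "WHERE sim_issue.release_count < ?",
            (max_release_count,),
        )
        for row in cur:
            print(f"{row['issue_item']}\t{row['pub_collection']}")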