import json
import time
from typing import Any, Dict, List, Optional

from confluent_kafka import Consumer, KafkaException, Producer
from fatcat_openapi_client import ApiClient, ReleaseEntity

from fatcat_tools.transforms import release_ingest_request, release_to_elasticsearch

from .worker_common import FatcatWorker, most_recent_message


class ChangelogWorker(FatcatWorker):
    """
    Periodically polls the fatcat API looking for new changelog entries. When
    they are found, fetches them and pushes them (as JSON) into a Kafka topic.
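
    Example usage (a sketch; the API host, topic name, and the
    fatcat_tools.public_api helper are assumptions here):

        api = public_api("https://api.fatcat.wiki/v0")
        worker = ChangelogWorker(
            api,
            kafka_hosts="localhost:9092",
            produce_topic="fatcat-prod.changelog",
        )
        worker.run()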
    """

    def __init__(
        self,
        api: ApiClient,
        kafka_hosts: str,
        produce_topic: str,
        poll_interval: float = 10.0,
        offset: Optional[int] = None,
    ) -> None:
        super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
        self.poll_interval = poll_interval
        self.offset = offset  # the fatcat changelog offset, not the kafka offset

    def run(self) -> None:

        # On start, try to consume the most recent message from the topic and
        # use that as the starting offset. Note that this is a single-partition
        # topic
        if self.offset is None:
            print("Checking for most recent changelog offset...")
            assert self.produce_topic
            msg = most_recent_message(self.produce_topic, self.kafka_config)
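            # the message body is a JSON-serialized changelog entry; only its
            # "index" field is needed to resume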
            if msg:
                self.offset = json.loads(msg.decode("utf-8"))["index"]
            else:
                self.offset = 0
            print("Most recent changelog index in Kafka seems to be {}".format(self.offset))

        def fail_fast(err: Any, _msg: Any) -> None:
            if err is not None:
                print("Kafka producer delivery error: {}".format(err))
                print("Bailing out...")
                # TODO: should it be sys.exit(-1)?
                raise KafkaException(err)

        producer_conf = self.kafka_config.copy()
        producer_conf.update(
            {
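                # only invoke the delivery callback (fail_fast) for failed messages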
                "delivery.report.only.error": True,
                "default.topic.config": {
                    "request.required.acks": -1,  # all brokers must confirm
                },
            }
        )
        producer = Producer(producer_conf)

        while True:
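            # the first entry returned by get_changelog(limit=1) is the most
            # recent; treat its index as the high-water mark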
            latest = int(self.api.get_changelog(limit=1)[0].index)
            if latest > self.offset:
                print("Fetching changelogs from {} through {}".format(self.offset + 1, latest))
            for i in range(self.offset + 1, latest + 1):
                cle = self.api.get_changelog_entry(i)
                obj = self.api.api_client.sanitize_for_serialization(cle)
                producer.produce(
                    self.produce_topic,
                    json.dumps(obj).encode("utf-8"),
                    key=str(i),
                    on_delivery=fail_fast,
                    # NOTE timestamp could be timestamp=cle.timestamp (?)
                )
                self.offset = i
            producer.flush()
            print("Sleeping {} seconds...".format(self.poll_interval))
            time.sleep(self.poll_interval)


class EntityUpdatesWorker(FatcatWorker):
    """
    Consumes from the changelog topic and publishes expanded entities (fetched
    from API) to update topics.
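
    Rough flow, as implemented below: consume a changelog entry, collect the
    entity idents edited in its editgroup, fetch expanded entities from the
    API, and produce updates to the release/file/container topics, the ingest
    file request topic, and the work-ident topic (for scholar re-indexing).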
    """

    def __init__(
        self,
        api: ApiClient,
        kafka_hosts: str,
        consume_topic: str,
        release_topic: str,
        file_topic: str,
        container_topic: str,
        ingest_file_request_topic: str,
        work_ident_topic: str,
        poll_interval: float = 5.0,
    ):
        super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
        self.release_topic = release_topic
        self.file_topic = file_topic
        self.container_topic = container_topic
        self.ingest_file_request_topic = ingest_file_request_topic
        self.work_ident_topic = work_ident_topic
        self.poll_interval = poll_interval
        self.consumer_group = "entity-updates"
        self.ingest_oa_only = False
        self.ingest_pdf_doi_prefix_blocklist = [
            # gbif.org: many DOIs, not PDF fulltext
            "10.15468/",
            # ssl.fao.org: gene data
            "10.18730/",
            # plutof.ut.ee: gene data
            "10.15156/",
            # ba.e-pics.ethz.ch: swiss image (photo) archive
            "10.3932/",
            # ccdc.cam.ac.uk: crystal structures
            "10.5517/",
            # researchgate: mostly blocks our crawler
            "10.13140/",
            # springerlink: mostly blocks crawler
            "10.1007/",
            # nature group: mostly blocks crawler
            "10.1038/",
            # SAGE: mostly blocks crawler
            "10.1177/",
            # IOP: mostly blocks crawler
            "10.1088/",
            # JSTOR: mostly blocks crawler
            "10.2307/",
        ]
        self.live_pdf_ingest_doi_prefix_acceptlist = [
            # biorxiv and medrxiv
            "10.1101/",
            # the lancet (often hybrid OA)
            "10.1016/s0140-6736",
            "10.1016/s2213-2600",
            # journal of virology
            "10.1128/jvi.",
            # FEBS letters
            "10.1002/1873-3468.",
            # Journal of Neuroscience
            "10.1523/jneurosci.",
            # Chemical and pharmaceutical bulletin
            "10.1248/cpb.",
            # Japanese Journal of Radiological Technology
            "10.6009/jjrt.",
            # Seibutsu Butsuri
            "10.2142/biophys.",
            # Chemical Communications
            "10.1039/d0cc",
            # Yakugaku zasshi
            "10.1248/yakushi.",
            # bulletin AMS
            "10.1090/s0002-9904",
            # Current Biology
            "10.1016/j.cub.",
            # Antarctica A Keystone in a Changing World
            "10.3133/ofr",
            # Clinical Cancer Research
            "10.1158/1078-0432.",
            # Transactions of the Japan Society of Mechanical Engineers
            "10.1299/kikai",
            # protocols.io
            "10.17504/",
        ]

    def want_live_ingest(self, release: ReleaseEntity, ingest_request: Dict[str, Any]) -> bool:
        """
        Looks at an ingest request and decides whether it is worth enqueuing
        for live ingest.

        In theory, crawling all DOIs to a landing page is valuable. This check
        is intended to be an operational point of control to reduce load on
        daily ingest crawling (via wayback SPN).
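
        Returns True if the request should be enqueued, False otherwise.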
        """

        link_source = ingest_request.get("link_source")
        ingest_type = ingest_request.get("ingest_type")
        doi = ingest_request.get("ext_ids", {}).get("doi")
        es = release_to_elasticsearch(release)

        is_document = release.release_type in (
            "article",
            "article-journal",
            "article-newspaper",
            "book",
            "chapter",
            "editorial",
            "interview",
            "legal_case",
            "legislation",
            "letter",
            "manuscript",
            "paper-conference",
            "patent",
            "peer_review",
            "post",
            "report",
            "retraction",
            "review",
            "review-book",
            "thesis",
        )
        is_not_pdf = release.release_type in (
            "component",
            "dataset",
            "figure",
            "graphic",
            "software",
            "stub",
        )

        # the accept list defaults to "crawl it" for known-OA DOI prefixes,
        # regardless of what the OA metadata says
        in_acceptlist = False
        if doi:
            for prefix in self.live_pdf_ingest_doi_prefix_acceptlist:
                if doi.startswith(prefix):
                    in_acceptlist = True

        if self.ingest_oa_only and link_source not in ("arxiv", "pmc"):

            # most datacite documents are in IRs and should be crawled
            is_datacite_doc = False
            if release.extra and ("datacite" in release.extra) and is_document:
                is_datacite_doc = True
            if not (es["is_oa"] or in_acceptlist or is_datacite_doc):
                return False

        # big publishers *generally* have accurate OA metadata, use
        # preservation networks, and block our crawlers. So unless a release is
        # OA, explicitly on the accept list, or not preserved, skip crawling
        if (
            es.get("publisher_type") == "big5"
            and es.get("is_preserved")
            and not (es["is_oa"] or in_acceptlist)
        ):
            return False

        # if ingest_type is pdf but release_type is almost certainly not a PDF,
        # skip it. This is mostly a datacite thing.
        if ingest_type == "pdf" and is_not_pdf:
            return False

        if ingest_type == "pdf" and doi:
            for prefix in self.ingest_pdf_doi_prefix_blocklist:
                if doi.startswith(prefix):
                    return False

        # figshare
        if doi and (doi.startswith("10.6084/") or doi.startswith("10.25384/")):
            # don't crawl "most recent version" (aka "group") DOIs
            if not release.version:
                return False

        # zenodo
        if doi and doi.startswith("10.5281/"):
            # if this is a "grouping" DOI of multiple "version" DOIs, do not crawl (will crawl the versioned DOIs)
            if release.extra and release.extra.get("relations"):
                for rel in release.extra["relations"]:
                    if rel.get("relationType") == "HasVersion" and rel.get(
                        "relatedIdentifier", ""
                    ).startswith("10.5281/"):
                        return False

        return True

    def run(self) -> None:
        def fail_fast(err: Any, _msg: Any) -> None:
            if err is not None:
                print("Kafka producer delivery error: {}".format(err))
                print("Bailing out...")
                # TODO: should it be sys.exit(-1)?
                raise KafkaException(err)

        def on_commit(err: Any, partitions: List[Any]) -> None:
            if err is not None:
                print("Kafka consumer commit error: {}".format(err))
                print("Bailing out...")
                # TODO: should it be sys.exit(-1)?
                raise KafkaException(err)
            for p in partitions:
                # check for partition-specific commit errors
                print(p)
                if p.error:
                    print("Kafka consumer commit error: {}".format(p.error))
                    print("Bailing out...")
                    # TODO: should it be sys.exit(-1)?
                    raise KafkaException(p.error)
            print("Kafka consumer commit successful")

        def on_rebalance(consumer: Consumer, partitions: List[Any]) -> None:
            for p in partitions:
                if p.error:
                    raise KafkaException(p.error)
            print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions))

        consumer_conf = self.kafka_config.copy()
        consumer_conf.update(
            {
                "group.id": self.consumer_group,
                "on_commit": fail_fast,
                # messages don't have their offset marked as stored until all
                # downstream updates have been produced, but we do auto-commit
                # stored offsets to the broker
                "enable.auto.commit": True,
                "enable.auto.offset.store": False,
                # user code timeout; if no poll after this long, assume user code
                # hung and rebalance (default: 5min)
                "max.poll.interval.ms": 180000,
                "default.topic.config": {
                    "auto.offset.reset": "latest",
                },
            }
        )
        consumer = Consumer(consumer_conf)

        producer_conf = self.kafka_config.copy()
        producer_conf.update(
            {
                "delivery.report.only.error": True,
                "default.topic.config": {
                    "request.required.acks": -1,  # all brokers must confirm
                },
            }
        )
        producer = Producer(producer_conf)

        consumer.subscribe(
            [self.consume_topic],
            on_assign=on_rebalance,
            on_revoke=on_rebalance,
        )
        print("Kafka consuming {}".format(self.consume_topic))

        while True:
            msg = consumer.poll(self.poll_interval)
            if not msg:
                print(
                    "nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval)
                )
                continue
            if msg.error():
                raise KafkaException(msg.error())

            cle = json.loads(msg.value().decode("utf-8"))
            # print(cle)
            print("processing changelog index {}".format(cle["index"]))
            release_ids = []
            new_release_ids = []
            file_ids = []
            fileset_ids = []
            webcapture_ids = []
            container_ids = []
            work_ids = []
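            # the changelog entry embeds its full editgroup, with per-entity
            # edit lists under editgroup.edits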
            release_edits = cle["editgroup"]["edits"]["releases"]
            for re in release_edits:
                release_ids.append(re["ident"])
                # filter to direct release edits which are not updates
                if not re.get("prev_revision") and not re.get("redirect_ident"):
                    new_release_ids.append(re["ident"])
            file_edits = cle["editgroup"]["edits"]["files"]
            for e in file_edits:
                file_ids.append(e["ident"])
            fileset_edits = cle["editgroup"]["edits"]["filesets"]
            for e in fileset_edits:
                fileset_ids.append(e["ident"])
            webcapture_edits = cle["editgroup"]["edits"]["webcaptures"]
            for e in webcapture_edits:
                webcapture_ids.append(e["ident"])
            container_edits = cle["editgroup"]["edits"]["containers"]
            for e in container_edits:
                container_ids.append(e["ident"])
            work_edits = cle["editgroup"]["edits"]["works"]
            for e in work_edits:
                work_ids.append(e["ident"])

            # TODO: do these fetches in parallel using a thread pool?
            for ident in set(file_ids):
                file_entity = self.api.get_file(ident, expand=None)
                # update release when a file changes
                # TODO: also fetch old version of file and update any *removed*
                # release idents (and same for filesets, webcapture updates)
                release_ids.extend(file_entity.release_ids or [])
                file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
                producer.produce(
                    self.file_topic,
                    json.dumps(file_dict).encode("utf-8"),
                    key=ident.encode("utf-8"),
                    on_delivery=fail_fast,
                )

            # TODO: topic for fileset updates
            for ident in set(fileset_ids):
                fileset_entity = self.api.get_fileset(ident, expand=None)
                # update release when a fileset changes
                release_ids.extend(fileset_entity.release_ids or [])

            # TODO: topic for webcapture updates
            for ident in set(webcapture_ids):
                webcapture_entity = self.api.get_webcapture(ident, expand=None)
                # update release when a webcapture changes
                release_ids.extend(webcapture_entity.release_ids or [])

            for ident in set(container_ids):
                container = self.api.get_container(ident)
                container_dict = self.api.api_client.sanitize_for_serialization(container)
                producer.produce(
                    self.container_topic,
                    json.dumps(container_dict).encode("utf-8"),
                    key=ident.encode("utf-8"),
                    on_delivery=fail_fast,
                )

            for ident in set(release_ids):
                release = self.api.get_release(
                    ident, expand="files,filesets,webcaptures,container"
                )
                if release.work_id:
                    work_ids.append(release.work_id)
                release_dict = self.api.api_client.sanitize_for_serialization(release)
                producer.produce(
                    self.release_topic,
                    json.dumps(release_dict).encode("utf-8"),
                    key=ident.encode("utf-8"),
                    on_delivery=fail_fast,
                )
                # for ingest requests, filter to "new" active releases with no matched files
                if release.ident in new_release_ids:
                    ir = release_ingest_request(
                        release, ingest_request_source="fatcat-changelog"
                    )
                    if ir and not release.files and self.want_live_ingest(release, ir):
                        producer.produce(
                            self.ingest_file_request_topic,
                            json.dumps(ir).encode("utf-8"),
                            # key=None,
                            on_delivery=fail_fast,
                        )

            # send work updates (just ident and changelog metadata) to scholar for re-indexing
            for ident in set(work_ids):
                assert ident
                key = f"work_{ident}"
                work_ident_dict = dict(
                    key=key,
                    type="fatcat_work",
                    work_ident=ident,
                    updated=cle["timestamp"],
                    fatcat_changelog_index=cle["index"],
                )
                producer.produce(
                    self.work_ident_topic,
                    json.dumps(work_ident_dict).encode("utf-8"),
                    key=key.encode("utf-8"),
                    on_delivery=fail_fast,
                )

            producer.flush()
            # TODO: publish updated 'work' entities to a topic
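            # only mark this message's offset as stored once all downstream
            # produces have been flushed (at-least-once delivery)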
            consumer.store_offsets(message=msg)