Перейти к содержанию

Сервисы (Services)

В этом разделе представлена документация по ключевым сервисным слоям приложения.

app.services.ingestion.IngestionService

Source code in app/services/ingestion.py
class IngestionService:
    """Write scraped catalog data to storage via the catalog and parsing repositories.

    Both repositories are constructed on the ``AsyncSession`` passed to
    ``__init__``; ``ingest_products`` commits that session itself.
    """

    def __init__(self, db: AsyncSession):
        self.db = db
        self.catalog_repo = PostgresCatalogRepository(db)
        self.parsing_repo = ParsingRepository(db)

    async def ingest_products(self, products: List[ScrapedProduct], source_id: int):
        """Upsert a batch of scraped products and record source statistics.

        Args:
            products: Scraped items to store. An empty batch is a no-op
                (nothing is written or committed).
            source_id: Identifier forwarded to ``update_source_stats``.

        Returns:
            ``0`` for an empty batch, otherwise the value returned by
            ``catalog_repo.upsert_products``.

        Raises:
            Exception: Any error from the repositories or the commit is
                re-raised after the session has been rolled back.
        """
        if not products:
            return 0

        try:
            # 1. Handle categories. dict.fromkeys de-duplicates while keeping
            # first-seen order, so the repository always receives the names
            # in a deterministic order (a set would not guarantee that).
            external_categories = list(
                dict.fromkeys(p.category for p in products if p.category)
            )
            if external_categories:
                await self.parsing_repo.get_or_create_category_maps(external_categories)

            # 2. Prepare product rows for the upsert, keyed by gift_id so a
            # product scraped twice in one batch yields a single row (the last
            # occurrence wins).
            # NOTE(review): upsert_products is not visible here; if it issues
            # one INSERT ... ON CONFLICT DO UPDATE, PostgreSQL rejects a
            # statement that touches the same row twice - confirm.
            rows_by_gift_id = {}
            for p in products:
                # site_key:url is a simple unique-ID placeholder for now; the
                # real scheme may differ per provider.
                gift_id = f"{p.site_key}:{p.product_url}"
                rows_by_gift_id[gift_id] = {
                    "gift_id": gift_id,
                    "title": p.title,
                    "description": p.description,
                    "price": p.price,
                    "currency": p.currency,
                    "image_url": p.image_url,
                    "product_url": p.product_url,
                    "merchant": p.merchant,
                    "category": p.category,  # raw category is stored for now
                    "raw": p.raw_data,
                    "is_active": True,
                }

            # 3. Bulk upsert
            count = await self.catalog_repo.upsert_products(list(rows_by_gift_id.values()))

            # 4. Update source stats (simplified): counts every scraped item,
            # including in-batch duplicates.
            await self.parsing_repo.update_source_stats(
                source_id, {"processed_items": len(products)}
            )

            await self.db.commit()
        except Exception:
            # Do not leave a half-applied transaction open on the session.
            await self.db.rollback()
            raise
        return count

    async def ingest_categories(self, categories: List[ScrapedCategory]):
        """Placeholder for category discovery; currently does nothing.

        The intended logic may create new ParsingSource entries.
        """
        pass

app.services.intelligence.IntelligenceService

Source code in app/services/intelligence.py
class IntelligenceService:
    """Async client for the external Intelligence HTTP API.

    Both public methods are best-effort: any failure is logged and an empty
    result is returned instead of raising.
    """

    def __init__(self):
        settings = get_settings()
        self.base_url = settings.intelligence_api_base.rstrip("/")
        self.token = settings.intelligence_api_token
        self.timeout = 30.0

    def _auth_headers(self) -> Dict[str, str]:
        """Return the bearer Authorization header, or no headers when no token is set."""
        if self.token:
            return {"Authorization": f"Bearer {self.token}"}
        return {}

    async def _post_json(self, path: str, payload: Any) -> Any:
        """POST ``payload`` as JSON to ``base_url + path`` and return the decoded body.

        Raises whatever the HTTP client raises, including the error from
        ``raise_for_status`` on a non-success response.
        """
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                f"{self.base_url}{path}", json=payload, headers=self._auth_headers()
            )
            response.raise_for_status()
            return response.json()

    async def classify_categories(
        self, 
        external_names: List[str], 
        internal_categories: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Calls external API to map external category names to internal category IDs.

        Returns the ``mappings`` value of the response. Returns ``[]`` when
        ``external_names`` is empty (no request is sent), when the request
        fails, when the body is not a JSON object, or when ``mappings`` is
        missing or ``null``.
        """
        if not external_names:
            return []

        payload = {
            "external_names": external_names,
            "internal_categories": internal_categories
        }

        try:
            data = await self._post_json("/v1/classify/categories", payload)
        except Exception as e:
            # Deliberate best-effort boundary: callers get an empty result.
            logger.error(f"Intelligence API Error (classify_categories): {e}")
            return []

        if not isinstance(data, dict):
            logger.error(
                f"Intelligence API Error (classify_categories): unexpected response type {type(data).__name__}"
            )
            return []

        # An explicit `"mappings": null` must not leak None to callers that
        # iterate over the result.
        mappings = data.get("mappings")
        return [] if mappings is None else mappings

    async def score_product(self, product_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Calls external API to get giftability score and reasoning.

        Returns the decoded JSON object, or ``{}`` when the request fails or
        the body is not a JSON object.
        """
        try:
            data = await self._post_json("/v1/products/score", product_data)
        except Exception as e:
            # Deliberate best-effort boundary: callers get an empty result.
            logger.error(f"Intelligence API Error (score_product): {e}")
            return {}

        if not isinstance(data, dict):
            logger.error(
                f"Intelligence API Error (score_product): unexpected response type {type(data).__name__}"
            )
            return {}
        return data

Functions

classify_categories(external_names, internal_categories) async

Calls external API to map external category names to internal category IDs.

Source code in app/services/intelligence.py
async def classify_categories(
    self, 
    external_names: List[str], 
    internal_categories: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Calls external API to map external category names to internal category IDs.

    Returns the ``mappings`` value of the response. Returns ``[]`` when
    ``external_names`` is empty (no request is sent), when the request or
    response handling fails, or when ``mappings`` is missing or ``null``.
    """
    if not external_names:
        return []

    url = f"{self.base_url}/v1/classify/categories"
    payload = {
        "external_names": external_names,
        "internal_categories": internal_categories
    }
    headers = {}
    if self.token:
        headers["Authorization"] = f"Bearer {self.token}"

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(url, json=payload, headers=headers)
            response.raise_for_status()
            data = response.json()
            mappings = data.get("mappings")
            # An explicit `"mappings": null` must not leak None to callers
            # that iterate over the result.
            return [] if mappings is None else mappings
    except Exception as e:
        # Deliberate best-effort boundary: callers get an empty result.
        logger.error(f"Intelligence API Error (classify_categories): {e}")
        return []

score_product(product_data) async

Calls external API to get giftability score and reasoning.

Source code in app/services/intelligence.py
async def score_product(self, product_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Ask the external API for a product's giftability score and reasoning.

    ``product_data`` is posted as the JSON body to ``/v1/products/score``.
    The decoded JSON response is returned; on any error the failure is
    logged and ``{}`` is returned instead.
    """
    endpoint = f"{self.base_url}/v1/products/score"
    # The bearer token is optional; without one the request carries no auth header.
    auth_headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.post(endpoint, json=product_data, headers=auth_headers)
            reply.raise_for_status()
            scored = reply.json()
    except Exception as e:
        logger.error(f"Intelligence API Error (score_product): {e}")
        return {}
    return scored

app.services.recommendation.RecommendationService

Source code in app/services/recommendation.py
class RecommendationService:
    """Build gift recommendations from a quiz request.

    Candidates are retrieved by vector similarity and passed through the
    ranking stages; stages B and C are currently pass-through stubs.
    """

    def __init__(self, session: AsyncSession, embedding_service: EmbeddingService):
        self.session = session
        self.repo = PostgresCatalogRepository(session)
        self.embedding_service = embedding_service

    async def generate_recommendations(
        self, 
        request: RecommendationRequest,
        engine_version: str = "vector_v1"
    ) -> RecommendationResponse:
        """
        Main orchestration method for recommendation generation.
        Implements Stages A, B, C, D from the roadmap.

        Args:
            request: Quiz answers plus ``top_n`` and ``debug`` options.
            engine_version: Label echoed back in the response.

        Returns:
            A response whose ``gifts`` are the selected candidates in order
            and whose ``featured_gift`` is the first of them (``None`` when
            there are none). ``debug`` is filled only when ``request.debug``
            is truthy.
        """
        logger.info(f"Generating recommendations for request: {request}")

        # Stage A: Vector Retrieval
        candidates = await self._retrieve_candidates(request)

        # Stage B: CPU Ranker
        ranked_candidates = await self._rank_candidates(request, candidates)

        # Stage C: LLM-as-judge Rerank (Stub)
        final_candidates = await self._judge_rerank(request, ranked_candidates)

        # Stage D: Constraints Re-ranking (Diversity)
        final_gifts_data = self._apply_final_rank_and_diversity(request, final_candidates)

        gifts = [
            GiftDTO(
                id=g.gift_id,
                title=g.title,
                description=g.description,
                price=g.price,
                currency=g.currency,
                image_url=g.image_url,
                product_url=g.product_url,
                merchant=g.merchant,
                category=g.category
            ) for g in final_gifts_data
        ]
        featured_gift = gifts[0] if gifts else None

        return RecommendationResponse(
            quiz_run_id="stub-id", # TODO: integrate with quiz_run repo
            engine_version=engine_version,
            featured_gift=featured_gift,
            gifts=gifts,
            # The count reported is the Stage A retrieval size, not the final list size.
            debug={"status": "candidates_retrieved", "count": len(candidates)} if request.debug else None
        )

    def _build_query_text(self, request: RecommendationRequest) -> str:
        """Construct semantic search query from quiz answers.

        Unset fields are left out entirely so they do not add noise such as
        "Age: None" to the text that gets embedded.
        """
        parts = [
            f"Gift for {request.relationship or 'someone'}",
            f"Occasion: {request.occasion}" if request.occasion else "",
            # `is not None` rather than truthiness so an age of 0 is kept.
            f"Age: {request.recipient_age}" if request.recipient_age is not None else "",
            f"Gender: {request.recipient_gender}" if request.recipient_gender else "",
            f"Interests: {', '.join(request.interests)}" if request.interests else "",
            f"Description: {request.interests_description}" if request.interests_description else "",
            f"Vibe: {request.vibe}" if request.vibe else "",
        ]
        return " ".join([p for p in parts if p]).strip()

    async def _retrieve_candidates(self, request: RecommendationRequest) -> list[Any]:
        """Stage A: Vector Retrieval (pgvector)"""
        query_text = self._build_query_text(request)
        logger.info(f"Retrieving candidates for query: '{query_text}'")

        # 1. Generate Query Embedding
        # NOTE(review): embed_batch is called synchronously inside a coroutine;
        # if it is CPU-heavy it will block the event loop - confirm.
        query_vector = self.embedding_service.embed_batch([query_text])[0]

        # 2. Search in Repo (Top 50 candidates for further ranking)
        candidates = await self.repo.search_similar_products(
            embedding=query_vector, 
            limit=50,
            is_active_only=True
        )
        return candidates

    async def _rank_candidates(self, request: RecommendationRequest, candidates: list[Any]) -> list[Any]:
        """Stage B: CPU Ranker (Logic from SoT Section 5). Currently returns candidates unchanged."""
        return candidates

    async def _judge_rerank(self, request: RecommendationRequest, candidates: list[Any]) -> list[Any]:
        """Stage C: LLM-as-judge Rerank (Stub for now). Currently returns candidates unchanged."""
        return candidates

    def _apply_final_rank_and_diversity(self, request: RecommendationRequest, candidates: list[Any]) -> list[Any]:
        """Stage D: Constraints Re-ranking & Diversity.

        Currently just keeps the first ``request.top_n`` candidates. A
        negative ``top_n`` yields an empty list instead of slicing items off
        the end.
        """
        limit = request.top_n
        if limit is not None and limit < 0:
            return []
        return candidates[:limit]

Functions

generate_recommendations(request, engine_version='vector_v1') async

Main orchestration method for recommendation generation. Implements Stages A, B, C, D from the roadmap.

Source code in app/services/recommendation.py
async def generate_recommendations(
    self, 
    request: RecommendationRequest,
    engine_version: str = "vector_v1"
) -> RecommendationResponse:
    """
    Orchestrate recommendation generation (Stages A, B, C, D from the roadmap).

    Runs retrieval, ranking, judge re-ranking and the final selection in
    order, converts the selected items to ``GiftDTO`` objects and wraps them
    in a ``RecommendationResponse``. The first gift, if any, is the featured one.
    """
    logger.info(f"Generating recommendations for request: {request}")

    # Stage A -> B -> C -> D; each stage consumes the previous stage's output.
    retrieved = await self._retrieve_candidates(request)
    ranked = await self._rank_candidates(request, retrieved)
    judged = await self._judge_rerank(request, ranked)
    selected = self._apply_final_rank_and_diversity(request, judged)

    gifts = []
    for item in selected:
        gifts.append(
            GiftDTO(
                id=item.gift_id,
                title=item.title,
                description=item.description,
                price=item.price,
                currency=item.currency,
                image_url=item.image_url,
                product_url=item.product_url,
                merchant=item.merchant,
                category=item.category,
            )
        )

    # Debug info is attached only on request and reports the Stage A candidate count.
    debug_info = None
    if request.debug:
        debug_info = {"status": "candidates_retrieved", "count": len(retrieved)}

    return RecommendationResponse(
        quiz_run_id="stub-id",  # TODO: integrate with quiz_run repo
        engine_version=engine_version,
        featured_gift=gifts[0] if gifts else None,
        gifts=gifts,
        debug=debug_info,
    )