Some checks failed
Move db.commit() from services to API endpoints and Celery tasks. Services now use db.flush() only; endpoints own the transaction. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
151 lines
5.7 KiB
Python
151 lines
5.7 KiB
Python
# app/modules/prospecting/tests/unit/test_prospect_service.py
|
|
"""
|
|
Unit tests for ProspectService.
|
|
"""
|
|
|
|
import pytest
|
|
|
|
from app.modules.prospecting.exceptions import ProspectNotFoundException
|
|
from app.modules.prospecting.services.prospect_service import ProspectService
|
|
|
|
|
|
@pytest.mark.unit
|
|
@pytest.mark.prospecting
|
|
class TestProspectService:
|
|
"""Tests for ProspectService."""
|
|
|
|
def setup_method(self):
|
|
self.service = ProspectService()
|
|
|
|
def test_create_digital_prospect(self, db):
|
|
"""Test creating a digital prospect."""
|
|
prospect = self.service.create(
|
|
db,
|
|
{"channel": "digital", "domain_name": "example.lu", "source": "domain_scan"},
|
|
)
|
|
|
|
assert prospect.id is not None
|
|
assert prospect.channel.value == "digital"
|
|
assert prospect.domain_name == "example.lu"
|
|
assert prospect.status.value == "pending"
|
|
|
|
def test_create_offline_prospect(self, db):
|
|
"""Test creating an offline prospect."""
|
|
prospect = self.service.create(
|
|
db,
|
|
{
|
|
"channel": "offline",
|
|
"business_name": "Test Café",
|
|
"source": "street",
|
|
"city": "Luxembourg",
|
|
},
|
|
)
|
|
|
|
assert prospect.id is not None
|
|
assert prospect.channel.value == "offline"
|
|
assert prospect.business_name == "Test Café"
|
|
assert prospect.city == "Luxembourg"
|
|
|
|
def test_create_digital_prospect_with_contacts(self, db):
|
|
"""Test creating a digital prospect with inline contacts."""
|
|
prospect = self.service.create(
|
|
db,
|
|
{
|
|
"channel": "digital",
|
|
"domain_name": "with-contacts.lu",
|
|
"contacts": [
|
|
{"contact_type": "email", "value": "info@with-contacts.lu"},
|
|
],
|
|
},
|
|
)
|
|
|
|
assert prospect.id is not None
|
|
assert len(prospect.contacts) == 1
|
|
assert prospect.contacts[0].value == "info@with-contacts.lu"
|
|
|
|
def test_get_by_id(self, db, digital_prospect):
|
|
"""Test getting a prospect by ID."""
|
|
result = self.service.get_by_id(db, digital_prospect.id)
|
|
assert result.id == digital_prospect.id
|
|
assert result.domain_name == digital_prospect.domain_name
|
|
|
|
def test_get_by_id_not_found(self, db):
|
|
"""Test getting non-existent prospect raises exception."""
|
|
with pytest.raises(ProspectNotFoundException):
|
|
self.service.get_by_id(db, 99999)
|
|
|
|
def test_get_by_domain(self, db, digital_prospect):
|
|
"""Test getting a prospect by domain name."""
|
|
result = self.service.get_by_domain(db, digital_prospect.domain_name)
|
|
assert result is not None
|
|
assert result.id == digital_prospect.id
|
|
|
|
def test_get_all_with_pagination(self, db, digital_prospect, offline_prospect):
|
|
"""Test listing prospects with pagination."""
|
|
prospects, total = self.service.get_all(db, page=1, per_page=10)
|
|
assert total >= 2
|
|
assert len(prospects) >= 2
|
|
|
|
def test_get_all_filter_by_channel(self, db, digital_prospect, offline_prospect):
|
|
"""Test filtering prospects by channel."""
|
|
digital, count = self.service.get_all(db, channel="digital")
|
|
assert count >= 1
|
|
assert all(p.channel.value == "digital" for p in digital)
|
|
|
|
def test_get_all_filter_by_status(self, db, digital_prospect):
|
|
"""Test filtering prospects by status."""
|
|
active, count = self.service.get_all(db, status="active")
|
|
assert count >= 1
|
|
assert all(p.status.value == "active" for p in active)
|
|
|
|
def test_get_all_search(self, db, digital_prospect):
|
|
"""Test searching prospects by domain or business name."""
|
|
domain = digital_prospect.domain_name
|
|
results, count = self.service.get_all(db, search=domain[:8])
|
|
assert count >= 1
|
|
|
|
def test_update_prospect(self, db, digital_prospect):
|
|
"""Test updating a prospect."""
|
|
updated = self.service.update(
|
|
db, digital_prospect.id, {"notes": "Updated notes", "status": "contacted"}
|
|
)
|
|
assert updated.notes == "Updated notes"
|
|
status = updated.status.value if hasattr(updated.status, "value") else updated.status
|
|
assert status == "contacted"
|
|
|
|
def test_delete_prospect(self, db, digital_prospect):
|
|
"""Test deleting a prospect."""
|
|
pid = digital_prospect.id
|
|
self.service.delete(db, pid)
|
|
|
|
with pytest.raises(ProspectNotFoundException):
|
|
self.service.get_by_id(db, pid)
|
|
|
|
def test_create_bulk(self, db):
|
|
"""Test bulk creating prospects from domain list."""
|
|
domains = ["bulk1.lu", "bulk2.lu", "bulk3.lu"]
|
|
created, skipped = self.service.create_bulk(db, domains, source="csv_import")
|
|
assert created == 3
|
|
assert skipped == 0
|
|
|
|
def test_create_bulk_skips_duplicates(self, db, digital_prospect):
|
|
"""Test bulk create skips existing domains."""
|
|
domains = [digital_prospect.domain_name, "new-domain.lu"]
|
|
created, skipped = self.service.create_bulk(db, domains, source="csv_import")
|
|
assert created == 1
|
|
assert skipped == 1
|
|
|
|
def test_count_by_status(self, db, digital_prospect, offline_prospect):
|
|
"""Test counting prospects by status."""
|
|
counts = self.service.count_by_status(db)
|
|
assert isinstance(counts, dict)
|
|
assert counts.get("active", 0) >= 1
|
|
assert counts.get("pending", 0) >= 1
|
|
|
|
def test_count_by_channel(self, db, digital_prospect, offline_prospect):
|
|
"""Test counting prospects by channel."""
|
|
counts = self.service.count_by_channel(db)
|
|
assert isinstance(counts, dict)
|
|
assert counts.get("digital", 0) >= 1
|
|
assert counts.get("offline", 0) >= 1
|