Files
orion/app/modules/loyalty/tests/integration/test_terminal_devices.py
Samir Boulahtit c1bb225228 fix(loyalty): paired-device JWT bypasses program.require_staff_pin
When the request principal is a paired POS terminal device
(current_user.terminal_device_id is not None), the staff PIN is
considered already-verified — the cashier bcrypt-verified locally on
the tablet's lock screen against the cached hashes from
/pins/for-device. Web-terminal user JWTs still require the per-action
PIN as before; the strict fraud-prevention path is unchanged.

Threat-model note: the device JWT is itself proof of authentication.
The merchant owner paired the device, the cashier verified locally,
and the JWT is revocable from /merchants/loyalty/devices. The 2-min
idle auto-lock + acting_terminal_device_id audit column give us the
attribution we'd otherwise get from a per-action PIN.

Applied to: stamp_service.add_stamp / redeem_stamps / void_stamps;
points_service.earn_points / redeem_points / void_points. adjust_points
was already permissive on missing PIN. New tests in TestDevicePinBypass
lock both the bypass behavior and the still-strict web-terminal path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 23:16:33 +02:00

364 lines
12 KiB
Python

# app/modules/loyalty/tests/integration/test_terminal_devices.py
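"""Integration tests for terminal device pairing & revocation.
Covers both the merchant API (`/api/v1/merchants/loyalty/devices/...`) and
the admin-on-behalf API (`/api/v1/admin/loyalty/merchants/{id}/devices/...`),
plus the bearer-auth path on the store API that accepts a paired-device JWT.
Also covers the `acting_terminal_device_id` audit column, the
`/pins/for-device` endpoint, and the paired-device bypass of
`program.require_staff_pin`.
"""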
import pytest
from jose import jwt
from app.modules.loyalty.models import TerminalDevice
from middleware.auth import AuthManager
# Base path of the merchant-facing loyalty API; the tests below append
# `/devices` and `/devices/{id}/revoke` to pair, list and revoke terminals.
MERCHANT_BASE = "/api/v1/merchants/loyalty"
# Base path of the store-facing loyalty API; the tests below call `/program`,
# `/points/earn` and `/pins/for-device` under it with device or user tokens.
STORE_BASE = "/api/v1/store/loyalty"
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.loyalty
class TestMerchantPairing:
    """Pairing endpoint: POST /merchants/loyalty/devices.

    A successful call is expected to create a TerminalDevice row and to
    return the one-time setup token together with its QR rendering.
    """

    def test_pair_device_returns_setup_token_and_qr(
        self, client, loyalty_merchant_headers, loyalty_store_setup, db
    ):
        """Pairing answers 201 with token, QR image and a matching DB row."""
        store = loyalty_store_setup["store"]
        request_body = {"store_id": store.id, "label": "Counter 1"}

        response = client.post(
            f"{MERCHANT_BASE}/devices",
            json=request_body,
            headers=loyalty_merchant_headers,
        )
        assert response.status_code == 201, response.text

        data = response.json()
        assert data["label"] == "Counter 1"
        assert data["store_id"] == store.id
        assert data["status"] == "active"
        assert data["setup_token"]
        assert data["qr_png_base64"].startswith("data:image/png;base64,")

        # The QR payload must describe the same store and embed the same token.
        setup_payload = data["setup_payload"]
        assert setup_payload["store_code"] == store.store_code
        assert setup_payload["auth_token"] == data["setup_token"]

        # The token verifies against AuthManager's key/algorithm and carries
        # the device_setup claim plus the store it was issued for.
        signer = AuthManager()
        claims = jwt.decode(
            data["setup_token"],
            signer.secret_key,
            algorithms=[signer.algorithm],
        )
        assert claims["device_setup"] is True
        assert claims["store_id"] == store.id

        # The persisted row shares its jti with the issued token.
        device_query = db.query(TerminalDevice).filter(
            TerminalDevice.id == data["id"]
        )
        device = device_query.first()
        assert device is not None
        assert device.jti == claims["jti"]

    def test_pair_device_rejects_foreign_store(
        self, client, loyalty_merchant_headers, db
    ):
        """A merchant cannot pair against a store they don't own."""
        # store_id=99999 is not one of this merchant's stores.
        foreign_request = {"store_id": 99999, "label": "Hijack"}
        response = client.post(
            f"{MERCHANT_BASE}/devices",
            json=foreign_request,
            headers=loyalty_merchant_headers,
        )
        assert response.status_code == 404
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.loyalty
class TestMerchantListAndRevoke:
    """GET / POST revoke — listing and lifecycle."""

    def test_list_excludes_revoked_by_default(
        self, client, loyalty_merchant_headers, loyalty_store_setup
    ):
        """A revoked device drops out of the default list but not the full one."""
        store = loyalty_store_setup["store"]
        devices_url = f"{MERCHANT_BASE}/devices"

        # Pair two devices and remember their ids straight from the pairing
        # responses, so the test does not depend on the list's sort order.
        paired_ids = []
        for label in ("A", "B"):
            r = client.post(
                devices_url,
                json={"store_id": store.id, "label": label},
                headers=loyalty_merchant_headers,
            )
            assert r.status_code == 201, r.text
            paired_ids.append(r.json()["id"])
        revoked_id, kept_id = paired_ids

        # Both freshly paired devices are visible before any revocation.
        listing = client.get(
            devices_url, headers=loyalty_merchant_headers
        ).json()
        listed_ids = {d["id"] for d in listing["devices"]}
        assert {revoked_id, kept_id} <= listed_ids

        # Revoke the first one.
        r = client.post(
            f"{devices_url}/{revoked_id}/revoke",
            json={"reason": "lost"},
            headers=loyalty_merchant_headers,
        )
        assert r.status_code == 200, r.text
        assert r.json()["status"] == "revoked"

        # Default list excludes the revoked one — and specifically that one.
        active = client.get(
            devices_url, headers=loyalty_merchant_headers
        ).json()
        assert active["total"] == 1
        active_ids = {d["id"] for d in active["devices"]}
        assert revoked_id not in active_ids
        assert kept_id in active_ids

        # include_revoked=true brings it back.
        full = client.get(
            f"{devices_url}?include_revoked=true",
            headers=loyalty_merchant_headers,
        ).json()
        assert full["total"] == 2
        full_ids = {d["id"] for d in full["devices"]}
        assert {revoked_id, kept_id} <= full_ids

    def test_revoke_is_idempotent(
        self, client, loyalty_merchant_headers, loyalty_store_setup
    ):
        """Revoking twice succeeds and keeps the original revoked_at."""
        store = loyalty_store_setup["store"]
        pairing = client.post(
            f"{MERCHANT_BASE}/devices",
            json={"store_id": store.id, "label": "Idem"},
            headers=loyalty_merchant_headers,
        )
        # Fail here with the response text rather than with a KeyError below.
        assert pairing.status_code == 201, pairing.text
        revoke_url = f"{MERCHANT_BASE}/devices/{pairing.json()['id']}/revoke"

        first_response = client.post(
            revoke_url, json={}, headers=loyalty_merchant_headers
        )
        assert first_response.status_code == 200, first_response.text
        second_response = client.post(
            revoke_url, json={}, headers=loyalty_merchant_headers
        )
        assert second_response.status_code == 200, second_response.text

        first = first_response.json()
        second = second_response.json()
        # Guard against the comparison passing vacuously with null == null.
        assert first["revoked_at"] is not None
        assert second["status"] == "revoked"
        assert first["revoked_at"] == second["revoked_at"]
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.loyalty
class TestStoreAPIBearerAuth:
    """Paired-device JWT is accepted on the store API."""

    def test_device_token_authenticates_store_api(
        self, client, loyalty_merchant_headers, loyalty_store_setup
    ):
        """The setup token works as a bearer token on GET /program."""
        store = loyalty_store_setup["store"]
        pairing = client.post(
            f"{MERCHANT_BASE}/devices",
            json={"store_id": store.id, "label": "Counter 1"},
            headers=loyalty_merchant_headers,
        )
        # Surface a failed pairing directly instead of as a KeyError below.
        assert pairing.status_code == 201, pairing.text
        token = pairing.json()["setup_token"]

        response = client.get(
            f"{STORE_BASE}/program",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert response.status_code == 200, response.text
        body = response.json()
        assert body["merchant_id"] == loyalty_store_setup["merchant"].id

    def test_revoked_device_token_is_rejected(
        self, client, loyalty_merchant_headers, loyalty_store_setup
    ):
        """Once the device is revoked its token no longer authenticates."""
        store = loyalty_store_setup["store"]
        pairing = client.post(
            f"{MERCHANT_BASE}/devices",
            json={"store_id": store.id, "label": "Burn"},
            headers=loyalty_merchant_headers,
        )
        assert pairing.status_code == 201, pairing.text
        paired = pairing.json()
        token = paired["setup_token"]

        revocation = client.post(
            f"{MERCHANT_BASE}/devices/{paired['id']}/revoke",
            json={"reason": "test"},
            headers=loyalty_merchant_headers,
        )
        # If the revoke itself failed, report that rather than a confusing
        # "token still accepted" failure further down.
        assert revocation.status_code == 200, revocation.text

        response = client.get(
            f"{STORE_BASE}/program",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert response.status_code in (400, 401), response.text
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.loyalty
class TestActingDeviceAudit:
    """Device-JWT transactions record acting_terminal_device_id; user-JWT ones don't."""

    @staticmethod
    def _latest_points_earned(db, card_id):
        """Return the newest POINTS_EARNED transaction for a card, or None."""
        from app.modules.loyalty.models import LoyaltyTransaction
        from app.modules.loyalty.models.loyalty_transaction import TransactionType

        earned = db.query(LoyaltyTransaction).filter(
            LoyaltyTransaction.card_id == card_id,
            LoyaltyTransaction.transaction_type
            == TransactionType.POINTS_EARNED.value,
        )
        # Highest id first, so .first() yields the most recent row.
        return earned.order_by(LoyaltyTransaction.id.desc()).first()

    def test_points_earn_via_device_stamps_audit_column(
        self, client, loyalty_merchant_headers, loyalty_store_setup, db
    ):
        """Earning points with a device token records that device's id."""
        store = loyalty_store_setup["store"]
        card = loyalty_store_setup["card"]

        pairing_body = client.post(
            f"{MERCHANT_BASE}/devices",
            json={"store_id": store.id, "label": "Counter 1"},
            headers=loyalty_merchant_headers,
        ).json()
        device_id = pairing_body["id"]
        bearer = {"Authorization": f"Bearer {pairing_body['setup_token']}"}

        response = client.post(
            f"{STORE_BASE}/points/earn",
            json={"card_id": card.id, "purchase_amount_cents": 1500},
            headers=bearer,
        )
        assert response.status_code == 200, response.text

        tx = self._latest_points_earned(db, card.id)
        assert tx is not None
        assert tx.acting_terminal_device_id == device_id

    def test_user_token_leaves_audit_column_null(
        self, client, loyalty_store_headers, loyalty_store_setup, db
    ):
        """Web-terminal user JWT must NOT stamp acting_terminal_device_id."""
        card = loyalty_store_setup["card"]

        response = client.post(
            f"{STORE_BASE}/points/earn",
            json={"card_id": card.id, "purchase_amount_cents": 1500},
            headers=loyalty_store_headers,
        )
        assert response.status_code == 200, response.text

        tx = self._latest_points_earned(db, card.id)
        assert tx is not None
        assert tx.acting_terminal_device_id is None
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.loyalty
class TestPinsForDevice:
    """GET /pins/for-device exposes hashes only to paired devices."""

    def test_device_token_receives_hashes(
        self, client, loyalty_merchant_headers, loyalty_store_setup, db
    ):
        """A device token gets the store's staff PINs with their hashes."""
        from app.modules.loyalty.models import StaffPin

        store = loyalty_store_setup["store"]
        program = loyalty_store_setup["program"]

        # Seed one staff PIN so the endpoint has something to return.
        pin = StaffPin(
            merchant_id=store.merchant_id,
            program_id=program.id,
            store_id=store.id,
            name="Test Cashier",
            staff_id="cash01",
        )
        pin.set_pin("4321")
        db.add(pin)
        db.commit()
        db.refresh(pin)

        pairing = client.post(
            f"{MERCHANT_BASE}/devices",
            json={"store_id": store.id, "label": "PIN test"},
            headers=loyalty_merchant_headers,
        )
        # Surface a failed pairing directly instead of as a KeyError below.
        assert pairing.status_code == 201, pairing.text
        token = pairing.json()["setup_token"]

        response = client.get(
            f"{STORE_BASE}/pins/for-device",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert response.status_code == 200, response.text
        payload = response.json()
        assert payload["total"] >= 1

        # Look the seeded PIN up without a bare next(): a missing entry should
        # be an assertion failure, not a StopIteration error.
        matches = [p for p in payload["pins"] if p["id"] == pin.id]
        assert matches, f"pin id {pin.id} missing from /pins/for-device"
        pin_hash = matches[0]["pin_hash"]

        # Separate asserts so the failing property is obvious. The "$2" prefix
        # and length are consistent with a bcrypt hash, which is what the
        # device is expected to verify against locally.
        assert pin_hash.startswith("$2")
        assert len(pin_hash) > 50

    def test_user_token_is_rejected(
        self, client, loyalty_store_headers
    ):
        """A web-terminal user token is refused with 403."""
        response = client.get(
            f"{STORE_BASE}/pins/for-device",
            headers=loyalty_store_headers,
        )
        assert response.status_code == 403, response.text
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.loyalty
class TestDevicePinBypass:
    """program.require_staff_pin is waived for paired-device JWTs only.

    The cashier is treated as already verified on the device's lock screen;
    web-terminal user JWTs must still supply a PIN.
    """

    def test_device_token_bypasses_required_staff_pin(
        self, client, loyalty_merchant_headers, loyalty_store_setup, db
    ):
        """With PINs required, a device token can still earn points PIN-less."""
        store = loyalty_store_setup["store"]
        card = loyalty_store_setup["card"]
        program = loyalty_store_setup["program"]

        # Switch the program into strict mode: a staff PIN is mandatory.
        program.require_staff_pin = True
        db.commit()

        pairing_body = client.post(
            f"{MERCHANT_BASE}/devices",
            json={"store_id": store.id, "label": "PIN bypass test"},
            headers=loyalty_merchant_headers,
        ).json()
        device_headers = {
            "Authorization": f"Bearer {pairing_body['setup_token']}"
        }

        # The body deliberately omits staff_pin; the device JWT stands in.
        response = client.post(
            f"{STORE_BASE}/points/earn",
            json={"card_id": card.id, "purchase_amount_cents": 1000},
            headers=device_headers,
        )
        assert response.status_code == 200, response.text

    def test_user_token_still_requires_staff_pin(
        self, client, loyalty_store_headers, loyalty_store_setup, db
    ):
        """Belt-and-braces: web-terminal user JWTs must still send a PIN
        when the program requires it."""
        card = loyalty_store_setup["card"]
        program = loyalty_store_setup["program"]

        program.require_staff_pin = True
        db.commit()

        # Same PIN-less request, but authenticated as a web-terminal user.
        response = client.post(
            f"{STORE_BASE}/points/earn",
            json={"card_id": card.id, "purchase_amount_cents": 1000},
            headers=loyalty_store_headers,
        )
        assert response.status_code == 400
        assert "STAFF_PIN_REQUIRED" in response.text