Files
fuel-price/app/Services/FuelPriceService.php
Ovidiu U 06f5f2035f refactor: extract iterateBatches helper in FuelPriceService
Audit item #9. pollPrices() and refreshStations() shared the same
do/while batch loop with token + try/catch + Log::error + clean-exit
detection but different bodies. Extracted to a private iterateBatches()
that takes the endpoint, optional since timestamp, and a process
callable. Returns [total_processed, completedCleanly] so pollPrices
can still gate its incremental-poll cache update on a clean exhaustion.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 20:21:21 +01:00

365 lines
13 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace App\Services;
use App\Enums\FuelType;
use App\Models\Station;
use App\Models\StationPrice;
use App\Models\StationPriceCurrent;
use Carbon\CarbonInterface;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Log;
use Throwable;
use ValueError;
class FuelPriceService
{
/** Cache key under which getAccessToken() stores the fuel-finder access token. */
private const string TOKEN_CACHE_KEY = 'fuel_finder_access_token';
/** Cache key holding the start time of the most recent price poll that finished cleanly. */
private const string LAST_PRICE_POLL_CACHE_KEY = 'fuel_finder_last_price_poll_at';
/**
 * Per-fuel-type valid price range in pence (as returned by the API).
 * Based on UK all-time records + 30–75% headroom for future spikes.
 * All-time records: petrol 191.6p, diesel 199.2p (Jul 2022).
 *
 * Keys are compared against FuelType::value in isValidPrice(); a fuel type
 * with no entry here is treated as invalid.
 *
 * NOTE(review): the headroom figure above was restored from a garbled
 * "3075%". The maxima below are several times the quoted record prices,
 * so confirm the intended headroom before relying on this comment.
 *
 * @var array<string, array{min: int, max: int}>
 */
private const array PRICE_LIMITS_PENCE = [
'e10' => ['min' => 80, 'max' => 750],
'e5' => ['min' => 80, 'max' => 840],
'b7_standard' => ['min' => 80, 'max' => 840],
'b7_premium' => ['min' => 80, 'max' => 960],
'b10' => ['min' => 80, 'max' => 840],
'hvo' => ['min' => 80, 'max' => 1050],
];
/**
 * @param StationTaggingService $taggingService Applied to every Station built in upsertStations() before its attributes are read for the upsert.
 * @param ApiLogger $apiLogger Wraps each outbound fuel-finder HTTP request made by this service.
 */
public function __construct(
private readonly StationTaggingService $taggingService,
private readonly ApiLogger $apiLogger,
) {}
/**
 * Return the fuel-finder access token, requesting a new one on cache miss.
 *
 * The token is cached for 3540 seconds under TOKEN_CACHE_KEY. The request
 * is retried up to 3 times (500 ms apart) with a 60 second timeout.
 *
 * @throws \RuntimeException When the response carries no usable access token.
 */
public function getAccessToken(): string
{
    return Cache::remember(self::TOKEN_CACHE_KEY, 3540, function (): string {
        $url = config('services.fuel_finder.base_url').'/oauth/generate_access_token';
        $response = $this->apiLogger->send('fuel_finder', 'POST', $url, fn () => Http::retry(3, 500)
            ->timeout(60)
            ->post($url, [
                'client_id' => config('services.fuel_finder.client_id'),
                'client_secret' => config('services.fuel_finder.client_secret'),
            ]));

        $token = $response->json('data.access_token');

        // Fail loudly instead of caching an empty/coerced value: throwing
        // inside the callback means nothing is written to the cache, so the
        // next call retries rather than reusing a dead token.
        if (! is_string($token) || $token === '') {
            throw new \RuntimeException('FuelPriceService: access token missing from fuel_finder response');
        }

        return $token;
    });
}
/**
 * Poll the fuel-prices endpoint and persist any price changes.
 *
 * When the cache holds the timestamp of a previous clean poll, that value is
 * passed to the API as the incremental cut-off; otherwise (cache miss or a
 * non-date value) every batch is fetched. The cached timestamp is only
 * advanced when the batch loop reports that it ran to exhaustion.
 *
 * @return int Number of new price records inserted
 */
public function pollPrices(): int
{
    // Taken before the first request so the next incremental poll starts
    // from the moment this one began.
    $startedAt = now();

    $cached = Cache::get(self::LAST_PRICE_POLL_CACHE_KEY);
    $handleBatch = fn (array $stations): int => $this->processPriceBatch($stations);

    [$insertedCount, $exhausted] = $this->iterateBatches(
        '/pfs/fuel-prices',
        $cached instanceof CarbonInterface ? $cached : null,
        $handleBatch,
    );

    if (! $exhausted) {
        // Loop ended on an error: keep the old cut-off so nothing is missed.
        return $insertedCount;
    }

    Cache::forever(self::LAST_PRICE_POLL_CACHE_KEY, $startedAt);

    return $insertedCount;
}
/**
 * Walk every batch of the station metadata endpoint and upsert each one.
 * Called on full daily refresh before pollPrices().
 */
public function refreshStations(): void
{
    $upsertBatch = function (array $stations): int {
        $this->upsertStations($stations);

        // Metadata batches contribute nothing to the running total.
        return 0;
    };

    // No "since" cut-off: station metadata is always fetched in full.
    $this->iterateBatches('/pfs', null, $upsertBatch);
}
/**
 * Drive a paginated fuel-finder endpoint until exhausted, calling
 * $process on each non-empty batch.
 *
 * Batches are requested with an increasing `batch-number` starting at 1;
 * when $since is given it is sent as `effective-start-timestamp`. The loop
 * ends cleanly on a 404 or an empty body, and non-cleanly on any other
 * unsuccessful status, a thrown exception during the fetch, or a body that
 * does not decode to an array. Exceptions thrown by $process itself are not
 * caught here and propagate to the caller.
 *
 * @param callable(array<int, array<string, mixed>>): int $process
 * @return array{0: int, 1: bool} Sum of $process return values, and whether
 *                                the loop exited cleanly. Callers use the flag
 *                                to decide whether to update incremental-poll
 *                                bookkeeping.
 */
private function iterateBatches(string $endpoint, ?CarbonInterface $since, callable $process): array
{
    $token = $this->getAccessToken();
    $baseUrl = config('services.fuel_finder.base_url').$endpoint;
    $total = 0;

    for ($batch = 1; ; $batch++) {
        try {
            $params = ['batch-number' => $batch];
            if ($since !== null) {
                $params['effective-start-timestamp'] = $since->format('Y-m-d H:i:s');
            }

            // Full URL is only used for the API log entry; the request itself
            // passes the query parameters separately.
            $logUrl = $baseUrl.'?'.http_build_query($params);
            $response = $this->apiLogger->send('fuel_finder', 'GET', $logUrl, fn () => Http::timeout(30)
                ->withToken($token)
                ->get($baseUrl, $params));

            if ($response->notFound()) {
                // Past the last batch: normal end of pagination.
                return [$total, true];
            }

            if (! $response->successful()) {
                Log::error('FuelPriceService: batch returned error', [
                    'endpoint' => $endpoint,
                    'batch' => $batch,
                    'status' => $response->status(),
                ]);

                return [$total, false];
            }

            $stations = $response->json() ?? [];
        } catch (Throwable $e) {
            Log::error('FuelPriceService: batch fetch failed', [
                'endpoint' => $endpoint,
                'batch' => $batch,
                'error' => $e->getMessage(),
            ]);

            return [$total, false];
        }

        // A scalar/string JSON body cannot be processed as a batch. Treat it
        // as a failed fetch rather than letting $process raise a TypeError
        // or mistaking a falsy scalar for clean exhaustion.
        if (! is_array($stations)) {
            Log::error('FuelPriceService: batch returned unexpected body', [
                'endpoint' => $endpoint,
                'batch' => $batch,
                'type' => get_debug_type($stations),
            ]);

            return [$total, false];
        }

        if ($stations === []) {
            return [$total, true];
        }

        $total += $process($stations);
    }
}
/**
 * Map raw API station payloads onto Station attributes and upsert them in a
 * single statement keyed on node_id.
 *
 * Payloads that fail hasRequiredStationFields() are logged and skipped. Each
 * remaining payload is turned into an unsaved Station, passed through the
 * tagging service, and its attributes collected as one upsert row. Nothing
 * is written when no payload qualifies.
 *
 * @param array<int, array<string, mixed>> $apiStations
 */
public function upsertStations(array $apiStations): void
{
    $seenAt = now();
    $records = [];

    foreach ($apiStations as $payload) {
        if ($this->hasRequiredStationFields($payload)) {
            $location = $payload['location'];

            $model = new Station([
                'node_id' => $payload['node_id'],
                'trading_name' => $payload['trading_name'],
                'brand_name' => $payload['brand_name'] ?? null,
                'is_same_trading_and_brand' => $payload['is_same_trading_and_brand_name'] ?? false,
                // Starts as false; presumably adjusted by the tagging
                // service below — verify against StationTaggingService.
                'is_supermarket' => false,
                'is_motorway_service_station' => $payload['is_motorway_service_station'] ?? false,
                'is_supermarket_service_station' => $payload['is_supermarket_service_station'] ?? false,
                'temporary_closure' => $payload['temporary_closure'] ?? false,
                'permanent_closure' => $payload['permanent_closure'] ?? false,
                'permanent_closure_date' => $payload['permanent_closure_date'] ?? null,
                'public_phone_number' => $payload['public_phone_number'] ?? null,
                'address_line_1' => $location['address_line_1'] ?? null,
                'address_line_2' => $location['address_line_2'] ?? null,
                'city' => $location['city'] ?? null,
                'county' => $location['county'] ?? null,
                'country' => $location['country'] ?? null,
                'postcode' => $location['postcode'],
                'lat' => $location['latitude'],
                'lng' => $location['longitude'],
                'amenities' => $this->flattenEnabledFlags($payload['amenities'] ?? []),
                'opening_times' => $payload['opening_times'] ?? null,
                'fuel_types' => $this->flattenEnabledFlags($payload['fuel_types'] ?? []),
                'last_seen_at' => $seenAt,
            ]);

            $this->taggingService->tag($model);
            $records[] = $model->getAttributes();

            continue;
        }

        Log::warning('FuelPriceService: station skipped — missing required fields', [
            'node_id' => $payload['node_id'] ?? null,
        ]);
    }

    if ($records !== []) {
        // Columns to update on conflict are taken from the first row.
        Station::upsert($records, ['node_id'], array_keys($records[0]));
    }
}
/**
 * Tell whether a station payload carries everything upsertStations() reads
 * unconditionally: a non-empty node_id and trading_name, plus a location
 * with postcode, latitude and longitude set (non-null).
 *
 * @param array<string, mixed> $data
 */
private function hasRequiredStationFields(array $data): bool
{
    if (empty($data['node_id']) || empty($data['trading_name'])) {
        return false;
    }

    return isset(
        $data['location']['postcode'],
        $data['location']['latitude'],
        $data['location']['longitude'],
    );
}
/**
 * Reduce an `amenities` / `fuel_types` payload to the list of enabled names.
 *
 * The API sends these as objects of boolean flags
 * (e.g. {"E10": true, "car_wash": false}); the keys whose value passes
 * FILTER_VALIDATE_BOOLEAN are returned. A payload that is already a plain
 * list (including an empty one) is returned unchanged.
 *
 * @param array<string, bool>|array<int, string> $flags
 * @return array<int, string>
 */
private function flattenEnabledFlags(array $flags): array
{
    // An empty array is also a list, so this covers the "nothing sent" case.
    if (array_is_list($flags)) {
        return array_values($flags);
    }

    $enabled = [];
    foreach ($flags as $name => $value) {
        if (filter_var($value, FILTER_VALIDATE_BOOLEAN)) {
            $enabled[] = $name;
        }
    }

    return $enabled;
}
/**
 * Check a price against the inclusive min/max configured for its fuel type
 * in PRICE_LIMITS_PENCE. Fuel types with no configured range are rejected.
 */
private function isValidPrice(FuelType $fuelType, float $pricePence): bool
{
    $range = self::PRICE_LIMITS_PENCE[$fuelType->value] ?? null;

    return $range !== null
        && $range['min'] <= $pricePence
        && $pricePence <= $range['max'];
}
/**
 * Process one batch of API price data.
 *
 * Loads current prices for all known stations in the batch, inserts a new
 * station_prices row only when the price has changed, and upserts
 * station_prices_current to reflect the latest known price.
 *
 * Entries are skipped (never fatal) when the station is unknown, a required
 * field is missing, the fuel type is unrecognised, the price is outside
 * PRICE_LIMITS_PENCE, or a timestamp cannot be parsed. If the same
 * station/fuel pair appears more than once in the batch, the last entry
 * wins for station_prices_current and each entry is compared against the
 * one before it when deciding whether to write history.
 *
 * @param array<int, array<string, mixed>> $apiBatch
 * @return int Number of station_prices history rows inserted
 */
private function processPriceBatch(array $apiBatch): int
{
    $stationIds = array_column($apiBatch, 'node_id');

    // Filter to stations that exist in the stations table — prevents FK
    // violations when the API surfaces a station before the next metadata
    // refresh picks it up.
    $knownStationIds = array_flip(
        Station::whereIn('node_id', $stationIds)->pluck('node_id')->all(),
    );

    $unknown = array_diff($stationIds, array_keys($knownStationIds));
    if ($unknown !== []) {
        Log::info('FuelPriceService: skipped prices for unknown stations', [
            'count' => count($unknown),
        ]);
    }

    // Load current prices for all stations in this batch in one query
    $currentPrices = StationPriceCurrent::whereIn('station_id', array_keys($knownStationIds))
        ->get()
        ->groupBy('station_id')
        ->map(fn ($rows) => $rows->keyBy(fn ($r) => $r->fuel_type->value));

    $now = now();
    $newPrices = [];
    // Keyed by "station|fuel" so one upsert statement never carries the same
    // conflict key twice.
    $upsertRows = [];

    foreach ($apiBatch as $station) {
        $stationId = $station['node_id'] ?? null;
        if ($stationId === null || ! isset($knownStationIds[$stationId])) {
            continue;
        }

        foreach ($station['fuel_prices'] ?? [] as $priceData) {
            if (! isset($priceData['fuel_type'], $priceData['price'], $priceData['price_last_updated'], $priceData['price_change_effective_timestamp'])) {
                continue;
            }

            try {
                $fuelType = FuelType::fromApiValue($priceData['fuel_type']);
            } catch (ValueError) {
                continue; // Skip unknown fuel types
            }

            $rawPrice = (float) $priceData['price'];
            if (! $this->isValidPrice($fuelType, $rawPrice)) {
                Log::warning('FuelPriceService: price out of valid range — skipped', [
                    'station_id' => $stationId,
                    'fuel_type' => $fuelType->value,
                    'price' => $rawPrice,
                    'limits' => self::PRICE_LIMITS_PENCE[$fuelType->value],
                ]);

                continue;
            }

            // One malformed timestamp must not abort the whole poll: this
            // method runs outside iterateBatches()'s try/catch.
            try {
                $effectiveAt = Carbon::parse($priceData['price_change_effective_timestamp']);
                $reportedAt = Carbon::parse($priceData['price_last_updated']);
            } catch (Throwable $e) {
                Log::warning('FuelPriceService: unparseable price timestamp — skipped', [
                    'station_id' => $stationId,
                    'fuel_type' => $fuelType->value,
                    'error' => $e->getMessage(),
                ]);

                continue;
            }

            // NOTE(review): the range check above treats the API value as
            // pence, yet the stored value is that figure × 100 — presumably
            // hundredths of a penny. Confirm the column's unit.
            $pricePence = (int) round($rawPrice * 100);

            $rowKey = $stationId.'|'.$fuelType->value;

            // Compare against an earlier entry in this batch first, then the
            // stored current price.
            $previousPence = $upsertRows[$rowKey]['price_pence']
                ?? $currentPrices[$stationId][$fuelType->value]->price_pence
                ?? null;

            $row = [
                'station_id' => $stationId,
                'fuel_type' => $fuelType->value,
                'price_pence' => $pricePence,
                'price_effective_at' => $effectiveAt,
                'price_reported_at' => $reportedAt,
                'recorded_at' => $now,
            ];

            // Only write history when the price changed. The int cast keeps
            // the strict comparison correct if the stored value comes back
            // from the database as a numeric string.
            if ($previousPence === null || (int) $previousPence !== $pricePence) {
                $newPrices[] = $row;
            }

            // Always upsert current
            $upsertRows[$rowKey] = $row;
        }
    }

    if ($newPrices !== []) {
        StationPrice::insert($newPrices);
    }

    if ($upsertRows !== []) {
        StationPriceCurrent::upsert(
            array_values($upsertRows),
            ['station_id', 'fuel_type'],
            ['price_pence', 'price_effective_at', 'price_reported_at', 'recorded_at'],
        );
    }

    return count($newPrices);
}
}