Files
fuel-alert/app/Services/Forecasting/LlmOverlayService.php
Ovidiu U 07e0789044 fix(forecasting): persist LLM overlay under Tier-1 ITPM via two-call architecture
The daily forecast:llm-overlay command was being skipped because the previous
single-conversation flow consumed more than Tier-1's 50,000 input-tokens-per-
minute Anthropic bucket. The web_search tool auto-caches its results (~55k
tokens) and requires `encrypted_content` intact when those blocks are resent,
so the prior retry-on-missing-citations path either 429'd or 400'd on the
second call.

LlmOverlayService now runs two independent API calls. Phase 1 invokes the
web_search tool and we discard the transcript after harvesting the URLs +
titles from the returned web_search_tool_result blocks. Phase 2 is a fresh
conversation containing the forecast context and the harvested headlines as
plain text, with a forced submit_overlay tool call. events_cited is now
optional in the tool schema — Haiku's flaky compliance no longer matters
because citations come from the search results, not the model's transcription.
Model-tagged events (with directional impact) merge with harvested-only
entries (impact: 'neutral'), deduped by URL.

Between phases the service reads anthropic-ratelimit-input-tokens-remaining /
…-reset from Phase 1's headers and sleeps proportionally — only long enough
for the SUBMIT_TOKEN_BUDGET worth of refill, not for the full bucket reset,
capped at 65 seconds.

ApiLogger now captures usage.input_tokens, usage.output_tokens,
cache_read_input_tokens, cache_creation_input_tokens, plus the rate-limit
remaining/reset headers on every Anthropic response. New nullable columns on
api_logs make rate-limit diagnostics directly queryable.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 14:22:42 +01:00

659 lines
24 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace App\Services\Forecasting;
use App\Models\BrentPrice;
use App\Models\LlmOverlay;
use App\Models\VolatilityRegime;
use App\Services\ApiLogger;
use Carbon\CarbonImmutable;
use Carbon\CarbonInterface;
use Illuminate\Http\Client\Response;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Log;
use Throwable;
/**
 * Layer 4 — daily news-aware overlay on the calibrated ridge forecast.
 *
 * Runs as two independent Anthropic API calls:
 *   Phase 1 — web_search tool only; we capture the URLs/titles from
 *             the returned web_search_tool_result blocks.
 *   Phase 2 — fresh conversation containing those URLs+titles as plain
 *             text plus a forced submit_overlay tool call.
 *
 * Phase 1's transcript is never sent back to Phase 2. Anthropic's
 * web_search auto-caches the encrypted page text (~55k tokens per
 * search) and requires it intact when web_search_tool_result blocks
 * are resent. Threading it through to Phase 2 either blows the Tier-1
 * 50k ITPM bucket or 400s if we try to strip it. Two clean calls keep
 * Phase 2 around 3k input tokens.
 *
 * Citations are harvested directly from Phase 1's web_search_tool_result
 * blocks — Haiku is unreliable about populating `events_cited` itself.
 *
 * Read-only with respect to the volatility flag — Layer 4 writes its
 * `llm_overlays` row; Layer 5's hourly cron picks it up and decides
 * whether to flip the regime.
 */
final class LlmOverlayService
{
    private const string URL = 'https://api.anthropic.com/v1/messages';

    /** Hard ceiling on the confidence the model may claim (0–100 scale). */
    private const int CONFIDENCE_CAP = 75;

    /** Event-driven runs are suppressed within this window of the last overlay. */
    private const int COOLDOWN_HOURS = 4;

    /** Max assistant turns in Phase 1 (pause_turn continuations). */
    private const int MAX_SEARCH_TURNS = 2;

    /**
     * Approximate input-token cost of Phase 2 (system + tool schema +
     * forecast context + harvested URL list). If Phase 1 leaves
     * remaining ITPM below this, wait for the bucket to refill.
     */
    private const int SUBMIT_TOKEN_BUDGET = 4_000;

    public function __construct(
        private readonly ApiLogger $apiLogger,
        private readonly WeeklyForecastService $weeklyForecast,
    ) {}

    /**
     * Run an overlay generation. $eventDriven=true respects the 4-hour
     * cooldown; the daily 07:00 cron passes false to always run.
     *
     * Returns the persisted overlay, or null when skipped (no API key,
     * on cooldown, API failure, or zero verified citations).
     */
    public function run(bool $eventDriven = false): ?LlmOverlay
    {
        if ($this->apiKey() === null) {
            Log::info('LlmOverlayService: no ANTHROPIC_API_KEY, skipping');

            return null;
        }
        if ($eventDriven && $this->onCooldown()) {
            return null;
        }

        $forecast = $this->weeklyForecast->currentForecast();
        $context = $this->buildContext($forecast);
        $callResult = $this->callAnthropic($context);
        if ($callResult === null) {
            return null;
        }

        $rawResult = $callResult['raw'];
        $harvested = $callResult['harvested'];
        $mergedEvents = $this->mergeEvents($rawResult['events_cited'] ?? [], $harvested);
        $verifiedEvents = $this->verifyCitedUrls($mergedEvents);
        if ($verifiedEvents === []) {
            // An overlay with no live citation is untrustworthy — reject it
            // rather than persist an unverifiable directional call.
            Log::warning('LlmOverlayService: no verified citations, rejecting overlay', [
                'model_events' => $rawResult['events_cited'] ?? null,
                'harvested_urls' => array_column($harvested, 'url'),
                'direction' => $rawResult['direction'] ?? null,
                'confidence' => $rawResult['confidence'] ?? null,
                'reasoning_short' => $rawResult['reasoning_short'] ?? null,
            ]);

            return null;
        }

        $confidence = max(0, min(self::CONFIDENCE_CAP, (int) ($rawResult['confidence'] ?? 0)));
        $direction = $rawResult['direction'] ?? 'flat';
        $agreesWithRidge = $direction === $this->ridgeDirection($forecast['predicted_direction']);

        return LlmOverlay::query()->create([
            'ran_at' => now(),
            'forecast_for_week' => $this->upcomingMondayDateString(),
            'direction' => $direction,
            'confidence' => $confidence,
            'reasoning' => (string) ($rawResult['reasoning_short'] ?? ''),
            'events_json' => $verifiedEvents,
            'agrees_with_ridge' => $agreesWithRidge,
            'major_impact_event' => (bool) ($rawResult['major_impact_event'] ?? false),
            'volatility_flag_on' => VolatilityRegime::currentlyActive() !== null,
            'search_used' => true,
        ]);
    }

    /** True when the most recent overlay ran within COOLDOWN_HOURS. */
    private function onCooldown(): bool
    {
        $latest = LlmOverlay::query()->orderByDesc('ran_at')->first();

        return $latest !== null
            && $latest->ran_at->greaterThanOrEqualTo(now()->subHours(self::COOLDOWN_HOURS));
    }

    /**
     * Build the compact price-history + ridge-forecast context sent to
     * the model in both phases.
     *
     * @return array<string, mixed>
     */
    private function buildContext(array $forecast): array
    {
        // weekly_pump_prices stores pence*100; convert to pence with 1dp.
        $ulspWeekly = DB::table('weekly_pump_prices')
            ->orderByDesc('date')
            ->limit(8)
            ->get(['date', 'ulsp_pence'])
            ->reverse()
            ->map(fn ($r): array => ['date' => (string) $r->date, 'ulsp_pence' => round((int) $r->ulsp_pence / 100, 1)])
            ->values()
            ->all();

        $brentRecent = BrentPrice::query()
            ->orderByDesc('date')
            ->limit(14)
            ->get(['date', 'price_usd'])
            ->reverse()
            ->map(fn (BrentPrice $r): array => ['date' => (string) $r->date->toDateString(), 'price_usd' => (float) $r->price_usd])
            ->values()
            ->all();

        return [
            'ulsp_recent_8_weeks' => $ulspWeekly,
            'brent_recent_14_days' => $brentRecent,
            'ridge_model_says' => [
                'direction' => $forecast['predicted_direction'] ?? 'stable',
                'confidence' => $forecast['confidence_score'] ?? 0,
                'magnitude_pence' => $forecast['predicted_change_pence'] ?? 0,
            ],
        ];
    }

    /**
     * Two independent API calls:
     *
     *   Phase 1 — runs the web_search tool, captures the assistant's
     *             returned `web_search_tool_result` blocks, then
     *             discards the transcript.
     *
     *   Phase 2 — issues a brand-new conversation with the harvested
     *             URLs/titles flattened into a plain-text user message
     *             and forces a `submit_overlay` tool call.
     *
     * Why not one stitched conversation: Anthropic auto-caches web_search
     * results into ITPM (≈55k tokens for a 1-search call) and requires
     * `encrypted_content` intact when those blocks are sent back.
     * Resending the Phase 1 transcript to Phase 2 either rate-limits us
     * (29k+ tokens twice → exceeds the Tier-1 50k ITPM bucket) or 400s
     * if we strip the encrypted blob. A fresh Phase 2 sends ~3k tokens
     * total — small enough to fit in the recovered bucket after a
     * short adaptive sleep.
     *
     * @return array{raw: array<string, mixed>, harvested: array<int, array{url: string, title: string}>}|null
     */
    private function callAnthropic(array $context): ?array
    {
        try {
            $phase1 = $this->runWebSearch($context);
            if ($phase1 === null) {
                return null;
            }

            $this->waitForRateLimitIfNeeded($phase1['response']);

            $rawResult = $this->runSubmit($context, $phase1['harvested']);
            if ($rawResult === null) {
                return null;
            }

            return ['raw' => $rawResult, 'harvested' => $phase1['harvested']];
        } catch (Throwable $e) {
            Log::error('LlmOverlayService: callAnthropic failed', ['error' => $e->getMessage()]);

            return null;
        }
    }

    /**
     * Phase 1: ask the model to search for news and capture the
     * web_search_tool_result blocks. Returns the harvested citations
     * and the final response (whose rate-limit headers tell us when
     * the ITPM bucket will be replenished for Phase 2).
     *
     * @return array{harvested: array<int, array{url: string, title: string}>, response: Response}|null
     */
    private function runWebSearch(array $context): ?array
    {
        $messages = [['role' => 'user', 'content' => $this->searchUserMessage($context)]];
        $response = null;
        for ($i = 0; $i < self::MAX_SEARCH_TURNS; $i++) {
            $response = $this->apiLogger->send('anthropic', 'POST', self::URL, fn () => Http::timeout(45)
                ->withHeaders($this->headers())
                ->post(self::URL, [
                    'model' => $this->model(),
                    'max_tokens' => 1024,
                    'system' => $this->searchSystem(),
                    'tools' => [['type' => 'web_search_20250305', 'name' => 'web_search']],
                    'messages' => $messages,
                ]));
            if (! $response->successful()) {
                Log::error('LlmOverlayService: search request failed', [
                    'status' => $response->status(),
                    'body' => substr($response->body(), 0, 500),
                ]);

                return null;
            }
            // Keep the full assistant turn (including tool-result blocks with
            // encrypted_content intact) in case a pause_turn continuation is
            // needed; the transcript is discarded after harvesting.
            $messages[] = ['role' => 'assistant', 'content' => $response->json('content')];
            if ($response->json('stop_reason') !== 'pause_turn') {
                break;
            }
        }
        if ($response === null) {
            return null;
        }

        return [
            'harvested' => $this->harvestSearchResults($messages),
            'response' => $response,
        ];
    }

    /**
     * Phase 2: fresh API call — no Phase 1 transcript — with the
     * harvested citations as plain text and a forced submit_overlay
     * tool call.
     *
     * @param array<int, array{url: string, title: string}> $harvested
     * @return array<string, mixed>|null
     */
    private function runSubmit(array $context, array $harvested): ?array
    {
        $response = $this->apiLogger->send('anthropic', 'POST', self::URL, fn () => Http::timeout(20)
            ->withHeaders($this->headers())
            ->post(self::URL, [
                'model' => $this->model(),
                'max_tokens' => 512,
                'system' => $this->submitSystem(),
                'tools' => [$this->submitOverlayTool()],
                'tool_choice' => ['type' => 'tool', 'name' => 'submit_overlay'],
                'messages' => [['role' => 'user', 'content' => $this->submitUserMessage($context, $harvested)]],
            ]));
        if (! $response->successful()) {
            Log::error('LlmOverlayService: submit request failed', [
                'status' => $response->status(),
                'body' => substr($response->body(), 0, 500),
            ]);

            return null;
        }

        $rawResult = $this->extractToolInput($response->json('content') ?? []);
        if ($rawResult === null) {
            Log::warning('LlmOverlayService: submit response missing tool_use block');

            return null;
        }

        return $rawResult;
    }

    /**
     * Anthropic's web_search burns ≈55k input tokens (mostly auto-cached
     * search results) on Phase 1. At Tier 1's 50k ITPM the bucket can
     * be at zero immediately afterwards. Read the rate-limit headers
     * and sleep until the bucket has refilled enough for Phase 2.
     * Capped at 65s so the daily cron never hangs longer than a minute.
     */
    private function waitForRateLimitIfNeeded(Response $response): void
    {
        $remainingHeader = $response->header('anthropic-ratelimit-input-tokens-remaining');
        $remaining = (int) $remainingHeader;
        if ($remainingHeader === '' || $remaining >= self::SUBMIT_TOKEN_BUDGET) {
            return;
        }

        $resetAt = $response->header('anthropic-ratelimit-input-tokens-reset');
        $bucketSize = (int) $response->header('anthropic-ratelimit-input-tokens-limit');
        if ($resetAt === '' || $bucketSize <= 0) {
            return;
        }
        try {
            $secondsUntilFullReset = max(0, CarbonImmutable::parse($resetAt)->getTimestamp() - now()->getTimestamp());
        } catch (Throwable) {
            // Unparseable reset header — proceed without waiting rather than block.
            return;
        }

        // Anthropic's bucket refills linearly. We don't need to wait for
        // the full reset — only enough for SUBMIT_TOKEN_BUDGET tokens to
        // become available. Sleep proportionally + a small safety margin,
        // hard-capped at 65s.
        $tokensNeeded = self::SUBMIT_TOKEN_BUDGET - $remaining;
        $proportional = (int) ceil(($tokensNeeded / $bucketSize) * $secondsUntilFullReset);
        $waitSeconds = max(1, min(65, $proportional + 2));
        Log::info('LlmOverlayService: waiting for ITPM bucket refill before submit', [
            'remaining' => $remaining,
            'wait_seconds' => $waitSeconds,
            'full_reset_in' => $secondsUntilFullReset,
        ]);
        sleep($waitSeconds);
    }

    /**
     * Walk every assistant turn and extract `{url, title}` from each
     * `web_search_tool_result` block. Anthropic's web_search returns
     * these blocks directly — they are the authoritative citation
     * source, not anything the model transcribes back to us.
     *
     * @param array<int, array<string, mixed>> $messages
     * @return array<int, array{url: string, title: string}>
     */
    private function harvestSearchResults(array $messages): array
    {
        $byUrl = [];
        foreach ($messages as $message) {
            if (($message['role'] ?? null) !== 'assistant') {
                continue;
            }
            $content = $message['content'] ?? [];
            if (! is_array($content)) {
                continue;
            }
            foreach ($content as $block) {
                if (! is_array($block) || ($block['type'] ?? null) !== 'web_search_tool_result') {
                    continue;
                }
                $results = $block['content'] ?? [];
                if (! is_array($results)) {
                    continue;
                }
                foreach ($results as $result) {
                    if (! is_array($result) || ($result['type'] ?? null) !== 'web_search_result') {
                        continue;
                    }
                    $url = (string) ($result['url'] ?? '');
                    if ($url === '' || isset($byUrl[$url])) {
                        continue; // dedupe on URL, first occurrence wins
                    }
                    $byUrl[$url] = ['url' => $url, 'title' => (string) ($result['title'] ?? '')];
                }
            }
        }

        return array_values($byUrl);
    }

    /**
     * Merge model-provided events_cited with citations harvested from
     * `web_search_tool_result`. Model entries (which include `impact`
     * tagging) take precedence on URL collision; harvested-only entries
     * default to `impact: 'neutral'`.
     *
     * @param array<int, mixed> $modelEvents
     * @param array<int, array{url: string, title: string}> $harvested
     * @return array<int, array<string, mixed>>
     */
    private function mergeEvents(array $modelEvents, array $harvested): array
    {
        $byUrl = [];
        foreach ($modelEvents as $event) {
            if (! is_array($event)) {
                continue;
            }
            $url = (string) ($event['url'] ?? '');
            if ($url === '') {
                continue;
            }
            $byUrl[$url] = [
                'headline' => (string) ($event['headline'] ?? ''),
                'source' => (string) ($event['source'] ?? ''),
                'url' => $url,
                'impact' => in_array($event['impact'] ?? null, ['rising', 'falling', 'neutral'], true)
                    ? $event['impact']
                    : 'neutral',
            ];
        }
        foreach ($harvested as $result) {
            $url = $result['url'];
            if (isset($byUrl[$url])) {
                continue; // model-tagged entry wins on collision
            }
            $byUrl[$url] = [
                'headline' => $result['title'],
                'source' => $this->domainOf($url),
                'url' => $url,
                'impact' => 'neutral',
            ];
        }

        return array_values($byUrl);
    }

    /** Bare hostname of a URL with any leading "www." stripped; '' if unparseable. */
    private function domainOf(string $url): string
    {
        $host = parse_url($url, PHP_URL_HOST);

        // preg_replace() returns null on PCRE error; fall back to the raw host
        // so the declared string return type always holds.
        return is_string($host) ? (preg_replace('/^www\./', '', $host) ?? $host) : '';
    }

    /** Browser-shaped UA with a bot-info URL, required by CDN bot filters. */
    private function verificationUserAgent(): string
    {
        $appUrl = rtrim((string) config('app.url'), '/');

        return "Mozilla/5.0 (compatible; FuelPriceBot/1.0; +{$appUrl}/bot)";
    }

    /**
     * Verify each cited URL is reachable. Major news sites (Reuters, FT,
     * Bloomberg, BBC...) often reject HEAD with 403 / 405 even though
     * GET works fine. So: try HEAD first, then fall back to a 1-byte
     * GET (Range header) when HEAD fails. Both must include a
     * browser-shaped User-Agent or Cloudflare etc. block us as a bot.
     *
     * Every URL — verified or rejected — is logged at INFO/WARNING so
     * operators can debug rejections from `storage/logs/laravel.log`
     * without needing to capture the Anthropic response body.
     *
     * @param array<int, array<string, mixed>> $events
     * @return array<int, array<string, mixed>>
     */
    private function verifyCitedUrls(array $events): array
    {
        $verified = [];
        foreach ($events as $event) {
            $url = (string) ($event['url'] ?? '');
            if ($url === '') {
                Log::warning('LlmOverlayService: dropping cited event with empty URL', [
                    'headline' => $event['headline'] ?? null,
                    'source' => $event['source'] ?? null,
                ]);

                continue;
            }
            [$reachable, $diagnosis] = $this->urlReachable($url);
            if ($reachable) {
                Log::info('LlmOverlayService: URL verified', [
                    'url' => $url,
                    'via' => $diagnosis,
                ]);
                $verified[] = $event;
            } else {
                Log::warning('LlmOverlayService: URL rejected', [
                    'url' => $url,
                    'reason' => $diagnosis,
                    'headline' => $event['headline'] ?? null,
                    'source' => $event['source'] ?? null,
                ]);
            }
        }

        return $verified;
    }

    /** @return array{0: bool, 1: string} [reachable, diagnostic_string] */
    private function urlReachable(string $url): array
    {
        $headers = ['User-Agent' => $this->verificationUserAgent()];
        $headStatus = 'no-attempt';
        try {
            $head = Http::timeout(5)
                ->withHeaders($headers)
                ->head($url);
            $headStatus = 'HEAD='.$head->status();
            if ($head->successful() || $head->redirect()) {
                return [true, $headStatus];
            }
        } catch (Throwable $e) {
            $headStatus = 'HEAD=exception('.class_basename($e).')';
        }
        try {
            // 1-byte ranged GET: cheap fallback for servers that reject HEAD.
            $get = Http::timeout(8)
                ->withHeaders($headers + ['Range' => 'bytes=0-0'])
                ->get($url);
            $getStatus = 'GET='.$get->status();
            if ($get->successful() || $get->redirect()) {
                return [true, $headStatus.' → '.$getStatus.' (fallback)'];
            }

            return [false, $headStatus.' → '.$getStatus];
        } catch (Throwable $e) {
            return [false, $headStatus.' → GET=exception('.class_basename($e).')'];
        }
    }

    /** Map the forecast's public direction vocabulary onto the overlay's. */
    private function ridgeDirection(string $publicDirection): string
    {
        return match ($publicDirection) {
            'up' => 'rising',
            'down' => 'falling',
            default => 'flat',
        };
    }

    /** Date string of the Monday the overlay forecasts for (today if Monday). */
    private function upcomingMondayDateString(): string
    {
        $today = now()->startOfDay();
        $monday = $today->isMonday() ? $today : $today->copy()->next(CarbonInterface::MONDAY);

        return $monday->toDateString();
    }

    /** @return array<string, string> */
    private function headers(): array
    {
        return [
            'x-api-key' => $this->apiKey(),
            'anthropic-version' => '2023-06-01',
        ];
    }

    private function apiKey(): ?string
    {
        return config('services.anthropic.api_key');
    }

    private function model(): string
    {
        return config('services.anthropic.model', 'claude-haiku-4-5-20251001');
    }

    /** System prompt for Phase 1 (search-only turn). */
    private function searchSystem(): string
    {
        return <<<'PROMPT'
You are researching news that may affect this week's UK pump-price forecast.
Search recent news (last 48 hours) for:
- OPEC+ production decisions or unexpected announcements
- Geopolitical events affecting oil supply (sanctions, conflict, shipping disruption)
- Major refinery outages or pipeline incidents
- US/EU inventory reports that materially moved Brent
Return only the search results — you will be asked to summarise separately.
PROMPT;
    }

    /** User message for Phase 1: search instruction + forecast context JSON. */
    private function searchUserMessage(array $context): string
    {
        $json = json_encode($context, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES);

        return "Use web_search to find oil/fuel news from the last 48 hours that could move UK pump prices this week.\n\nContext for this week:\n\n".$json;
    }

    /** System prompt for Phase 2 (forced submit_overlay turn). */
    private function submitSystem(): string
    {
        $cap = self::CONFIDENCE_CAP;

        return <<<PROMPT
You are providing a news-aware directional overlay for a UK weekly pump-price forecast.
Decide whether to AGREE or DISAGREE with the ridge model based on the news headlines
provided in the user message. Cap confidence at $cap.
Include events_cited (with impact tags) for any specific headline that drove your
reasoning; you may leave events_cited empty if the news is unremarkable.
PROMPT;
    }

    /**
     * User message for Phase 2: context JSON plus the harvested
     * headlines flattened into a plain-text bullet list.
     *
     * @param array<int, array{url: string, title: string}> $harvested
     */
    private function submitUserMessage(array $context, array $harvested): string
    {
        $contextJson = json_encode($context, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES);
        if ($harvested === []) {
            $headlines = '(none — no relevant news found)';
        } else {
            $headlines = collect($harvested)
                ->map(fn (array $r): string => '- '.$r['title'].' — '.$r['url'])
                ->implode("\n");
        }

        return "Context for this week:\n\n".$contextJson."\n\nNews headlines found:\n".$headlines."\n\nNow call submit_overlay with your decision.";
    }

    /**
     * JSON schema for the forced submit_overlay tool. events_cited is
     * deliberately NOT in `required` — citations are harvested from
     * Phase 1's search results, so model compliance is optional.
     *
     * @return array<string, mixed>
     */
    private function submitOverlayTool(): array
    {
        return [
            'name' => 'submit_overlay',
            'description' => 'Submit the news-aware overlay for the upcoming weekly forecast.',
            'input_schema' => [
                'type' => 'object',
                'properties' => [
                    'direction' => ['type' => 'string', 'enum' => ['rising', 'falling', 'flat']],
                    'confidence' => ['type' => 'integer', 'minimum' => 0, 'maximum' => self::CONFIDENCE_CAP],
                    // Was '12 sentences.' — the en dash in '1-2' had been lost
                    // to encoding, inviting twelve-sentence essays into a
                    // 512-max-token forced tool call.
                    'reasoning_short' => ['type' => 'string', 'description' => '1-2 sentences.'],
                    'events_cited' => [
                        'type' => 'array',
                        'description' => 'Optional. Events that drove your reasoning, with directional impact. Citations are otherwise harvested from web_search_tool_result.',
                        'items' => [
                            'type' => 'object',
                            'properties' => [
                                'headline' => ['type' => 'string'],
                                'source' => ['type' => 'string'],
                                'url' => ['type' => 'string'],
                                'impact' => ['type' => 'string', 'enum' => ['rising', 'falling', 'neutral']],
                            ],
                            'required' => ['headline', 'source', 'url', 'impact'],
                        ],
                    ],
                    'agrees_with_ridge' => ['type' => 'boolean'],
                    'major_impact_event' => ['type' => 'boolean'],
                ],
                'required' => ['direction', 'confidence', 'reasoning_short', 'agrees_with_ridge', 'major_impact_event'],
            ],
        ];
    }

    /**
     * Pull the `input` payload out of the first tool_use block.
     *
     * Hardened against malformed responses: the original returned
     * `$block['input'] ?? null`, which raises a TypeError against the
     * declared ?array return when the API hands back a non-array input.
     *
     * @param array<int, mixed> $content
     * @return array<string, mixed>|null
     */
    private function extractToolInput(array $content): ?array
    {
        $block = collect($content)->firstWhere('type', 'tool_use');
        $input = is_array($block) ? ($block['input'] ?? null) : null;

        return is_array($input) ? $input : null;
    }
}