apiKey() === null) {
            Log::info('LlmOverlayService: no ANTHROPIC_API_KEY, skipping');

            return null;
        }

        // The cooldown is only consulted when $eventDriven is truthy.
        if ($eventDriven && $this->onCooldown()) {
            return null;
        }

        $forecast = $this->weeklyForecast->currentForecast();
        $context = $this->buildContext($forecast);

        // callAnthropic() returns null on any failure; no overlay is stored in that case.
        $callResult = $this->callAnthropic($context);
        if ($callResult === null) {
            return null;
        }

        $rawResult = $callResult['raw'];
        $harvested = $callResult['harvested'];

        // Combine the model's own citations with the harvested search results,
        // then keep only the events whose URL passes the reachability check.
        $mergedEvents = $this->mergeEvents($rawResult['events_cited'] ?? [], $harvested);
        $verifiedEvents = $this->verifyCitedUrls($mergedEvents);

        // An overlay without a single verified citation is rejected outright.
        if ($verifiedEvents === []) {
            Log::warning('LlmOverlayService: no verified citations, rejecting overlay', [
                'model_events' => $rawResult['events_cited'] ?? null,
                'harvested_urls' => array_column($harvested, 'url'),
                'direction' => $rawResult['direction'] ?? null,
                'confidence' => $rawResult['confidence'] ?? null,
                'reasoning_short' => $rawResult['reasoning_short'] ?? null,
            ]);

            return null;
        }

        // Clamp the reported confidence into the range 0..CONFIDENCE_CAP.
        $confidence = max(0, min(self::CONFIDENCE_CAP, (int) ($rawResult['confidence'] ?? 0)));
        $direction = $rawResult['direction'] ?? 'flat';
        // Agreement is derived here by comparing directions; it is not read from $rawResult.
        $agreesWithRidge = $direction === $this->ridgeDirection($forecast['predicted_direction']);

        return LlmOverlay::query()->create([
            'ran_at' => now(),
            'forecast_for_week' => $this->upcomingMondayDateString(),
            'direction' => $direction,
            'confidence' => $confidence,
            'reasoning' => (string) ($rawResult['reasoning_short'] ?? ''),
            'events_json' => $verifiedEvents,
            'agrees_with_ridge' => $agreesWithRidge,
            'major_impact_event' => (bool) ($rawResult['major_impact_event'] ?? false),
            'volatility_flag_on' => VolatilityRegime::currentlyActive() !== null,
            'search_used' => true,
        ]);
    }

    /**
     * Whether the most recent overlay row is younger than COOLDOWN_HOURS.
     *
     * Returns false when no overlay row exists yet.
     */
    private function onCooldown(): bool
    {
        $latest = LlmOverlay::query()->orderByDesc('ran_at')->first();

        return $latest !== null && $latest->ran_at->greaterThanOrEqualTo(now()->subHours(self::COOLDOWN_HOURS));
    }

    /**
     * Assemble the numeric context handed to the model: recent weekly pump
     * prices, recent Brent prices and the ridge model's current forecast.
     *
     * @param  array<string, mixed>  $forecast  Forecast row; missing keys fall back to defaults below.
     * @return array<string, mixed>
     */
    private function buildContext(array $forecast): array
    {
        // Newest 8 rows, flipped back into chronological order.
        // NOTE(review): ulsp_pence is divided by 100 — presumably the column is
        // stored in hundredths of a penny; confirm against the table schema.
        $ulspWeekly = DB::table('weekly_pump_prices')
            ->orderByDesc('date')
            ->limit(8)
            ->get(['date', 'ulsp_pence'])
            ->reverse()
            ->map(fn ($r): array => ['date' => (string) $r->date, 'ulsp_pence' => round((int) $r->ulsp_pence / 100, 1)])
            ->values()
            ->all();

        // Newest 14 rows, flipped back into chronological order.
        $brentRecent = BrentPrice::query()
            ->orderByDesc('date')
            ->limit(14)
            ->get(['date', 'price_usd'])
            ->reverse()
            ->map(fn (BrentPrice $r): array => ['date' => (string) $r->date->toDateString(), 'price_usd' => (float) $r->price_usd])
            ->values()
            ->all();

        return [
            'ulsp_recent_8_weeks' => $ulspWeekly,
            'brent_recent_14_days' => $brentRecent,
            'ridge_model_says' => [
                'direction' => $forecast['predicted_direction'] ?? 'stable',
                'confidence' => $forecast['confidence_score'] ?? 0,
                'magnitude_pence' => $forecast['predicted_change_pence'] ?? 0,
            ],
        ];
    }

    /**
     * Two independent API calls:
     *
     * Phase 1 — runs the web_search tool, captures the assistant's
     * returned `web_search_tool_result` blocks, then
     * discards the transcript.
     *
     * Phase 2 — issues a brand-new conversation with the harvested
     * URLs/titles flattened into a plain-text user message
     * and forces a `submit_overlay` tool call.
     *
     * Why not one stitched conversation: Anthropic auto-caches web_search
     * results into ITPM (≈55k tokens for a 1-search call) and requires
     * `encrypted_content` intact when those blocks are sent back.
     * Resending the Phase 1 transcript to Phase 2 either rate-limits us
     * (29k+ tokens twice → exceeds the Tier-1 50k ITPM bucket) or 400s
     * if we strip the encrypted blob.
A fresh Phase 2 sends ~3k tokens
     * total — small enough to fit in the recovered bucket after a
     * short adaptive sleep.
     *
     * Any exception thrown by either phase is logged and converted to null.
     *
     * @return array{raw: array<string, mixed>, harvested: array<int, array{url: string, title: string}>}|null
     */
    private function callAnthropic(array $context): ?array
    {
        try {
            $search = $this->runWebSearch($context);
            if ($search === null) {
                return null;
            }

            $citations = $search['harvested'];

            // Give the input-token bucket time to recover before the second call.
            $this->waitForRateLimitIfNeeded($search['response']);

            $decision = $this->runSubmit($context, $citations);

            return $decision === null
                ? null
                : ['raw' => $decision, 'harvested' => $citations];
        } catch (Throwable $exception) {
            Log::error('LlmOverlayService: callAnthropic failed', ['error' => $exception->getMessage()]);

            return null;
        }
    }

    /**
     * Phase 1: let the model run web_search and collect the
     * `web_search_tool_result` blocks it produces.
     *
     * The request is repeated (up to MAX_SEARCH_TURNS times) for as long as
     * the API answers with stop_reason `pause_turn`, each time appending the
     * assistant content to the conversation. The last response is returned
     * alongside the citations so the caller can read its rate-limit headers.
     *
     * @return array{harvested: array<int, array{url: string, title: string}>, response: Response}|null
     *                                                                                                  null when a request fails or no request was made
     */
    private function runWebSearch(array $context): ?array
    {
        $conversation = [
            ['role' => 'user', 'content' => $this->searchUserMessage($context)],
        ];
        $latest = null;

        for ($turn = 0; $turn < self::MAX_SEARCH_TURNS; $turn++) {
            $latest = $this->apiLogger->send(
                'anthropic',
                'POST',
                self::URL,
                function () use ($conversation) {
                    $request = Http::timeout(45)->withHeaders($this->headers());

                    return $request->post(self::URL, [
                        'model' => $this->model(),
                        'max_tokens' => 1024,
                        'system' => $this->searchSystem(),
                        'tools' => [['type' => 'web_search_20250305', 'name' => 'web_search']],
                        'messages' => $conversation,
                    ]);
                },
            );

            if ($latest->successful() === false) {
                Log::error('LlmOverlayService: search request failed', [
                    'status' => $latest->status(),
                    'body' => substr($latest->body(), 0, 500),
                ]);

                return null;
            }

            $conversation[] = ['role' => 'assistant', 'content' => $latest->json('content')];

            // Anything other than `pause_turn` means the model has finished searching.
            $paused = $latest->json('stop_reason') === 'pause_turn';
            if (! $paused) {
                break;
            }
        }

        // Only reachable with a null response when the loop body never ran.
        return $latest === null
            ? null
            : ['harvested' => $this->harvestSearchResults($conversation), 'response' => $latest];
    }

    /**
     * Phase 2: a standalone request that carries none of the Phase 1
     * transcript. The harvested citations travel as plain text in the user
     * message, and `tool_choice` forces a `submit_overlay` tool call.
     *
     * @param  array<int, array{url: string, title: string}>  $harvested
     * @return array<string, mixed>|null tool input, or null on HTTP failure / missing tool_use block
     */
    private function runSubmit(array $context, array $harvested): ?array
    {
        $sendRequest = function () use ($context, $harvested) {
            $request = Http::timeout(20)->withHeaders($this->headers());

            return $request->post(self::URL, [
                'model' => $this->model(),
                'max_tokens' => 512,
                'system' => $this->submitSystem(),
                'tools' => [$this->submitOverlayTool()],
                'tool_choice' => ['type' => 'tool', 'name' => 'submit_overlay'],
                'messages' => [
                    ['role' => 'user', 'content' => $this->submitUserMessage($context, $harvested)],
                ],
            ]);
        };

        $reply = $this->apiLogger->send('anthropic', 'POST', self::URL, $sendRequest);

        if ($reply->successful() === false) {
            Log::error('LlmOverlayService: submit request failed', [
                'status' => $reply->status(),
                'body' => substr($reply->body(), 0, 500),
            ]);

            return null;
        }

        $toolInput = $this->extractToolInput($reply->json('content') ?? []);
        if ($toolInput !== null) {
            return $toolInput;
        }

        Log::warning('LlmOverlayService: submit response missing tool_use block');

        return null;
    }

    /**
     * Anthropic's web_search burns ≈55k input tokens (mostly auto-cached
     * search results) on Phase 1. At Tier 1's 50k ITPM the bucket can
     * be at zero immediately afterwards. Read the rate-limit headers
     * and sleep until the bucket has refilled enough for Phase 2.
     * Capped at 65s so the daily cron never hangs much longer than a minute.
*/
    private function waitForRateLimitIfNeeded(Response $response): void
    {
        // An absent header comes back as an empty string; without it there is
        // nothing to base a wait on.
        $remainingHeader = $response->header('anthropic-ratelimit-input-tokens-remaining');
        $remaining = (int) $remainingHeader;
        if ($remainingHeader === '' || $remaining >= self::SUBMIT_TOKEN_BUDGET) {
            return;
        }

        $resetAt = $response->header('anthropic-ratelimit-input-tokens-reset');
        $bucketSize = (int) $response->header('anthropic-ratelimit-input-tokens-limit');
        if ($resetAt === '' || $bucketSize <= 0) {
            return;
        }

        try {
            $secondsUntilFullReset = max(0, CarbonImmutable::parse($resetAt)->getTimestamp() - now()->getTimestamp());
        } catch (Throwable) {
            // Unparseable reset timestamp: skip the wait rather than fail the run.
            return;
        }

        // Anthropic's bucket refills linearly. We don't need to wait for
        // the full reset — only enough for SUBMIT_TOKEN_BUDGET tokens to
        // become available. Between now and the reset timestamp the bucket
        // gains (limit - remaining) tokens, so that amount — not the full
        // bucket size — is the denominator of the proportion. Sleep
        // proportionally + a small safety margin, hard-capped at 65s.
        $tokensNeeded = self::SUBMIT_TOKEN_BUDGET - $remaining;
        $tokensRefilledByReset = $bucketSize - $remaining;
        $fraction = $tokensRefilledByReset > 0
            ? min(1.0, $tokensNeeded / $tokensRefilledByReset)
            : 1.0;
        $proportional = (int) ceil($fraction * $secondsUntilFullReset);
        $waitSeconds = max(1, min(65, $proportional + 2));

        Log::info('LlmOverlayService: waiting for ITPM bucket refill before submit', [
            'remaining' => $remaining,
            'wait_seconds' => $waitSeconds,
            'full_reset_in' => $secondsUntilFullReset,
        ]);

        sleep($waitSeconds);
    }

    /**
     * Walk every assistant turn and extract `{url, title}` from each
     * `web_search_tool_result` block. Anthropic's web_search returns
     * these blocks directly — they are the authoritative citation
     * source, not anything the model transcribes back to us.
     *
     * Entries are de-duplicated by URL (first occurrence wins) and entries
     * without a URL are skipped. Malformed blocks are ignored silently.
     *
     * @param  array<int, array<string, mixed>>  $messages
     * @return array<int, array{url: string, title: string}>
     */
    private function harvestSearchResults(array $messages): array
    {
        $byUrl = [];

        foreach ($messages as $message) {
            if (($message['role'] ?? null) !== 'assistant') {
                continue;
            }

            $content = $message['content'] ?? [];
            if (! is_array($content)) {
                continue;
            }

            foreach ($content as $block) {
                if (! is_array($block) || ($block['type'] ?? null) !== 'web_search_tool_result') {
                    continue;
                }

                $results = $block['content'] ?? [];
                if (! is_array($results)) {
                    continue;
                }

                foreach ($results as $result) {
                    if (! is_array($result) || ($result['type'] ?? null) !== 'web_search_result') {
                        continue;
                    }

                    $url = (string) ($result['url'] ?? '');
                    if ($url === '' || isset($byUrl[$url])) {
                        continue;
                    }

                    $byUrl[$url] = ['url' => $url, 'title' => (string) ($result['title'] ?? '')];
                }
            }
        }

        return array_values($byUrl);
    }

    /**
     * Merge model-provided events_cited with citations harvested from
     * `web_search_tool_result`. Model entries (which include `impact`
     * tagging) take precedence on URL collision; harvested-only entries
     * default to `impact: 'neutral'`.
     *
     * Model entries that are not arrays or carry no URL are dropped, and an
     * unrecognised `impact` value is normalised to 'neutral'.
     *
     * @param  array<int, mixed>  $modelEvents
     * @param  array<int, array{url: string, title: string}>  $harvested
     * @return array<int, array{headline: string, source: string, url: string, impact: string}>
     */
    private function mergeEvents(array $modelEvents, array $harvested): array
    {
        $byUrl = [];

        foreach ($modelEvents as $event) {
            if (! is_array($event)) {
                continue;
            }

            $url = (string) ($event['url'] ?? '');
            if ($url === '') {
                continue;
            }

            $byUrl[$url] = [
                'headline' => (string) ($event['headline'] ?? ''),
                'source' => (string) ($event['source'] ?? ''),
                'url' => $url,
                'impact' => in_array($event['impact'] ?? null, ['rising', 'falling', 'neutral'], true)
                    ? $event['impact']
                    : 'neutral',
            ];
        }

        foreach ($harvested as $result) {
            $url = $result['url'];
            if (isset($byUrl[$url])) {
                continue;
            }

            $byUrl[$url] = [
                'headline' => $result['title'],
                'source' => $this->domainOf($url),
                'url' => $url,
                'impact' => 'neutral',
            ];
        }

        return array_values($byUrl);
    }

    /**
     * Host part of a URL with a leading "www." removed, or '' when the URL
     * has no parseable host.
     *
     * Uses plain string functions instead of preg_replace(): the latter can
     * return null on a PCRE error, which would violate the string return type.
     */
    private function domainOf(string $url): string
    {
        $host = parse_url($url, PHP_URL_HOST);
        if (! is_string($host)) {
            return '';
        }

        return str_starts_with($host, 'www.') ? substr($host, 4) : $host;
    }

    /**
     * User-Agent sent with the URL verification requests; embeds the
     * configured `app.url` (trailing slash removed) as the bot info link.
     */
    private function verificationUserAgent(): string
    {
        $appUrl = rtrim((string) config('app.url'), '/');

        return "Mozilla/5.0 (compatible; FuelPriceBot/1.0; +{$appUrl}/bot)";
    }

    /**
     * Verify each cited URL is reachable. Major news sites (Reuters, FT,
     * Bloomberg, BBC...) often reject HEAD with 403 / 405 even though
     * GET works fine. So: try HEAD first, then fall back to a 1-byte
     * GET (Range header) when HEAD fails.
Both must include a
     * browser-shaped User-Agent or Cloudflare etc. block us as a bot.
     *
     * Every URL — verified or rejected — is logged at INFO/WARNING so
     * operators can debug rejections from `storage/logs/laravel.log`
     * without needing to capture the Anthropic response body.
     *
     * @param  array<int, array<string, mixed>>  $events
     * @return array<int, array<string, mixed>> the subset of $events whose URL was reachable, in input order
     */
    private function verifyCitedUrls(array $events): array
    {
        $verified = [];

        foreach ($events as $event) {
            $url = (string) ($event['url'] ?? '');
            if ($url === '') {
                Log::warning('LlmOverlayService: dropping cited event with empty URL', [
                    'headline' => $event['headline'] ?? null,
                    'source' => $event['source'] ?? null,
                ]);

                continue;
            }

            [$reachable, $diagnosis] = $this->urlReachable($url);

            if ($reachable) {
                Log::info('LlmOverlayService: URL verified', [
                    'url' => $url,
                    'via' => $diagnosis,
                ]);
                $verified[] = $event;
            } else {
                Log::warning('LlmOverlayService: URL rejected', [
                    'url' => $url,
                    'reason' => $diagnosis,
                    'headline' => $event['headline'] ?? null,
                    'source' => $event['source'] ?? null,
                ]);
            }
        }

        return $verified;
    }

    /**
     * Probe a single URL: HEAD first, then a ranged GET if HEAD did not
     * succeed or threw. The diagnostic string records the status (or the
     * exception class) of each attempt that was made.
     *
     * @return array{0: bool, 1: string} [reachable, diagnostic_string]
     */
    private function urlReachable(string $url): array
    {
        $headers = ['User-Agent' => $this->verificationUserAgent()];
        $headStatus = 'no-attempt';

        try {
            $head = Http::timeout(5)
                ->withHeaders($headers)
                ->head($url);
            $headStatus = 'HEAD='.$head->status();
            if ($head->successful() || $head->redirect()) {
                return [true, $headStatus];
            }
        } catch (Throwable $e) {
            // A HEAD failure is not final; remember why and fall through to GET.
            $headStatus = 'HEAD=exception('.class_basename($e).')';
        }

        try {
            // Range: bytes=0-0 asks for a single byte so the body is not downloaded in full.
            $get = Http::timeout(8)
                ->withHeaders($headers + ['Range' => 'bytes=0-0'])
                ->get($url);
            $getStatus = 'GET='.$get->status();
            if ($get->successful() || $get->redirect()) {
                return [true, $headStatus.' → '.$getStatus.' (fallback)'];
            }

            return [false, $headStatus.' → '.$getStatus];
        } catch (Throwable $e) {
            return [false, $headStatus.' → GET=exception('.class_basename($e).')'];
        }
    }

    /**
     * Translate the forecast's direction vocabulary ('up' / 'down') into the
     * overlay vocabulary ('rising' / 'falling'); anything else becomes 'flat'.
     */
    private function ridgeDirection(string $publicDirection): string
    {
        return match ($publicDirection) {
            'up' => 'rising',
            'down' => 'falling',
            default => 'flat',
        };
    }

    /**
     * Date string of the Monday the overlay applies to: today when today is
     * a Monday, otherwise the next Monday after today.
     */
    private function upcomingMondayDateString(): string
    {
        $today = now()->startOfDay();
        $monday = $today->isMonday() ? $today : $today->copy()->next(CarbonInterface::MONDAY);

        return $monday->toDateString();
    }

    /**
     * Request headers for the Anthropic API calls.
     *
     * @return array<string, string|null>
     */
    private function headers(): array
    {
        return [
            'x-api-key' => $this->apiKey(),
            'anthropic-version' => '2023-06-01',
        ];
    }

    /** API key from `services.anthropic.api_key`; null when not configured. */
    private function apiKey(): ?string
    {
        return config('services.anthropic.api_key');
    }

    /** Model id from `services.anthropic.model`, with a built-in default. */
    private function model(): string
    {
        return config('services.anthropic.model', 'claude-haiku-4-5-20251001');
    }

    /** System prompt for the Phase 1 (web search) request. */
    private function searchSystem(): string
    {
        return <<<'PROMPT'
        You are researching news that may affect this week's UK pump-price forecast.
        Search recent news (last 48 hours) for:
        - OPEC+ production decisions or unexpected announcements
        - Geopolitical events affecting oil supply (sanctions, conflict, shipping disruption)
        - Major refinery outages or pipeline incidents
        - US/EU inventory reports that materially moved Brent
        Return only the search results — you will be asked to summarise separately.
        PROMPT;
    }

    /** User message for Phase 1: the search instruction followed by the context as pretty-printed JSON. */
    private function searchUserMessage(array $context): string
    {
        $json = json_encode($context, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES);

        return "Use web_search to find oil/fuel news from the last 48 hours that could move UK pump prices this week.\n\nContext for this week:\n\n".$json;
    }

    // NOTE(review): in this view the body of submitSystem() is cut off after
    // `return <<` — the heredoc text, the method's closing brace and the opening
    // of the following docblock are missing. The remnant is left exactly as
    // found; restore it from version control before relying on this region.
    private function submitSystem(): string
    {
        $cap = self::CONFIDENCE_CAP;

        return << $harvested */
    private function submitUserMessage(array $context, array $harvested): string
    {
        $contextJson = json_encode($context, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES);

        // One "- title — url" line per harvested result, or a fixed placeholder when there are none.
        if ($harvested === []) {
            $headlines = '(none — no relevant news found)';
        } else {
            $headlines = collect($harvested)
                ->map(fn (array $r): string => '- '.$r['title'].' — '.$r['url'])
                ->implode("\n");
        }

        return "Context for this week:\n\n".$contextJson."\n\nNews headlines found:\n".$headlines."\n\nNow call submit_overlay with your decision.";
    }

    /**
     * Tool definition (name, description, JSON schema) for `submit_overlay`.
     *
     * @return array<string, mixed>
     */
    private function submitOverlayTool(): array
    {
        return [
            'name' => 'submit_overlay',
            'description' => 'Submit the news-aware overlay for the upcoming weekly forecast.',
            'input_schema' => [
                'type' => 'object',
                'properties' => [
                    'direction' => ['type' => 'string', 'enum' => ['rising', 'falling', 'flat']],
                    'confidence' => ['type' => 'integer', 'minimum' => 0, 'maximum' => self::CONFIDENCE_CAP],
                    'reasoning_short' => ['type' => 'string', 'description' => '1–2 sentences.'],
                    'events_cited' => [
                        'type' => 'array',
                        'description' => 'Optional. Events that drove your reasoning, with directional impact. Citations are otherwise harvested from web_search_tool_result.',
                        'items' => [
                            'type' => 'object',
                            'properties' => [
                                'headline' => ['type' => 'string'],
                                'source' => ['type' => 'string'],
                                'url' => ['type' => 'string'],
                                'impact' => ['type' => 'string', 'enum' => ['rising', 'falling', 'neutral']],
                            ],
                            'required' => ['headline', 'source', 'url', 'impact'],
                        ],
                    ],
                    'agrees_with_ridge' => ['type' => 'boolean'],
                    'major_impact_event' => ['type' => 'boolean'],
                ],
                // events_cited is deliberately absent from the required list.
                'required' => ['direction', 'confidence', 'reasoning_short', 'agrees_with_ridge', 'major_impact_event'],
            ],
        ];
    }

    /**
     * Return the `input` of the first content block whose type is `tool_use`,
     * or null when there is no such block (or it has no `input`).
     *
     * @param  array<int, mixed>  $content
     * @return array<string, mixed>|null
     */
    private function extractToolInput(array $content): ?array
    {
        $block = collect($content)->firstWhere('type', 'tool_use');

        return $block['input'] ?? null;
    }
}