fix: ImportPostcodes streams into staging, swaps on success
Audit item #13. The command truncated postcodes + outcodes immediately and then chunk-inserted from the CSV. A mid-stream failure (network drive disconnect, malformed row) left the production tables empty until a successful re-run. Now streams into a postcodes_staging table created via Schema::create (works on both MySQL and SQLite) and only swaps into the live tables once the full CSV has been consumed. A try/finally ensures the file handle and staging table are cleaned up on any path. Live data is now preserved on partial failures. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -5,7 +5,10 @@ namespace App\Console\Commands;
|
|||||||
use Illuminate\Console\Attributes\Description;
|
use Illuminate\Console\Attributes\Description;
|
||||||
use Illuminate\Console\Attributes\Signature;
|
use Illuminate\Console\Attributes\Signature;
|
||||||
use Illuminate\Console\Command;
|
use Illuminate\Console\Command;
|
||||||
|
use Illuminate\Database\Schema\Blueprint;
|
||||||
use Illuminate\Support\Facades\DB;
|
use Illuminate\Support\Facades\DB;
|
||||||
|
use Illuminate\Support\Facades\Schema;
|
||||||
|
use Throwable;
|
||||||
|
|
||||||
#[Signature('postcodes:import {--file= : Path to ONSPD CSV file}')]
|
#[Signature('postcodes:import {--file= : Path to ONSPD CSV file}')]
|
||||||
#[Description('Import UK postcodes (ONSPD) into the local postcodes and outcodes tables')]
|
#[Description('Import UK postcodes (ONSPD) into the local postcodes and outcodes tables')]
|
||||||
@@ -79,56 +82,78 @@ final class ImportPostcodes extends Command
|
|||||||
|
|
||||||
$hasDoterm = isset($columns['doterm']);
|
$hasDoterm = isset($columns['doterm']);
|
||||||
|
|
||||||
DB::table('postcodes')->truncate();
|
// Stream into a staging table first. Only swap into the live
|
||||||
DB::table('outcodes')->truncate();
|
// postcodes / outcodes tables once the full CSV has been consumed —
|
||||||
|
// a mid-stream failure leaves production data untouched.
|
||||||
|
Schema::dropIfExists('postcodes_staging');
|
||||||
|
Schema::create('postcodes_staging', function (Blueprint $table): void {
|
||||||
|
$table->string('postcode', 7);
|
||||||
|
$table->string('outcode', 4);
|
||||||
|
$table->decimal('lat', 10, 7);
|
||||||
|
$table->decimal('lng', 10, 7);
|
||||||
|
});
|
||||||
|
|
||||||
$buffer = [];
|
$buffer = [];
|
||||||
$imported = 0;
|
$imported = 0;
|
||||||
|
|
||||||
while (($row = fgetcsv($handle)) !== false) {
|
try {
|
||||||
if ($hasDoterm && trim((string) ($row[$columns['doterm']] ?? '')) !== '') {
|
while (($row = fgetcsv($handle)) !== false) {
|
||||||
continue;
|
if ($hasDoterm && trim((string) ($row[$columns['doterm']] ?? '')) !== '') {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$lat = trim((string) ($row[$columns['lat']] ?? ''));
|
||||||
|
$lng = trim((string) ($row[$columns['long']] ?? ''));
|
||||||
|
|
||||||
|
if ($lat === '' || $lng === '') {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$pcd = strtoupper(preg_replace('/\s+/', '', (string) $row[$columns[$pcdColumn]]));
|
||||||
|
|
||||||
|
if ($pcd === '' || strlen($pcd) < 5) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$buffer[] = [
|
||||||
|
'postcode' => $pcd,
|
||||||
|
'outcode' => substr($pcd, 0, strlen($pcd) - 3),
|
||||||
|
'lat' => (float) $lat,
|
||||||
|
'lng' => (float) $lng,
|
||||||
|
];
|
||||||
|
|
||||||
|
if (count($buffer) >= self::CHUNK_SIZE) {
|
||||||
|
DB::table('postcodes_staging')->insert($buffer);
|
||||||
|
$imported += count($buffer);
|
||||||
|
$buffer = [];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$lat = trim((string) ($row[$columns['lat']] ?? ''));
|
if ($buffer !== []) {
|
||||||
$lng = trim((string) ($row[$columns['long']] ?? ''));
|
DB::table('postcodes_staging')->insert($buffer);
|
||||||
|
|
||||||
if ($lat === '' || $lng === '') {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
$pcd = strtoupper(preg_replace('/\s+/', '', (string) $row[$columns[$pcdColumn]]));
|
|
||||||
|
|
||||||
if ($pcd === '' || strlen($pcd) < 5) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
$buffer[] = [
|
|
||||||
'postcode' => $pcd,
|
|
||||||
'outcode' => substr($pcd, 0, strlen($pcd) - 3),
|
|
||||||
'lat' => (float) $lat,
|
|
||||||
'lng' => (float) $lng,
|
|
||||||
];
|
|
||||||
|
|
||||||
if (count($buffer) >= self::CHUNK_SIZE) {
|
|
||||||
DB::table('postcodes')->insert($buffer);
|
|
||||||
$imported += count($buffer);
|
$imported += count($buffer);
|
||||||
$buffer = [];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Swap: empty live tables, copy from staging, derive outcodes.
|
||||||
|
DB::table('outcodes')->truncate();
|
||||||
|
DB::table('postcodes')->truncate();
|
||||||
|
DB::statement(
|
||||||
|
'INSERT INTO postcodes (postcode, outcode, lat, lng)
|
||||||
|
SELECT postcode, outcode, lat, lng FROM postcodes_staging'
|
||||||
|
);
|
||||||
|
DB::statement(
|
||||||
|
'INSERT INTO outcodes (outcode, lat, lng)
|
||||||
|
SELECT outcode, AVG(lat), AVG(lng) FROM postcodes GROUP BY outcode'
|
||||||
|
);
|
||||||
|
} catch (Throwable $e) {
|
||||||
|
$this->error('Import failed — live tables left untouched: '.$e->getMessage());
|
||||||
|
|
||||||
|
return self::FAILURE;
|
||||||
|
} finally {
|
||||||
|
fclose($handle);
|
||||||
|
Schema::dropIfExists('postcodes_staging');
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($buffer !== []) {
|
|
||||||
DB::table('postcodes')->insert($buffer);
|
|
||||||
$imported += count($buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
fclose($handle);
|
|
||||||
|
|
||||||
DB::statement(
|
|
||||||
'INSERT INTO outcodes (outcode, lat, lng)
|
|
||||||
SELECT outcode, AVG(lat), AVG(lng) FROM postcodes GROUP BY outcode'
|
|
||||||
);
|
|
||||||
|
|
||||||
$this->info("Imported {$imported} postcodes.");
|
$this->info("Imported {$imported} postcodes.");
|
||||||
$this->info('Derived '.DB::table('outcodes')->count().' outcode centroids.');
|
$this->info('Derived '.DB::table('outcodes')->count().' outcode centroids.');
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user