supercell-wx/scwx-qt/tools/generate_counties_db.py
2024-08-29 13:41:25 -04:00

183 lines
6.7 KiB
Python

import argparse
import geopandas as gpd
import pathlib
import sqlite3
class DatabaseInfo:
    """Holder for the SQLite connection and cursor shared by the processing steps."""

    def __init__(self):
        # Both handles start unset until the output database has been opened
        self.sqlConnection_ = self.sqlCursor_ = None
def ParseArguments():
    """Parse the command line options of the database generator.

    Returns:
        The argparse namespace. inputCountyDbs_, inputZoneDbs_, inputStateDbs_
        and inputWfoDbs_ are lists of pathlib.Path (empty when the option is
        not given, accumulated when it is repeated); outputDb_ is the required
        output pathlib.Path.
    """
    argParser = argparse.ArgumentParser(description='Generate counties SQLite database')

    # The input options only differ in their flags, help text and destination
    inputOptions = (
        ("-c", "--county_dbf", "input county database", "inputCountyDbs_"),
        ("-z", "--zone_dbf", "input zone database", "inputZoneDbs_"),
        ("-s", "--state_dbf", "input state database", "inputStateDbs_"),
        ("-w", "--wfo_dbf", "input wfo database", "inputWfoDbs_"),
    )
    for shortFlag, longFlag, helpText, destination in inputOptions:
        argParser.add_argument(shortFlag, longFlag,
                               metavar="filename",
                               help=helpText,
                               dest=destination,
                               action="extend",
                               nargs="+",
                               default=[],
                               type=pathlib.Path)

    argParser.add_argument("-o", "--output_db",
                           metavar="filename",
                           help="output sqlite database",
                           dest="outputDb_",
                           type=pathlib.Path,
                           required=True)

    return argParser.parse_args()
def Prepare(dbInfo, outputDb):
    """Create an empty output database and its tables.

    Args:
        dbInfo: object whose sqlConnection_ and sqlCursor_ attributes are
            assigned the new connection and cursor.
        outputDb: path of the SQLite file; any existing content is discarded.
    """
    # Opening the file for writing truncates a previously generated database
    with open(outputDb, 'w'):
        pass

    # Connect, and use sqlite3.Row so columns can be accessed by name
    connection = sqlite3.connect(outputDb)
    dbInfo.sqlConnection_ = connection
    connection.row_factory = sqlite3.Row

    cursor = connection.cursor()
    dbInfo.sqlCursor_ = cursor

    # One CREATE TABLE statement per output table
    tableSchemas = (
        "CREATE TABLE counties(\n"
        "id TEXT NOT NULL PRIMARY KEY,\n"
        "name TEXT)",
        "CREATE TABLE states(\n"
        "state TEXT NOT NULL PRIMARY KEY,\n"
        "name TEXT NOT NULL)",
        "CREATE TABLE wfos(\n"
        "id TEXT NOT NULL PRIMARY KEY,\n"
        "city TEXT NOT NULL,\n"
        "state TEXT NOT NULL,\n"
        "city_state TEXT NOT NULL)",
    )
    for schema in tableSchemas:
        cursor.execute(schema)
def ProcessCountiesDbf(dbInfo, dbfFilename):
    """Insert the counties of a county DBF file into the counties table.

    Args:
        dbInfo: object whose sqlCursor_ attribute is an open SQLite cursor on
            a database containing the counties table.
        dbfFilename: path of the county file, read with geopandas. Only the
            STATE, FIPS and COUNTYNAME columns are requested.
    """
    print("Processing county file:", dbfFilename)

    # County area type
    areaType = 'C'

    # Read dataframe
    dbfTable = gpd.read_file(filename=dbfFilename,
                             columns=["STATE", "FIPS", "COUNTYNAME"],
                             ignore_geometry=True)
    dbfTable.drop_duplicates(inplace=True)

    for row in dbfTable.itertuples():
        # Generate a FIPS ID compatible with UGC format (NWSI 10-1702):
        # state, area type, then the last three digits of the FIPS code
        fipsId = "{}{}{:03}".format(row.STATE, areaType, int(row.FIPS) % 1000)

        # Insert FIPS ID and name pair into database
        try:
            dbInfo.sqlCursor_.execute("INSERT INTO counties VALUES (?, ?)",
                                      (fipsId, row.COUNTYNAME))
        except sqlite3.IntegrityError:
            # The id column is the primary key, so a repeated FIPS ID is
            # rejected. Any other failure is not a duplicate and propagates.
            print("Skipping duplicate county:", fipsId, row.COUNTYNAME)
def ProcessStateDbf(dbInfo, dbfFilename):
    """Insert the rows of a states and territories DBF file into the states table.

    Args:
        dbInfo: object whose sqlCursor_ attribute is an open SQLite cursor on
            a database containing the states table.
        dbfFilename: path of the state file, read with geopandas. Only the
            STATE and NAME columns are requested.
    """
    print("Processing states and territories file:", dbfFilename)

    # Read dataframe
    dbfTable = gpd.read_file(filename=dbfFilename,
                             columns=["STATE", "NAME"],
                             ignore_geometry=True)
    dbfTable.drop_duplicates(inplace=True)

    for row in dbfTable.itertuples():
        # Insert data into database
        try:
            dbInfo.sqlCursor_.execute("INSERT INTO states VALUES (?, ?)",
                                      (row.STATE, row.NAME))
        except sqlite3.Error:
            # Database errors (e.g. a repeated state key or a missing name)
            # skip the row; unrelated exceptions are no longer swallowed
            print("Error inserting row:", row.STATE, row.NAME)
def ProcessZoneDbf(dbInfo, dbfFilename):
    """Insert the zones of a zone DBF file into the counties table.

    Zones share the counties table with the county entries; the ID is taken
    from the ID column when the file provides one, otherwise it is built from
    the STATE and ZONE columns.

    Args:
        dbInfo: object whose sqlCursor_ attribute is an open SQLite cursor on
            a database containing the counties table. Result rows are read by
            column name, so the connection is expected to use sqlite3.Row.
        dbfFilename: path of the zone file, read with geopandas. Only the ID,
            STATE, ZONE and NAME columns are requested.
    """
    print("Processing zone file:", dbfFilename)

    # Zone area type
    areaType = 'Z'

    # Read dataframe
    dbfTable = gpd.read_file(filename=dbfFilename,
                             columns=["ID", "STATE", "ZONE", "NAME"],
                             ignore_geometry=True)
    dbfTable.drop_duplicates(inplace=True)

    # Whether the file carries ready-made IDs is the same for every row
    hasIdColumn = "ID" in dbfTable.columns

    for row in dbfTable.itertuples():
        # Generate a FIPS ID compatible with UGC format (NWSI 10-1702)
        if hasIdColumn:
            fipsId = row.ID
        else:
            fipsId = "{}{}{:03}".format(row.STATE, areaType, int(row.ZONE) % 1000)

        # Insert FIPS ID and name pair into database
        try:
            dbInfo.sqlCursor_.execute("INSERT INTO counties VALUES (?, ?)",
                                      (fipsId, row.NAME))
        except sqlite3.IntegrityError:
            # Only print warning if FIPS ID has multiple names
            result = dbInfo.sqlCursor_.execute("SELECT name FROM counties WHERE id = :fipsId",
                                               {"fipsId": fipsId})
            resultRow = result.fetchone()
            if resultRow is not None and resultRow["name"] != row.NAME:
                print("Skipping duplicate zone:", fipsId, row.NAME)
def ProcessWfoDbf(dbInfo, dbfFilename):
    """Insert the rows of a WFO DBF file into the wfos table.

    Args:
        dbInfo: object whose sqlCursor_ attribute is an open SQLite cursor on
            a database containing the wfos table.
        dbfFilename: path of the WFO file, read with geopandas. Only the
            FULLSTAID, CITY, STATE and CITYSTATE columns are requested.
    """
    print("Processing WFO file:", dbfFilename)

    # Read dataframe
    dbfTable = gpd.read_file(filename=dbfFilename,
                             columns=["FULLSTAID", "CITY", "STATE", "CITYSTATE"],
                             ignore_geometry=True)
    dbfTable.drop_duplicates(inplace=True)

    for row in dbfTable.itertuples():
        try:
            dbInfo.sqlCursor_.execute("INSERT INTO wfos VALUES (?, ?, ?, ?)",
                                      (row.FULLSTAID, row.CITY, row.STATE, row.CITYSTATE))
        except sqlite3.Error:
            # Database errors (e.g. a repeated station ID or a missing value)
            # skip the row; unrelated exceptions are no longer swallowed
            print("Error inserting WFO:", row.FULLSTAID, row.CITYSTATE)
def PostProcess(dbInfo):
    """Commit the pending inserts and close the database connection.

    Args:
        dbInfo: object whose sqlConnection_ attribute is the open connection.
    """
    connection = dbInfo.sqlConnection_
    # Persist the changes before the connection is released
    connection.commit()
    connection.close()
# Script body: parse the options, create the output database, import every
# input file and finally commit and close the database
dbInfo = DatabaseInfo()
args = ParseArguments()
Prepare(dbInfo, args.outputDb_)

# Input kinds are handled in this fixed order: counties, zones, states, WFOs
for processDbf, inputDbs in (
    (ProcessCountiesDbf, args.inputCountyDbs_),
    (ProcessZoneDbf, args.inputZoneDbs_),
    (ProcessStateDbf, args.inputStateDbs_),
    (ProcessWfoDbf, args.inputWfoDbs_),
):
    for inputDb in inputDbs:
        processDbf(dbInfo, inputDb)

PostProcess(dbInfo)