Added file locking to avoid db contention

This commit is contained in:
tamservo
2025-05-03 22:07:31 -04:00
parent 4b636eed7a
commit 839ee6306c
2 changed files with 51 additions and 44 deletions

View File

@@ -7,6 +7,8 @@ import time
import sqlite3
import sys
from contextlib import closing
from filelock import Timeout, FileLock
from fastf1.signalr_aio import Connection
import fastf1
@@ -85,7 +87,8 @@ class SignalRClient:
"SessionData", "LapCount"]
self.debug = debug
self._db_is_open = False
self.messages_db = 'messages.db'
self.messages_lock = FileLock('messages.lock')
self.filename = filename
self.filemode = filemode
self.timeout = timeout
@@ -97,6 +100,7 @@ class SignalRClient:
stream=sys.stderr
)
self.logger = logging.getLogger('SignalR')
self.logger.warning("Created logger for SignalR")
else:
self.logger = logger
@@ -104,23 +108,15 @@ class SignalRClient:
self._t_last_message = None
def _to_file(self, msg):
"""
self._output_file.write(msg + '\n')
self._output_file.flush()
"""
#print(msg)
con = sqlite3.connect('messages.db')
try:
with con:
self._db_is_open = True
con.execute("insert into messages (message) values(?)", (msg,))
con.close()
self.db_is_open = False
with self.messages_lock:
with closing(sqlite3.connect(self.messages_db)) as con:
with closing(con.cursor()) as cur:
#self.logger.warning("about to log: " + msg)
cur.execute("insert into messages (message) values(?)", (msg,))
con.commit()
except:
if con is not None:
con.close()
self._db_is_open = False
print(f'Error writing message to db.')
async def _on_do_nothing(self, msg):
# just do nothing with the message; intended for debug mode where some
@@ -196,8 +192,6 @@ class SignalRClient:
await asyncio.gather(asyncio.ensure_future(self._supervise()),
asyncio.ensure_future(self._run()))
#self._output_file.close()
while(self._db_is_open):
asyncio.sleep(1)
self.logger.warning("Exiting...")
def start(self):

View File

@@ -17,9 +17,10 @@ from subprocess import Popen
import time
import discord
from contextlib import closing
from filelock import Timeout, FileLock
from discord.ext import commands, tasks
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
class Robottas(commands.Bot):
@@ -29,15 +30,16 @@ class Robottas(commands.Bot):
# the Livetiming client.
@staticmethod
def convert_message(raw):
def convert_message(raw, logger):
data = raw.replace("'", '"') \
.replace('True', 'true') \
.replace('False', 'false')
try:
data = json.loads(data)
logger.warn(f"convert_message returning: {data}")
return data
except json.JSONDecodeError:
logger.warn("json decode error for")
return ""
# End section adapted from FastF1
@@ -380,6 +382,7 @@ class Robottas(commands.Bot):
await self.print_driver_range(ctx, 10, 20)
def load_initial(self, message):
#self.logger.warning("in load_initial")
# Load podium data
if 'R' in message.keys():
if 'TopThree' in message['R'].keys():
@@ -392,6 +395,7 @@ class Robottas(commands.Bot):
# Load driver list
if 'DriverList' in message['R'].keys():
#self.logger.warning("loading DriverList")
for driver_num in message['R']['DriverList'].keys():
if driver_num == '_kf':
continue
@@ -429,7 +433,7 @@ class Robottas(commands.Bot):
async def process_message(self, message):
try:
if isinstance(message, collections.abc.Sequence):
logging.debug(message)
#self.logger.warning(f"in process_message {message}")
if message[0] == 'Heartbeat':
return
@@ -466,34 +470,34 @@ class Robottas(commands.Bot):
def get_messages_from_db(self):
try:
messages = []
con = sqlite3.connect(self.dbfile)
cur = con.cursor()
cur2 = con.cursor()
with self.messages_lock:
with closing(sqlite3.connect(self.dbfile)) as con:
with closing(con.cursor()) as cur:
with closing(con.cursor()) as cur2:
for row in cur.execute('select id, message from messages order by id asc'):
messages.append(self.convert_message(row[1]))
messages.append(self.convert_message(row[1], self.logger))
self.logger.warn(f"get_messages_from_db: {row[1]}")
# Now that we have the message, delete this row from the dbfile
cur2.execute(f"delete from messages where id = {row[0]}")
con.commit()
cur.close()
cur2.close()
con.close()
return messages
except:
self.logger.warning(f"Error retrieving messages.")
return []
def clear_messages_from_db(self):
try:
con = sqlite3.connect(self.dbfile)
cur = con.cursor()
cur.execute('delete from messges')
with self.messages_lock:
with closing(sqlite3.connect(self.dbfile)) as con:
with closing(con.cursor()) as cur:
cur.execute('delete from messages')
con.commit()
cur.close()
con.close()
except:
pass
except Exception as e:
self.logger.warning(f"error in clear_messages_from_db: {e}")
async def _race_report(self, ctx):
self.clear_messages_from_db()
@@ -522,14 +526,16 @@ class Robottas(commands.Bot):
while self.is_reporting:
# Do processing
#self.logger.warning("in is_reporting")
# process any new messages in the db
messages = self.get_messages_from_db()
try:
for message in messages:
#self.logger.warning(f"processing message: {message}")
await self.process_message(message)
logging.debug(message)
await asyncio.sleep(2)
await asyncio.sleep(3)
except:
pass
@@ -632,7 +638,7 @@ class Robottas(commands.Bot):
self.collector_proc.kill()
self.clear_messages_from_db()
except:
pass
self.logger.warn("error in stop_collect.")
def decode_watched(self, w):
if w == 0:
@@ -858,6 +864,13 @@ class Robottas(commands.Bot):
# Set debug or not
self.debug = True
self.messages_lock = FileLock('messages.lock')
#configure logging
logging.basicConfig(stream=sys.stderr, \
format="%(asctime)s - %(levelname)s: %(message)s")
self.logger = logging.getLogger('robottas')
self.bingo = Bingo()
# Discord authentication token