from__future__importannotationsimportasyncioimportjsonimportloggingimporttypingastfrompathlibimportPathimportaiofileimporttyping_extensionsasteift.TYPE_CHECKING:importcollections.abcasclogger=logging.getLogger(__name__)SERIALIZABLE_TYPE:te.TypeAlias="str | int | JSON_TYPE | None""""Types that are JSON serializable."""JSON_TYPE:te.TypeAlias=("c.Mapping[str, SERIALIZABLE_TYPE] | c.Sequence[SERIALIZABLE_TYPE]")
[docs]@t.finalclassStorage:"""Core key-value store, that is responsible for correctly storing JSON and writing it to file."""# WARNING: You should call `del` on all instances by yourself,# since Python does not guarantee that `__del__` will be ever calledinstances:t.ClassVar[list[te.Self]]=[]def__init__(self,path:Path|str,*,indent:int|None)->None:self.instances.append(self)self._data:dict[str,SERIALIZABLE_TYPE]={}self._path=Path(path)self._tempfile=Path(str(self._path)+".temp")# Append Only File, see https://redis.io/docs/latest/operate/oss_and_stack/management/persistence/self._aof_path=Path(str(self._path)+".log.temp")self._indent=indent# ensure that the db file existsself._path.touch(exist_ok=True)self._write_loop_task:asyncio.Task[te.Never]=None# pyright: ignore[reportAttributeAccessIssue]
[docs]@classmethodasyncdefinit(cls,path:Path|str,*,write_interval:int|t.Literal[False]=5*60,indent:int|None=2,)->te.Self:"""Python doesn't have async init methods, so we have to use this. Arguments: path: Path to database file. Do note that you should allocate entire folder to the database, because this library creates a few temp files near the db for technical reasons. write_interval: How often we should write to database in seconds? Set to ``False`` to disable automatic writing at all. indent: If it is not ``None``, data in database file will be pretty printed with that indent level. Value is directly passed to :py:func:`json.dump`. """instance=cls(path,indent=indent)awaitinstance.read()ifwrite_interval:instance._write_loop_task=asyncio.create_task(instance._write_loop(write_interval))returninstance
[docs]asyncdefread(self)->None:"""Read content from the db file. This method is usually called only on initialization, but if you added some data to the file manually, you can call this method to sync in-memory state with what is written on disk. """path=self._pathifself._tempfile.exists():logger.warning("Found tempfile with database, using it. Database file may be damaged")path=self._tempfileifpath.exists():asyncwithaiofile.async_open(path,"r")asf:content=t.cast(str,awaitf.read())ifcontent!="":self._data=json.loads(content)else:self._data={}else:self._data={}ifself._aof_path.exists():asyncwithaiofile.AIOFile(self._aof_path,"r")asaof:asyncforlineinaiofile.LineReader(aof):ifline=="\n":# first line is always empty, because it is easier# this waycontinueforkey,valueint.cast(dict[str,SERIALIZABLE_TYPE],json.loads(line)).items():awaitself.set(key,value,_replay=True)
[docs]asyncdefwrite(self)->None:"""Save changes on disk. You can also manually call this method whenever you want. """ifself._path.exists():_=self._path.rename(self._tempfile)asyncwithaiofile.async_open(self._path,"w")asf:_=awaitf.write(json.dumps(self._data,indent=self._indent,ensure_ascii=False))ifself._aof_path.exists():self._aof_path.unlink()ifself._tempfile.exists():self._tempfile.unlink()
[docs]asyncdef_write_loop(self,interval:int)->te.Never:"""Call :func:`.write` every N seconds. Arguments: interval: How long we wait between every write. """whileTrue:try:awaitself.write()exceptExceptionasexception:# noqa: PERF203# stop working after if the task got canceledifisinstance(exception,asyncio.CancelledError):awaitself.write()# but ensure we won't lose dataraiseexception# noqa: TRY201logger.exception("Error during write!",exc_info=exception)else:awaitasyncio.sleep(interval)
[docs]asyncdef_append_command(self,key:str,value:SERIALIZABLE_TYPE)->None:"""Handle AOF logic on every :func:`set`."""asyncwithaiofile.async_open(self._aof_path,"a")asf:_=awaitf.write("\n"+json.dumps({key:value}))
[docs]asyncdefset(self,key:str,value:SERIALIZABLE_TYPE,*,_replay:bool=False)->None:"""Set a key to value. Arguments: _replay: If ``True``, we won't do AOF stuff. This is used when we replay operations from AOF. """task=Noneifnot_replay:task=asyncio.create_task(self._append_command(key,value))ifvalueisNone:delself._data[key]else:self._data[key]=valueiftaskisnotNone:_=awaitasyncio.gather(task)