Coverage for art_studio_tz/db.py: 98%
62 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 09:38 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 09:38 +0200
1"""
2DB for the quotes project
3"""
4import csv
5from pathlib import Path
6from typing import Any
8class DB:
9 """CSV file based DB class
10 """
11 def __init__(self, db_path: Path, db_file_prefix: str, fieldnames: list[str]):
12 """initialize DB
14 Args:
15 db_path (Path): path to the db directory
16 db_file_prefix (str): prefix for the db file name
17 fieldnames (list[str]): list of field names (without 'id' field)
18 """
19 self.file = db_path / f"{db_file_prefix}.csv"
20 self.fieldnames = ['id', *fieldnames] # первая колонка — id
21 if not self.file.exists():
22 self.file.parent.mkdir(parents=True, exist_ok=True)
23 with self.file.open('w', newline='', encoding='utf-8') as f:
24 writer = csv.DictWriter(f, fieldnames=self.fieldnames)
25 writer.writeheader()
27 def _read_all_rows(self) -> list[dict[str, Any]]:
28 """read all rows from the CSV file
30 Returns:
31 list[dict[str, Any]]: list of dictionaries with row data
32 """
33 rows = []
34 with self.file.open('r', newline='', encoding='utf-8') as f:
35 reader = csv.DictReader(f)
36 rows = list(reader)
37 return rows
39 def create(self, item: dict[str, Any]) -> int:
40 """create a new quote record
42 Args:
43 item (dict[str, Any]): dictionary with quote data
45 Returns:
46 int: new record id
47 """
48 rows = self._read_all_rows()
49 if rows == []:
50 new_id = 1
51 else:
52 new_id = max([int(r['id']) for r in rows], default=0) + 1
53 row = {**item, 'id': new_id}
54 with self.file.open('a', newline='', encoding='utf-8') as f:
55 writer = csv.DictWriter(f, fieldnames=self.fieldnames)
56 writer.writerow(row)
57 return new_id
59 def read(self, id: int) -> dict[str, Any] | None:
60 """read a quote record by id
62 Args:
63 id (int): record id
65 Returns:
66 dict[str, Any] | None: dictionary with record data or None if not found
67 """
68 rows = self._read_all_rows()
69 for r in rows:
70 if int(r['id']) == id:
71 return r
72 return None
74 def read_all(self) -> list[dict[str, Any]]:
75 """Read all quote records
77 Returns:
78 list[dict[str, Any]]: list of dictionary with quote data
79 """
80 return self._read_all_rows()
82 def update(self, id: int, mods: dict[str, Any]) -> None:
83 """Update a quote record by id
85 Args:
86 id (int): id of the record to update
87 mods (dict[str, Any]): dictionary with fields to update
88 """
89 rows = self._read_all_rows()
90 changed = False
91 for r in rows:
92 if int(r['id']) == id:
93 for k, v in mods.items():
94 if v is not None and k in r:
95 r[k] = v
96 changed = True
97 if changed:
98 with self.file.open('w', newline='', encoding='utf-8') as f:
99 writer = csv.DictWriter(f, fieldnames=self.fieldnames)
100 writer.writeheader()
101 writer.writerows(rows)
103 def delete(self, id: int) -> None:
104 """Delete a quote record by id
106 Args:
107 id (int): id of the record to delete
108 """
109 rows = [r for r in self._read_all_rows() if int(r['id']) != id]
110 with self.file.open('w', newline='', encoding='utf-8') as f:
111 writer = csv.DictWriter(f, fieldnames=self.fieldnames)
112 writer.writeheader()
113 writer.writerows(rows)
115 def delete_all(self) -> None:
116 """Delete all quote records
117 """
118 with self.file.open('w', newline='', encoding='utf-8') as f:
119 writer = csv.DictWriter(f, fieldnames=self.fieldnames)
120 writer.writeheader()
122 def count(self) -> int:
123 """Get number of records in the DB
125 Returns:
126 int: number of records
127 """
128 return len(self._read_all_rows())