Datenbankprogrammierung in Python

SQL in Python ausführen

In Python können wir mit dem bereits vorinstallierten Paket sqlite3 auf SQLite-Datenbanken zugreifen. Wir können zu Beginn eines Programms

import sqlite3

schreiben, um das Paket zu verwenden.

Die folgenden Anweisungen öffnen die Vorlesungs-Datenbank und geben testweise alle Einträge der Tabelle Vorlesungen aus.

>>> import sqlite3
>>> db = sqlite3.connect('Vorlesungen.db')
>>> cur = db.execute('select * from Vorlesungen;')
>>> cur.fetchall()
[(1, 'Informatik für Nebenfach'), (2, 'IQSH Weiterbildung Informatik')]

Die execute-Methode der Datenbank führt die angegebene SQL-Anfrage aus und gibt ein sogenanntes Cursor-Objekt als Ergebnis zurück. Die fetchall-Methode dieses Cursor-Objekts liefert alle Zeilen zurück, die von der Datenbank als Antwort auf die Anfrage geliefert wurden.

Standardmäßig werden Zeilen als Listen von Tupeln zurückgegeben. Zeilen lassen sich aber stattdessen auch als Dictionary darstellen, bei denen die Attributnamen die Schlüssel darstellen. Dazu muss die folgende Funktion definiert werden, die für ein Cursor-Objekt und ein Tupel mit Werten das entsprechende Dictionary berechnet (Hinweis: cur.description ist eine Liste von Tupeln, deren erstes Element jeweils die Spaltenbeschriftung des Ergebnisses ist):

def dict_factory(cur, row):
    result = {}
    for idx, col in enumerate(cur.description):
        result[col[0]] = row[idx]
    return result

Anschließend kann diese Funktion als “row factory” für die Datenbank-Schnittstelle festgelegt werden. Sie wird automatisch beim Abrufen der Ergebnisse über das Cursor-Objekt ausgeführt. Durch den folgenden Aufruf wird die “row factory” festgelegt (das muss nur einmal nach dem Öffnen der Datenbank gemacht werden), so dass Zeilen ab jetzt als Dictionaries statt als Tupel zurückgegeben werden:

>>> db.row_factory = dict_factory
>>> cur = db.execute('select * from Vorlesungen;')
>>> cur.fetchall()
[{'Id': 1, 'Titel': 'Informatik für Nebenfach'}, {'Id': 2, 'Titel': 'IQSH Weiterbildung Informatik'}]

Damit der Zugriff auf alle Attribut-Werte möglich ist, muss man in der Anfrage (ggf. mit Alias-Namen) dafür sorgen, dass im Ergebnis keine Attribut-Namen doppelt verwendet werden.

Die execute-Methode auf Datenbanken hat einen optionalen zweiten Parameter, der verwendet werden kann, um Argumente an SQL-Anfragen zu übergeben. Dazu können in SQL-Anfragen Fragezeichen als Platzhalter verwendet werden, die von links nach rechts durch Elemente einer zusätzlich übergebenen Liste ersetzt werden. Auf diese Weise ist sichergestellt, dass die Argumente korrekt in SQL dargestellt werden, was auch sogenannten “SQL-Injection”-Angriffen vorbeugt. Der folgende Aufruf veranschaulicht den zusätzlichen Parameter.

>>> cur = db.execute('select Titel from Vorlesungsverzeichnis where Nachname = ?;', ['Fischer'])
>>> cur.fetchall()
[{'Titel': 'IQSH Weiterbildung Informatik'}]

Beispiel: Filmdatenbank

Für das folgende Beispiel entwerfen wir eine Film-Datenbank mit Filmen und Personen, die als Regisseur/in eines Films oder als Schauspieler/innen in Filmen fungieren können. Das folgende Diagramm veranschaulicht das Datenbankschema:

Diagramm

Zwischen Personen als Regisseur/in und Filmen besteht eine 1-zu-N-Beziehung. Zwischen Personen als Schauspieler/innen und Filmen besteht eine N-zu-M-Beziehung, die durch die Tabelle Is_Actor_In vermittelt wird.

Datenbank anlegen

Im Folgenden erzeugen wir die Film-Datenbank allein mit Hilfe von SQL-Anweisungen über die Python-Schnittstelle. Die dazu nötigen Tabellen legen wir mit den folgenden create table Anweisungen an:

movie_db = sqlite3.connect('movies.db')
movie_db.row_factory = dict_factory

movie_db.execute('''
create table person (
    id integer primary key autoincrement not null unique,
    name text not null
);
''')

movie_db.execute('''
create table movie (
    id integer primary key autoincrement not null unique,
    title text not null,
    year integer not null,
    directed_by integer not null,
    foreign key (directed_by) references person(id) on delete cascade
);
''')

movie_db.execute('''
create table is_actor_in (
    id integer primary key autoincrement not null unique,
    person integer not null,
    movie integer not null,
    foreign key (person) references person(id) on delete cascade,
    foreign key (movie) references movie(id) on delete cascade
);
''')

movie_db.commit()

Der Methodenaufruf commit am Ende sorgt dafür, dass die Änderungen dauerhaft in die Datenbank übernommen werden.

Wir wollen nun die wie folgt als Dictionary dargestellten Filme in die Datenbank eintragen.

movies = [
    {'title': 'Das Schweigen der Lämmer', 'year': 1991,
     'director': 'Jonathan Demme',
     'actors': ['Jodie Foster', 'Anthony Hopkins']},
    {'title': 'Ocean\'s Eleven', 'year': 2001,
     'director': 'Steven Soderbergh',
     'actors': ['Brad Pitt', 'George Clooney', 'Matt Damon', 'Julia Roberts']},
    {'title': 'Money Monster', 'year': 2016,
    'director': 'Jodie Foster',
    'actors': ['George Clooney', 'Julia Roberts', 'Jodie Foster']}
]

Dazu verwenden wir drei Hilfsfunktionen zum Einfügen von Werten in die angelegten Tabellen. Diese Hilfsfunktionen fügen den übergebenen Eintrag in die entsprechende Tabelle ein, falls er noch nicht existiert. In jedem Fall wird die id des einzufügenden Eintrags zurückgeliefert. Wenn der Eintrag bereits existiert, wird die existierende id geliefert.

def insert_person(db, name):
    cur = db.execute('select id from person where name = ?;', [name])
    rows = cur.fetchall()
    
    if len(rows) > 0:
        return rows[0]['id']
    
    db.execute('insert into person (name) values (?);', [name])
    db.commit()
    return insert_person(db, name)

def insert_movie(db, movie):
    cur = db.execute('select id from movie where title = ? and year = ? and director = ?;', [movie['title'], movie['year'], movie['director']])
    rows = cur.fetchall()
    
    if len(rows) > 0:
        return rows[0]['id']
    
    db.execute('insert into movie (title, year, director) values (?, ?, ?);', [movie['title'], movie['year'], movie['director']])
    db.commit()
    return insert_movie(db, movie)

def insert_actor(db, actor):
    cur = db.execute('select id from is_actor_in where movie = ? and person = ?;', [actor['movie'], actor['person']])
    rows = cur.fetchall()
    
    if len(rows) > 0:
        return rows[0]['id']
    
    db.execute('insert into is_actor_in (movie, person) values (?, ?);', [actor['movie'], actor['person']])
    db.commit()
    return insert_actor(db, actor)

Auch hier muss jeweils commit nach execute aufgerufen werden, um die Änderung in die Datenbank dauerhaft zu übernehmen.

Diese Hilfsfunktionen können wir verwenden, um die gezeigten Filme in die Datenbank einzutragen.

for movie in movies:
    movie['director'] = insert_person(movie_db, movie['director'])
    movie_id = insert_movie(movie_db, movie)
    for actor in movie['actors']:
        actor_id = insert_person(movie_db, actor)
        insert_actor(movie_db, {'movie': movie_id, 'person': actor_id})

Entwurf einer Datenbank-Klasse

Bei genauerer Betrachtung fällt auf, dass die Implementierungen der Hilfsfunktionen zum Einfügen von Datensätzen sich ähneln. Wir werden die bisherigen Definitionen im Folgenden so abstrahieren, dass wiederverwendbare Fragmente entstehen, die es erlauben, unser Programm kompakter zu implementieren.

Zunächst definieren wir eine Klasse Db, die ein Datenbank- Objekt kapselt, das Ergebnisse als Dictionary zurückliefert. Wir definieren eine eigene execute-Methode, die Fehler im Terminal ausgibt ohne das Programm abzubrechen:

class Db:

    def __init__(self, file_name):
        self.db = sqlite3.connect(file_name)
        self.db.row_factory = dict_factory

    def execute(self, sql, args = []):
        try:
            cur = self.db.execute(sql, args)
            rows = cur.fetchall()
            self.db.commit()
            return rows
        except Exception as e:
            print(e)
            return {}

Die Notation args = [] in der Parameter-Liste sorgt dafür, dass args als leere Liste initialisiert wird, wenn man kein zweites Argument übergibt. Die mit try und except definierten Abschnitte implementieren die Fehlerbehandlung.

Als Nächstes definieren wir eine Methode create, die eine neue Tabelle in der Datenbank anlegt. Falls bereits eine gleichnamige Tabelle existiert, wird diese vorher gelöscht:

class Db:

    def create(self, name, defs):
        self.execute(f'drop table if exists {name};')
        self.execute(f'create table {name} ({defs});')

Wir könnten die Tabelle person der Filmdatenbank mit Hilfe dieser Methode wie folgt (neu) erzeugen:

db = Db('movies.db')
db.create('person', '''
    id integer primary key autoincrement not null unique,
    name text not null
''')

Um auf Tabellen zuzugreifen, definieren wir eine weitere Methode:

class Db:

    def table(self, name):
        result = self.execute(f'pragma table_info({name});')
        cols = [row['name'] for row in result if row['name'] != 'id']
        if len(result) == 0:
            return None
        if any([row['name'] == 'id' and row['pk'] == 1 for row in result]):
            return Table(self, name, cols)
        else:
            return View(self, name, cols)

Mit Hilfe der verwendeten pragma-Anfrage können wir in SQLite-Datenbanken Informationen über Tabellen abgfragen. Jede Zeile des Ergebnisses beschreibt ein Attribut der Tabelle. So können wir erkennen, wie die Spalten der Tabelle heißen und welches Attribut ein Primärschlüssel ist. Für Tabellen mit einer Primärschlüssel-Spalte id erzeugen wir ein zugehöriges Table- Objekt, für andere Tabellen ein View-Objekt. Beiden übergeben wir die Namen der Spalten (ohne id).

Die Klasse Table ist eine Unterklasse von View. View definiert lesende Methoden zum Zugriff auf die enthaltenen Datensätze. Table definiert zusätzliche Methoden zum Zugriff über eine id sowie schreibende Methoden zur Manipulation der enthaltenen Daten.

Hier sind die ersten Methoden der View-Klasse.

class View:

    def __init__(self, db, name, cols):
        self.db = db
        self.name = name
        self.cols = cols

    def all(self):
        return self.db.execute(f'select * from {self.name};')

    def size(self):
        result = self.db.execute(f'select count(*) as count from {self.name} limit 1;')
        return result[0]['count'] if len(result) > 0 else 0

Die Methode all liefert alle Datensätze der beschriebenen Tabelle (oder einer View) zurück, die Methode size nur deren Anzahl. Der Aufruf

db.table('person').size()

liefert also das selbe Ergebnis zurück wie

len(db.table('person').all())

Allerdings ist der direkte Aufruf von size deutlich effizienter, wenn die Tabelle viele Datensätze enthält, da er keine Liste aller Datensätze erzeugt sondern nur deren Anzahl aus der Datenbank abfragt.

Als Nächstes definieren wir Methoden zur Abfrage von Datensätzen mit einer Bedingung:

class View:

    def all_where(self, cond, args = []):
        return self.db.execute(f'select * from {self.name} where {cond};', args)

    def one_where(self, cond, args = []):
        result = self.db.execute(f'select * from {self.name} where {cond} limit 1;', args)
        return result[0] if len(result) > 0 else None

Die Methode all_where liefert eine Liste aller gefundenen Datensätze, während one_where nur ein einziges Dictionary (oder None) liefert. Hierbei liefert

db.table('person').one_where('name like "Jodie%"')

das selbe Ergebnis wie

db.table('person').all_where('name like "Jodie%"')[0]

allerdings ohne zwischendurch eine Liste aller gefundenen Datensätze zu berechnen.

In der Klasse Table definieren wir Methoden zum Zugriff auf Datensätze über ids:

class Table(View):

    def get(self, id):
        result = self.db.execute(f'select * from {self.name} where id = ? limit 1;', [id])
        return result[0] if len(result) > 0 else None

    def id_of(self, row):
        cond = ' and '.join([f'{col} = ?' for col in self.cols])
        vals = [row[col] for col in self.cols]
        res = self.one_where(cond, vals)
        if res == None:
            return None
        return res['id']

Die Methode get liefert einen Datensatz mit der gegebenen id (oder None, wenn kein solcher existiert). Die Methode id_of arbeitet umgekehrt und liefert eine id zu einem gegebenen Datensatz (bei dem keine id angegeben zu sein braucht). Ein Beispielaufruf dieser Methode könnte so aussehen:

db.table('person').id_of({'name': 'Brad Pitt'})

Wir definieren nun noch Methoden zum schreibenden Zugriff auf Datensätze:

class Table(View):

    def delete(self, id):
        self.db.execute(f'delete from {self.name} where id = ?;', [id])
        return self.get(id) == None

    def set(self, id, row):
        cols = ', '.join([f'{col} = ?' for col in self.cols])
        vals = [row[col] for col in self.cols]
        self.db.execute(f'update {self.name} set {cols} where id = ?;', vals + [id])
        return get(id)

Die Methode delete löscht den Datensatz mit der übergebenen id und set aktualisiert die Attribut-Werte eines über eine id spezifizierten Datensatzes. Beim Aktualisieren müssen im übergebenen Datensatz alle Attribut-Werte (außer id) vorhanden sein.

Schließlich definieren wir noch eine Methode zum Anlegen neuer Datensätze:

class Table(View):

    def insert(self, row):
        existing_id = self.id_of(row)
        if existing_id != None:
            return existing_id
        vals = [row[col] for col in self.cols]
        self.db.execute(f'insert into {self.name} ({",".join(self.cols)}) values ({(",".join(["?"] * len(self.cols)))});', vals)
        return self.id_of(row)

Diese Methode fügt einen übergebenen Datensatz mit Hilfe einer insert-Anweisung in die Tabelle ein, sofern noch kein identischer Datensatz existiert, und gibt die id des eingefügten Datensatzes zurück. Wenn schon ein identischer Datensatz in der Tabelle existiert, wird (entsprechend der Interpretation von Tabellen als Menge von Datensätzen) kein weiterer Datensatz eingefügt und die id des existierenden Datensatzes zurückgeliefert.

Mit den definierten Methoden können wir die Initialisierung der Film-Datenbank wie folgt (und ohne Hilfsfunktionen, die speziell auf Film-Objekte zugeschnitten sind) definieren:

for movie in movies:
    movie['director'] = db.table('person').insert({'name': movie['director']})
    movie_id = db.table('movie').insert(movie)
    for actor in movie['actors']:
        actor_id = db.table('person').insert({'name': actor})
        db.table('is_actor_in').insert({'person': actor_id, 'movie': movie_id})

Mit Hilfe der in den Klassen Db, View und Table definierten Methoden können wir die gängigsten Datenbank-Anfragen ohne SQL formulieren. Für komplexere Anfragen können wir weiterhin execute verwenden oder Views definieren, auf die wir dann wieder mit den Hilfsmethoden zugreifen können.

Quellen und Lesetipps