Jak budować potoki ETL w Pythonie
Opublikowany: 2022-01-11ETL to skrót od Extract, T ransform , Load . W ramach procesu ETL dane są wyodrębniane, przekształcane i ładowane do hurtowni danych, dzięki czemu organizacje mogą je analizować w celu podejmowania strategicznych decyzji.
Poniżej przedstawiono kluczowe kroki wykonywane w potoku ETL:
- Wyciąg: Ten proces gromadzi i integruje dane z różnych źródeł, w tym baz danych, jezior danych, systemów CRM i innych.
- Transformacja: jest to najważniejsza faza w potoku ETL. Aby dane były gotowe do analizy, muszą być odpowiednio zebrane, posortowane, wyczyszczone i przestawione na tym etapie.
- Ładowanie: ten proces obejmuje importowanie ustrukturyzowanych lub nieustrukturyzowanych danych z jezior danych, baz danych i innych źródeł do hurtowni danych, dzięki czemu analitycy danych lub inni użytkownicy mogą łatwo uzyskać szczegółowe informacje.
Zrozumienie znaczenia Pythona ETL
Python to jeden z najpopularniejszych i powszechnie wykorzystywanych języków programowania współczesnego świata, z nieskończonymi aplikacjami w różnych dziedzinach. Zdobył prestiżową nagrodę TIOBE Language Programming of the Year 2021.
Elastyczny i dynamiczny charakter Pythona czyni go idealnym do zadań wdrażania, analizy i konserwacji. Python ETL to jedna z kluczowych umiejętności wymaganych w inżynierii danych do budowania potoków danych, opracowywania modeli statystycznych i przeprowadzania ich dokładnej analizy.
Stało się popularnym narzędziem do wykonywania procesów ETL ze względu na łatwość obsługi i solidne biblioteki dostępu do baz danych i systemów pamięci masowej. Wiele zespołów używa Pythona do ETL i inżynierii danych zamiast narzędzia ETL, ponieważ jest ono bardziej wszechstronne i wydajne do tych zadań.
Największą zaletą Pythona w porównaniu z innymi językami programowania jest prostota użycia w eksploracji danych, nauce o danych, Big Data, sztucznej inteligencji i uczeniu maszynowym.
Firmy na całym świecie używają Pythona do swoich danych, aby uzyskać wgląd, zarządzać swoimi operacjami i zapewnić płynne działanie.
2 proste kroki do zbudowania potoku ETL w Pythonie
W tej części poznasz podstawowe kroki tworzenia potoku ETL za pomocą Pythona . Utworzysz podstawowy potok danych, który przesyła dane do bazy danych Microsoft SQL Server z baz danych MySQL i Microsoft SQL Server.
Aby skonfigurować skrypt Python ETL, wykonaj poniższe czynności:
Krok 1: Zainstaluj wymagane moduły
Aby skonfigurować Python ETL Pipeline, musisz zainstalować następujące moduły:
- Złącze Python do MySQL: mysql-connector-python (użyj polecenia pip install mysql-connector-python, aby zainstalować)
- Złącze Python do Microsoft SQL Server: pyodbc (użyj polecenia pip install pyodbc , aby zainstalować)
Krok 2: Skonfiguruj katalog ETL
Po zainstalowaniu powyższych pakietów, musisz utworzyć 4 pliki Pythona, wymienione poniżej w katalogu twojego projektu:
- db_credentials.py: Ten plik zawiera kod do nawiązywania połączeń ze wszystkimi bazami danych.
- sql_queries.py: Ten plik zawiera często używane zapytania bazy danych do wyodrębniania i ładowania danych w formacie ciągu.
- etl.py: Ten plik zawiera operacje niezbędne do połączenia z bazą danych i uruchomienia wymaganych zapytań.
- main.py: Jest to podstawowy plik, który reguluje przepływ i wykonanie potoku ETL Pythona.
A) db_credentials.py
Wszystkie ciągi połączeń źródłowej i docelowej bazy danych powinny być zawarte w tym pliku. Powinna ona zawierać wszystkie informacje niezbędne do uzyskania dostępu do odpowiedniej bazy danych w formie listy, aby w razie potrzeby można było ją szybko iterować. Poniżej znajduje się przykładowy skrypt Pythona do nawiązania połączenia z bazą danych:
datawarehouse_name = 'twoja_nazwa_dwh'
# sql-server (docelowa baza danych, hurtownia danych)
datawarehouse_db_config = {
'Trusted_Connection': 'tak',
'sterownik': '{Serwer SQL}',
'serwer': 'datawarehouse_sql_server',
'baza danych': '{}'.format(datawarehouse_name),
'użytkownik': 'twoja_nazwa_bazy_danych',
'hasło': 'twoje_db_hasło',
„automatyczne zatwierdzanie”: Prawda,
}
# źródłowa baza danych > serwer sql
sqlserver_db_config = [
{
'Trusted_Connection': 'tak',
'sterownik': '{Serwer SQL}',
'serwer': 'twój_db_sql_serwer',
'baza danych': 'db_1st',
'użytkownik': 'twoja_nazwa_bazy_danych',
'hasło': 'twoje_db_hasło',
„automatyczne zatwierdzanie”: Prawda,
}
]
# baza danych źródłowych > mysql
mysql_db_config = [
{
'użytkownik': 'twój_1_użytkownik',
'hasło': 'twoje_1_hasło',
'host': 'db_connection_string_1',
'baza danych': 'db_1st',
},
{
'użytkownik': 'twój_2_użytkownik,
'hasło': 'twoje_2_hasło',
'host': 'db_connection_string_2',
'baza danych': 'db_2nd',
},
]
B) sql_queries.py
Plik ten zawiera zapytania do wydobycia danych ze źródłowych baz danych i załadowania ich do docelowej bazy danych. Poniższy skrypt pomoże ci wykonać to zadanie:
# przykładowe zapytania, będą unikalne dla różnych platform bazodanowych
sqlserver_extract = ('''
WYBIERZ sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
Z sqlserver_1_table
'''')
sqlserver_insert = ('''
INSERT INTO table_demo (col_1, col_2, col_3)
WARTOŚCI (?, ?, ?)
'''')
mysql_extract = ('''
WYBIERZ mysql_col_1, mysql_col_2, mysql_col_3
Z mysql_demo_table
'''')
mysql_insert = ('''
INSERT INTO table_demo (col_1, col_2, col_3)
WARTOŚCI (?, ?, ?)
'''')
# Kwerendy są eksportowane
klasa Sql_Query:
def __init__(self, extract_query, load_query):
self.extract_query = ekstrakt_zapytanie
self.load_query = load_query
# utwórz instancje dla klasy Sql_Query
sqlserver_query = SqlQuery (sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery (mysql_extract, mysql_insert)
# tworzenie listy do iteracji przez wartości
mysql_query = [mysql_query]
sqlserver_queries = [sqlserver_query]
C) etl.py

Plik ten powinien zawierać kod wymagany do uzyskania dostępu do odpowiednich baz danych i wykonania wymaganych zapytań. Poniższy skrypt pomoże ci wykonać to zadanie:
# modułów opartych na Pythonie
importuj pyodbc
import mysql.connector
def etl(zapytanie, source_cnx, target_cnx):
# wyodrębnij dane z demo źródłowej bazy danych
kursor_źródłowy = kursor_źródłowy.kursor()
source_cursor.execute(query.extract_query)
dane = kursor_źródłowy.fetchall()
kursor_źródłowy.zamknij()
# załaduj dane do demo hurtowni danych db
jeśli dane:
cel_kursor = cel_cnx.kursor()
target_cursor.execute("USE {}".format(name_for_datawarehouse))
target_cursor.executemany(query.load_query, dane)
print('dane załadowane do demo Hurtowni Danych db')
cel_kursor.zamknij()
w przeciwnym razie:
print('dane są puste')
def etl_process(zapytania, target_cnx, source_db_config, db_platform):
# konfigurowanie połączenia z bazą danych źródła demo
if db_platform == 'mysql':
source_cnx = mysql.connector.connect(**source_db_config)
elif db_platform == 'sqlserver':
source_cnx = pyodbc.connect(**source_db_config)
w przeciwnym razie:
return 'Błąd! nierozpoznana platforma źródłowej bazy danych”
# pętla przez zapytania sql
dla zapytania w zapytaniach:
etl (zapytanie, source_cnx, target_cnx)
# zamknij źródłowe połączenie z bazą danych
source_cnx.zamknij()
D) główna.py
Ten plik zawiera kod do iteracji przez podane poświadczenia, aby połączyć się z bazą danych i wykonać niezbędne operacje ETL Python. Poniższy skrypt pomoże ci wykonać to zadanie:
# zmienne
z db_credentials importuj datawarehouse_db_config, sqlserver_db_config, mysql_db_config
z sql_queries import sqlserver_queries, mysql_queries
# metody
z etl importuje etl_process
def główna():
print('rozpoczęcie procesu danych etl')
# nawiąż połączenie dla SQL Server, żądane miejsce docelowe
target_cnx = pyodbc.connect(**datawarehouse_db_config)
# Pętla przez poświadczenia
# Baza danych > mysql
dla konfiguracji w mysql_db_config:
próbować:
print("ładowanie bazy danych: " + config['baza danych'])
etl_process(mysql_queries, target_cnx, config, 'mysql')
z wyjątkiem Wyjątku jako błędu:
print("etl for {} ma błąd".format(config['baza danych']))
print('komunikat o błędzie: {}'.format(błąd))
kontyntynuj
# Baza danych > serwer sql
dla konfiguracji w sqlserver_db_config:
próbować:
print("ładowanie bazy danych: " + config['baza danych'])
etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
z wyjątkiem Wyjątku jako błędu:
print("etl for {} ma błąd".format(config['baza danych']))
print('komunikat o błędzie: {}'.format(błąd))
kontyntynuj
cel_cnx.zamknij()
if __name__ == "__main__":
Główny()
Wniosek
Świetna robota! Pomyślnie zdobyłeś podstawową wiedzę na temat budowania Python ETL Pipeline. Teraz możesz zaimplementować swój niestandardowy skrypt ETL języka Python w oparciu o swoje wymagania, wprowadzając zmiany w używanych bazach danych i odpowiednio wysyłając zapytania.
Aby zapoznać się z szeroko stosowanymi narzędziami Python ETL w branży, przeczytaj blog Best Python ETL Tools.
Większość organizacji pracuje obecnie z Big Data. Dlatego tworzenie potoku ETL od podstaw dla takich danych może być czasochłonne i trudne.
Co więcej, przedsiębiorstwa będą musiały zainwestować znaczne środki, aby go zbudować, a następnie zagwarantować, że nadążą za dużym wolumenem danych i fluktuacjami schematów.
Tak więc zamiast tworzyć skrypty ETL od podstaw, możesz wykorzystać zautomatyzowane potoki danych, takie jak Hevo.
Masz jakieś przemyślenia na ten temat? Daj nam znać poniżej w komentarzach lub przenieś dyskusję na naszego Twittera lub Facebooka.
