วิธีสร้างไปป์ไลน์ ETL ใน Python
เผยแพร่แล้ว: 2022-01-11ETL ย่อมาจาก E xtract, T ransform, L oad ในฐานะที่เป็นส่วนหนึ่งของกระบวนการ ETL ข้อมูลจะถูกแยก แปลง และโหลดลงในคลังข้อมูล เพื่อให้องค์กรสามารถวิเคราะห์ข้อมูลเพื่อทำการตัดสินใจเชิงกลยุทธ์ได้
ต่อไปนี้คือขั้นตอนสำคัญที่ดำเนินการในไปป์ไลน์ ETL:
- แยกข้อมูล: กระบวนการนี้จะรวบรวมและรวมข้อมูลจากแหล่งต่างๆ รวมถึงฐานข้อมูล Data Lakes CRM และอื่นๆ
- การเปลี่ยนแปลง: นี่เป็นขั้นตอนที่สำคัญที่สุดในท่อส่ง ETL ในการจัดทำข้อมูล พร้อมสำหรับการวิเคราะห์ จะต้องรวบรวม จัดเรียง ทำความสะอาด และปรับเปลี่ยนข้อมูลอย่างเหมาะสมในขั้นตอนนี้
- โหลด: กระบวนการนี้เกี่ยวข้องกับการนำเข้าข้อมูลที่มีโครงสร้างหรือไม่มีโครงสร้างจาก Data Lakes ฐานข้อมูล และแหล่งข้อมูลอื่นๆ ไปยัง Data Warehouses เพื่อให้ Data Analyst หรือผู้ใช้รายอื่นสามารถได้รับข้อมูลเชิงลึกอย่างลึกซึ้งได้อย่างง่ายดาย
การทำความเข้าใจความสำคัญของ Python ETL
Python เป็นหนึ่งในภาษาการเขียนโปรแกรมที่ได้รับความนิยมและมีการใช้ประโยชน์มากที่สุดในโลกสมัยใหม่ โดยมีแอพพลิเคชั่นที่ไม่มีที่สิ้นสุดในหลากหลายสาขา ได้รับรางวัล TIOBE Programming Language of the Year 2021 อันทรงเกียรติ
ลักษณะความยืดหยุ่นและไดนามิกของ Python ทำให้เหมาะสำหรับงานการปรับใช้ การวิเคราะห์ และการบำรุงรักษา Python ETL เป็นหนึ่งในทักษะที่สำคัญที่จำเป็นใน Data Engineering เพื่อสร้าง Data Pipelines พัฒนาแบบจำลองทางสถิติ และทำการวิเคราะห์อย่างละเอียด
ได้กลายเป็นเครื่องมือยอดนิยมสำหรับดำเนินการกระบวนการ ETL เนื่องจากใช้งานง่ายและไลบรารีที่มีประสิทธิภาพสำหรับการเข้าถึงฐานข้อมูลและระบบจัดเก็บข้อมูล หลายทีมใช้ Python สำหรับ ETL & Data Engineering มากกว่าเครื่องมือ ETL เนื่องจากมีความอเนกประสงค์และมีประสิทธิภาพมากกว่าสำหรับงานเหล่านี้
ประโยชน์สูงสุดของ Python เหนือภาษาการเขียนโปรแกรมอื่นๆ คือความเรียบง่ายในการใช้งานใน Data Mining, Data Science, Big Data, ปัญญาประดิษฐ์ และ Machine Learning
บริษัทต่างๆ ทั่วโลกใช้ Python สำหรับข้อมูลเพื่อรับข้อมูลเชิงลึก จัดการการดำเนินงาน และทำให้ทุกอย่างทำงานได้อย่างราบรื่น
2 ขั้นตอนง่ายๆ ในการสร้าง Python ETL Pipeline
ในส่วนนี้ คุณจะได้เรียนรู้ขั้นตอนที่จำเป็นสำหรับการสร้าง ไปป์ไลน์ ETL โดยใช้ Python คุณจะต้องสร้าง Data Pipeline พื้นฐานที่ดึงข้อมูลเข้าสู่ฐานข้อมูล Microsoft SQL Server จากฐานข้อมูล MySQL และ Microsoft SQL Server
ในการตั้งค่าสคริปต์ Python ETL ให้ทำตามขั้นตอนด้านล่าง:
ขั้นตอนที่ 1: ติดตั้งโมดูลที่จำเป็น
ในการตั้งค่า Python ETL Pipeline คุณจะต้องติดตั้งโมดูลต่อไปนี้:
- ตัวเชื่อมต่อ Python to MySQL: mysql-connector-python (ใช้คำสั่ง pip install mysql-connector-python เพื่อติดตั้ง)
- Python กับ Microsoft SQL Server Connector: pyodbc (ใช้คำสั่ง pip install pyodbc เพื่อติดตั้ง)
ขั้นตอนที่ 2: ตั้งค่าไดเรกทอรี ETL
หลังจากติดตั้งแพ็คเกจข้างต้นแล้ว คุณต้องสร้างไฟล์ Python 4 ไฟล์ ดังที่กล่าวถึงด้านล่างในไดเร็กทอรีโครงการของคุณ:
- db_credentials.py: ไฟล์นี้มีโค้ดสำหรับสร้างการเชื่อมต่อกับฐานข้อมูลทั้งหมด
- sql_queries.py: ไฟล์นี้ประกอบด้วยการสืบค้นฐานข้อมูลที่ใช้กันทั่วไปเพื่อแยกและโหลดข้อมูลในรูปแบบสตริง
- etl.py: ไฟล์นี้มีการดำเนินการที่จำเป็นในการเชื่อมต่อกับฐานข้อมูลและเรียกใช้การสืบค้นข้อมูลที่จำเป็น
- main.py: นี่คือไฟล์หลักที่ควบคุมการไหลและการทำงานของ Python ETL Pipeline
ก) db_credentials.py
สตริงการเชื่อมต่อฐานข้อมูลต้นทางและเป้าหมายทั้งหมดควรรวมอยู่ในไฟล์นี้ ควรมีข้อมูลที่จำเป็นทั้งหมดสำหรับการเข้าถึงฐานข้อมูลที่เกี่ยวข้องในรูปแบบรายการเพื่อให้สามารถทำซ้ำได้อย่างรวดเร็วเมื่อจำเป็น ต่อไปนี้คือตัวอย่างสคริปต์ Python เพื่อสร้างการเชื่อมต่อฐานข้อมูล:
datawarehouse_name = 'your_dwh_name'
# sql-เซิร์ฟเวอร์ (ฐานข้อมูลเป้าหมาย, คลังข้อมูล)
datawarehouse_db_config = {
'Trusted_Connection': 'ใช่',
'ไดรเวอร์': '{SQL Server}',
'เซิร์ฟเวอร์': 'datawarehouse_sql_server',
'ฐานข้อมูล': '{}'.format(datawarehouse_name),
'ผู้ใช้': 'your_db_uname',
'รหัสผ่าน': 'your_db_pword',
'ส่งอัตโนมัติ': จริง
}
# source db > sql-server
sqlserver_db_config = [
{
'Trusted_Connection': 'ใช่',
'ไดรเวอร์': '{SQL Server}',
'เซิร์ฟเวอร์': 'เซิร์ฟเวอร์ของคุณ_db_sql_server',
'ฐานข้อมูล': 'db_1st',
'ผู้ใช้': 'your_db_uname',
'รหัสผ่าน': 'your_db_pword',
'ส่งอัตโนมัติ': จริง
}
]
# source db > mysql
mysql_db_config = [
{
'ผู้ใช้': 'your_1_user',
'รหัสผ่าน': 'your_1_pword',
'โฮสต์': 'db_connection_string_1',
'ฐานข้อมูล': 'db_1st',
},
{
'ผู้ใช้': 'your_2_user,
'รหัสผ่าน': 'รหัสผ่านของคุณ_2_รหัสผ่าน',
'โฮสต์': 'db_connection_string_2',
'ฐานข้อมูล': 'db_2nd',
},
]
ข) sql_queries.py
ไฟล์นี้มีคิวรีสำหรับดึงข้อมูลจากฐานข้อมูลต้นทางและโหลดลงในฐานข้อมูลเป้าหมาย สคริปต์ต่อไปนี้จะช่วยคุณทำงานนี้:
# ตัวอย่างการสืบค้นจะไม่ซ้ำกันสำหรับแพลตฟอร์มฐานข้อมูลที่แตกต่างกัน
sqlserver_extract = ('''
เลือก sqlserver_col_1, sqlserver_col_2, sqlserver_col_3
จาก sqlserver_1_table
''')
sqlserver_insert = ('''
INSERT INTO table_demo (col_1, col_2, col_3)
ค่านิยม (?, ?, ?)
''')
mysql_extract = ('''
เลือก mysql_col_1, mysql_col_2, mysql_col_3
จาก mysql_demo_table
''')
mysql_insert = ('''
INSERT INTO table_demo (col_1, col_2, col_3)
ค่านิยม (?, ?, ?)
''')
# แบบสอบถามกำลังส่งออก
คลาส SQL_Query:
def __init__(ตัวเอง, extract_query, load_query):
self.extract_query = extract_query
self.load_query = load_query
# สร้างอินสแตนซ์สำหรับคลาส SQL_Query
sqlserver_query = SqlQuery (sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery (mysql_extract, mysql_insert)
#สร้างรายการวนซ้ำค่า
mysql_queries = [mysql_query]
sqlserver_queries = [sqlserver_query]
ค) etl.py

ไฟล์นี้ควรมีรหัสที่จำเป็นในการเข้าถึงฐานข้อมูลที่เกี่ยวข้องและดำเนินการค้นหาที่จำเป็น สคริปต์ต่อไปนี้จะช่วยคุณทำงานนี้:
# โมดูลที่ใช้หลาม
นำเข้า pyodbc
นำเข้า mysql.connector
def etl (แบบสอบถาม, source_cnx, target_cnx):
# ดึงข้อมูลจากฐานข้อมูลแหล่งสาธิต
source_cursor = source_cnx.cursor ()
source_cursor.execute(query.extract_query)
ข้อมูล = source_cursor.fetchall()
source_cursor.close()
# โหลดข้อมูลลงในตัวอย่าง Data Warehouse db
ถ้าข้อมูล:
target_cursor = target_cnx.cursor ()
target_cursor.execute("ใช้ {}".format(name_for_datawarehouse))
target_cursor.executemany (query.load_query ข้อมูล)
พิมพ์ ('ข้อมูลที่โหลดไปยังฐานข้อมูลเดโมคลังข้อมูล')
target_cursor.close()
อื่น:
พิมพ์ ('ข้อมูลว่างเปล่า')
def etl_process (แบบสอบถาม, target_cnx, source_db_config, db_platform):
# การกำหนดค่าการเชื่อมต่อฐานข้อมูลแหล่งสาธิต
ถ้า db_platform == 'mysql':
source_cnx = mysql.connector.connect (**source_db_config)
elif db_platform == 'sqlserver':
source_cnx = pyodbc.connect (**source_db_config)
อื่น:
กลับ 'ข้อผิดพลาด! แพลตฟอร์มฐานข้อมูลต้นทางที่ไม่รู้จัก'
# วนซ้ำคำสั่ง sql
สำหรับแบบสอบถามในแบบสอบถาม:
etl (แบบสอบถาม, source_cnx, target_cnx)
# ปิดการเชื่อมต่อฐานข้อมูลต้นทาง
source_cnx.close()
ง) main.py
ไฟล์นี้มีโค้ดที่จะวนซ้ำผ่านข้อมูลประจำตัวที่กำหนดเพื่อเชื่อมต่อกับฐานข้อมูลและดำเนินการ ETL Python ที่จำเป็น สคริปต์ต่อไปนี้จะช่วยคุณทำงานนี้:
# ตัวแปร
จาก db_credentials นำเข้า datawarehouse_db_config, sqlserver_db_config, mysql_db_config
จาก sql_queries นำเข้า sqlserver_queries, mysql_queries
#วิธีการ
จาก etl นำเข้า etl_process
def หลัก ():
พิมพ์ ('การเริ่มต้นกระบวนการข้อมูล etl')
# สร้างการเชื่อมต่อสำหรับ SQL Server ที่เก็บข้อมูลปลายทางที่ต้องการ
target_cnx = pyodbc.connect(**datawarehouse_db_config)
# วนซ้ำผ่านข้อมูลประจำตัว
# ฐานข้อมูล > mysql
สำหรับการกำหนดค่าใน mysql_db_config:
พยายาม:
พิมพ์ ("กำลังโหลด db: " + config['database'])
etl_process (mysql_queries, target_cnx, config, 'mysql')
ยกเว้นข้อยกเว้นเป็นข้อผิดพลาด:
พิมพ์("etl สำหรับ {} มีข้อผิดพลาด".format(config['database']))
print('ข้อความแสดงข้อผิดพลาด: {}'.format(ข้อผิดพลาด))
ดำเนินต่อ
# ฐานข้อมูล > sql-เซิร์ฟเวอร์
สำหรับการกำหนดค่าใน sqlserver_db_config:
พยายาม:
พิมพ์ ("กำลังโหลด db: " + config['database'])
etl_process (sqlserver_queries, target_cnx, config, 'sqlserver')
ยกเว้นข้อยกเว้นเป็นข้อผิดพลาด:
พิมพ์("etl สำหรับ {} มีข้อผิดพลาด".format(config['database']))
print('ข้อความแสดงข้อผิดพลาด: {}'.format(ข้อผิดพลาด))
ดำเนินต่อ
target_cnx.close()
ถ้า __name__ == "__main__":
หลัก()
บทสรุป
การทำงานที่ดี! คุณได้รับความเข้าใจพื้นฐานเกี่ยวกับการสร้าง Python ETL Pipeline เรียบร้อยแล้ว ตอนนี้คุณสามารถปรับใช้สคริปต์ Python ETL แบบกำหนดเองได้ตามความต้องการของคุณโดยทำการเปลี่ยนแปลงฐานข้อมูลที่ใช้และสืบค้นตามนั้น
หากต้องการสำรวจเครื่องมือ Python ETL ที่ใช้กันอย่างแพร่หลายในอุตสาหกรรม โปรดอ่านบล็อก Best Python ETL Tools
องค์กรส่วนใหญ่ในปัจจุบันทำงานกับ Big Data ดังนั้น การสร้างไปป์ไลน์ ETL ตั้งแต่เริ่มต้นสำหรับข้อมูลดังกล่าวอาจใช้เวลานานและท้าทาย
นอกจากนี้ องค์กรต่างๆ จะต้องลงทุนทรัพยากรจำนวนมากเพื่อสร้างมันขึ้นมา จากนั้นจึงรับประกันว่าจะสามารถติดตามปริมาณข้อมูลที่สูงและความผันผวนของสคีมาได้
ดังนั้น แทนที่จะสร้างสคริปต์ ETL ตั้งแต่เริ่มต้น คุณสามารถใช้ประโยชน์จาก Data Pipelines อัตโนมัติ เช่น Hevo
มีความคิดเกี่ยวกับเรื่องนี้หรือไม่? แจ้งให้เราทราบด้านล่างในความคิดเห็นหรือดำเนินการสนทนาไปที่ Twitter หรือ Facebook ของเรา
