En los dos artículos anteriores de esta serie, discutimos cómo usar Python y SQLAlchemy para realizar el proceso ETL. Hoy haremos lo mismo, pero esta vez usando Python y SQL Alchemy sin comandos SQL en formato textual. Esto nos permitirá usar SQLAlchemy independientemente del motor de base de datos al que estemos conectados. Entonces, comencemos.
Hoy discutiremos cómo realizar el proceso ETL usando Python y SQLAlchemy. Crearemos un script para extraer datos diarios de nuestra base de datos operativa, transformarlos y luego cargarlos en nuestro almacén de datos.
Este es el tercer artículo de la serie. Si no ha leído los dos primeros artículos (Uso de Python y MySQL en el proceso ETL y SQLAlchemy), le recomiendo que lo haga antes de continuar.
Toda esta serie es una continuación de nuestra serie de almacenamiento de datos:
- Creación de un DWH, primera parte:un modelo de datos comerciales de suscripción
- Creación de un DWH, segunda parte:un modelo de datos comerciales de suscripción
- Creación de un almacén de datos, parte 3:un modelo de datos comerciales de suscripción
Bien, ahora comencemos con el tema de hoy. Primero, veamos los modelos de datos.
Los modelos de datos
Modelo de datos de base de datos operativa (en vivo)
Modelo de datos DWH
Estos son los dos modelos de datos que usaremos. Para obtener más información sobre los almacenes de datos (DWH), consulte estos artículos:
- El esquema estelar
- El esquema del copo de nieve
- Esquema de estrella frente a esquema de copo de nieve
¿Por qué SQLAlchemy?
La idea detrás de SQLAlchemy es que después de importar bases de datos, no necesitamos código SQL que sea específico para el motor de base de datos relacionado. En cambio, podemos importar objetos a SQLAlchemy y usar la sintaxis de SQLAlchemy para las declaraciones. Eso nos permitirá usar el mismo idioma sin importar a qué motor de base de datos estemos conectados. La principal ventaja aquí es que un desarrollador no necesita preocuparse por las diferencias entre los diferentes motores de base de datos. Su programa SQLAlchemy funcionará exactamente igual (con cambios menores) si migra a un motor de base de datos diferente.
Decidí usar solo comandos de SQLAlchemy y listas de Python para comunicarme con el almacenamiento temporal y entre diferentes bases de datos. Las razones clave detrás de esta decisión son que 1) las listas de Python son bien conocidas y 2) el código sería legible para aquellos sin conocimientos de Python.
Esto no quiere decir que SQLAlchemy sea perfecto. Tiene ciertas limitaciones, que discutiremos más adelante. Por ahora, echemos un vistazo al siguiente código:
Ejecutando el script y el resultado
Este es el comando de Python utilizado para llamar a nuestro script. El script verifica los datos en la base de datos operativa, compara los valores con el DWH e importa los nuevos valores. En este ejemplo, estamos actualizando valores en dos tablas de dimensiones y una tabla de hechos; el script devuelve el resultado apropiado. Todo el script está escrito para que pueda ejecutarlo varias veces al día. Eliminará los datos "antiguos" de ese día y los reemplazará con nuevos.
Analicemos todo el guión, comenzando desde arriba.
Importando SQLAlchemy
Lo primero que debemos hacer es importar los módulos que usaremos en el script. Por lo general, importará sus módulos mientras escribe el script. En la mayoría de los casos, no sabrá exactamente qué módulos necesitará desde el principio.
from datetime import date # import SQLAlchemy from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case
Hemos importado datetime
de Python módulo, que nos proporciona clases que trabajan con fechas.
A continuación, tenemos el sqlalchemy
módulo. No importaremos todo el módulo, solo las cosas que necesitamos, algunas específicas de SQLAlchemy (create_engine
, MetaData
, Table
), algunas partes de sentencias SQL (select
, and_
, case
) y func
, que nos permite usar funciones como count() y suma() .
Conexión a las bases de datos
Tendremos que conectarnos a dos bases de datos en nuestro servidor. Podríamos conectarnos a más bases de datos (MySQL, SQL Server o cualquier otra) desde diferentes servidores si fuera necesario. En este caso, ambas bases de datos son bases de datos MySQL y están almacenadas en mi máquina local.
# connect to databases engine_live = sqlalchemy.create_engine('mysql+pymysql://: @localhost:3306/subscription_live') connection_live = engine_live.connect() engine_dwh = sqlalchemy.create_engine('mysql+pymysql:// : @localhost:3306/subscription_dwh') connection_dwh = engine_dwh.connect() metadata = MetaData(bind=None)
Hemos creado dos motores y dos conexiones. No entraré en detalles aquí porque ya explicamos este punto en el artículo anterior.
Actualizando el dim_time Dimensión
Objetivo:insertar la fecha de ayer si aún no está insertada en la tabla.
En nuestro script, actualizaremos dos tablas de dimensiones con nuevos valores. El resto de ellos siguen el mismo patrón, por lo que solo repasaremos esto una vez; no necesitamos escribir un código casi idéntico unas cuantas veces más.
La idea es muy simple. Siempre ejecutaremos el script para insertar nuevos datos para ayer. Por lo tanto, debemos verificar si esa fecha se insertó en la tabla de dimensiones. Si ya está, no haremos nada; si no es así, lo agregaremos. Echemos un vistazo al código para actualizar el dim_time
mesa.
Primero, verificaremos si la fecha existe. Si no existe, lo agregaremos. Comenzamos almacenando la fecha de ayer en una variable. En Python, lo haces de esta manera:
yesterday = date.fromordinal(date.today().toordinal()-1) yesterday_str = str(yesterday)
La primera línea toma una fecha actual, la convierte en un valor numérico, resta 1 de ese valor y vuelve a convertir ese valor numérico en una fecha (ayer =hoy – 1 ). La segunda línea almacena la fecha en formato de texto.
A continuación, probaremos si la fecha ya está en la base de datos:
table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh) stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str) result = connection_dwh.execute(stmt).fetchall() date_exists = len(result)
Después de cargar la tabla, ejecutaremos una consulta que debería devolver todas las filas de la tabla de dimensiones donde el valor de hora/fecha es igual a ayer. El resultado podría tener 0 (no hay tal fecha en la tabla) o 1 fila (la fecha ya está en la tabla).
Si la fecha aún no está en la tabla, usaremos el comando insert() para agregarla:
if date_exists == 0: print("New value added.") stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday()) connection_dwh.execute(stmt) else: print("No new values.")
Una cosa nueva aquí que me gustaría señalar es el uso de. .year
, .month
, .isocalendar()[1]
y .weekday
para obtener partes de fechas.
Actualizando la dim_city Dimensión
Objetivo:insertar nuevas ciudades si las hay (es decir, comparar la lista de ciudades en la base de datos en vivo con la lista de ciudades en el DWH y agregar las que faltan).
Actualizando el dim_time
dimensión era bastante simple. Simplemente probamos si una fecha estaba en la tabla y la insertamos si aún no estaba allí. Para probar un valor en la base de datos DWH, usamos una variable de Python (ayer ). Usaremos ese proceso nuevamente, pero esta vez con listas.
Dado que no existe una manera fácil de combinar tablas de diferentes bases de datos en una sola consulta de SQLAlchemy, no podemos usar el enfoque descrito en la Parte 1 de esta serie. Por lo tanto, necesitaremos un objeto para almacenar los valores necesarios para la comunicación entre estas dos bases de datos. Decidí usar listas, porque son comunes y hacen el trabajo.
Primero, cargaremos el country
y city
tablas de una base de datos en vivo en los objetos relevantes.
# dim_city print("\nUpdating... dim_city") table_city = Table('city', metadata, autoload = True, autoload_with = engine_live) table_country = Table('country', metadata, autoload = True, autoload_with = engine_live) table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)
Luego, cargaremos el dim_city
tabla del DWH en una lista:
# load whole dwh table in the list stmt = select([table_dim_city]); table_dim_city_list = connection_dwh.execute(stmt).fetchall()
Luego, haremos lo mismo con los valores de la base de datos activa. Unimos las tablas country
y city
por lo que tenemos todos los datos necesarios en esta lista:
# load all live values in the list stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\ .select_from(table_city\ .join(table_country)) table_city_list = connection_live.execute(stmt).fetchall()
Ahora recorreremos la lista que contiene los datos de la base de datos activa. Para cada registro, compararemos valores (city_name
, postal_code
y country_name
). Si no encontramos dichos valores, agregaremos un nuevo registro en dim_city
mesa.
# loop through live_db table # for each record test if it is missing in the dwh table new_values_added = 0 for city in table_city_list: id = -1; for dim_city in table_dim_city_list: if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]: id = dim_city[0] if id == -1: stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2]) connection_dwh.execute(stmt) new_values_added = 1 if new_values_added == 0: print("No new values.") else: print("New value(s) added.")
Para determinar si el valor ya está en el DWH, probamos una combinación de atributos que deberían ser únicos. (La clave principal de la base de datos en vivo no nos ayuda mucho aquí). Podemos usar un código similar para actualizar otros diccionarios. No es la mejor solución, pero sigue siendo bastante elegante. Y hará exactamente lo que necesitamos.
Actualizando el fact_customer_subscribed Mesa
Objetivo:si tenemos datos antiguos para la fecha de ayer, eliminarlos primero. Agregue los datos de ayer al DWH, independientemente de si eliminamos algo en el paso anterior o no.
Después de actualizar todas las tablas de dimensiones, debemos actualizar las tablas de hechos. En nuestro script, actualizaremos solo una tabla de hechos. El razonamiento es el mismo que en la sección anterior:la actualización de otras tablas seguiría el mismo patrón, por lo que en su mayoría repetiríamos el código.
Antes de insertar valores en la tabla de hechos, necesitamos conocer los valores de las claves relacionadas de las tablas de dimensiones. Para ello, volveremos a cargar las dimensiones en las listas y las compararemos con los valores de la base de datos activa.
Lo primero que haremos será cargar el cliente y fact_customer_subscribed
tablas en objetos:
# fact_customer_subscribed print("\nUpdating... fact_customer_subscribed") table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live) table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)
Ahora, necesitaremos encontrar claves para la dimensión de tiempo relacionada. Dado que siempre estamos insertando datos de ayer, buscaremos esa fecha en el dim_time
table y use su ID. La consulta devuelve 1 fila y el ID está en la primera posición (el índice comienza desde 0, por lo que es result[0][0]
):
# find key for the dim_time dimension stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday) result = connection_dwh.execute(stmt).fetchall() dim_time_id = result[0][0]
Durante ese tiempo, eliminaremos todos los registros asociados de la tabla de hechos:
# delete any existing data in the fact table for that time dimension value stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id) connection_dwh.execute(stmt)
Bien, ahora tenemos la ID de la dimensión de tiempo almacenada en el dim_time_id
variable. Esto fue fácil porque solo podemos tener un valor de dimensión de tiempo. La historia será diferente para la dimensión de la ciudad. Primero, cargaremos todos los valores que necesitamos:valores que describan de forma única la ciudad (no el ID) y valores agregados:
# prepare data for insert stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\ .select_from(table_customer\ .join(table_city)\ .join(table_country))\ .group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)
Hay algunas cosas que me gustaría enfatizar sobre la consulta anterior:
func.sum(...)
es SUMA(...) de "SQL estándar".- El
case(...)
la sintaxis usaand_
antes de las condiciones, no entre ellas. .label(...)
funciona como un alias de SQL AS.- Estamos usando
\
para pasar a la siguiente línea y aumentar la legibilidad de la consulta. (Confía en mí, es prácticamente ilegible sin la barra inclinada; lo he probado :)) .group_by(...)
desempeña el papel de GROUP BY de SQL.
A continuación, recorreremos cada registro devuelto usando la consulta anterior. Para cada registro, compararemos los valores que definen de forma única una ciudad (city_name
, postal_code
, country_name
) con los valores almacenados en la lista creada a partir del DWH dim_city
mesa. Si los tres valores coinciden, almacenaremos la ID de la lista y la usaremos al insertar nuevos datos. De esta forma, para cada registro, tendremos ID para ambas dimensiones:
# loop through all new records # use time dimension # for each record find key for city dimension # insert row new_values = connection_live.execute(stmt).fetchall() for new_value in new_values: dim_city_id = -1; for dim_city in table_dim_city_list: if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]: dim_city_id = dim_city[0] if dim_city_id > 0: stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6]) connection_dwh.execute(stmt_insert) dim_city_id = -1 print("Completed.")
Y eso es todo. Hemos actualizado nuestro DWH. El script sería mucho más largo si actualizáramos todas las tablas de dimensiones y hechos. La complejidad también sería mayor cuando una tabla de hechos se relaciona con más tablas de dimensiones. En ese caso, necesitaríamos un for bucle para cada tabla de dimensiones.
¡Esto no funciona!
Estaba muy decepcionado cuando escribí este script y luego descubrí que algo como esto no funcionaría:
stmt = select([table_city.columns.city_name])\ .select_from(table_city\ .outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\ .where(table_dim_city.columns.id.is_(None))
En este ejemplo, intento usar tablas de dos bases de datos diferentes. Si establecemos dos conexiones separadas, la primera conexión no "verá" las tablas de otra conexión. Si nos conectamos directamente al servidor, y no a una base de datos, no podremos cargar tablas.
Hasta que esto cambie (con suerte pronto), deberá usar algún tipo de estructura (por ejemplo, lo que hicimos hoy) para comunicarse entre las dos bases de datos. Esto complica el código, porque necesita reemplazar una sola consulta con dos listas y for anidadas bucles.
Comparta sus opiniones sobre SQLAlchemy y Python
Este fue el último artículo de esta serie. ¿Pero quién sabe? Tal vez intentemos otro enfoque en los próximos artículos, así que estad atentos. Mientras tanto, comparta sus opiniones sobre SQLAlchemy y Python en combinación con las bases de datos. ¿Qué crees que nos falta en este artículo? ¿Qué agregarías? Cuéntanos en los comentarios a continuación.
Puede descargar el script completo que usamos en este artículo aquí.
Y un agradecimiento especial a Dirk J Bosman (@dirkjobosman), quien recomendó esta serie de artículos.