SQL trabaja con y devuelve datos tabulares (o relaciones, si prefiere pensarlo de esa manera, pero no todas las tablas de SQL son relaciones). Lo que esto implica es que una tabla anidada como la que se muestra en la pregunta no es una característica tan común. Hay formas de producir algo por el estilo en Postgresql, por ejemplo, usando arreglos de JSON o compuestos, pero es completamente posible obtener datos tabulares y realizar el anidamiento en la aplicación. Python tiene itertools.groupby()
, que se ajusta bastante bien, dados los datos ordenados.
El error column "incoming.id" must appear in the GROUP BY clause...
está diciendo que los no agregados en la lista de selección, la cláusula que tiene, etc. deben aparecer en el GROUP BY
cláusula o usarse en un agregado, para que no tengan posiblemente valores indeterminados . En otras palabras, el valor tendría que elegirse de alguna fila del grupo, porque GROUP BY
condensa las filas agrupadas en una sola fila , y cualquiera podría adivinar de qué fila fueron elegidos. La implementación podría permitir esto, como lo hace SQLite y MySQL, pero el estándar SQL lo prohíbe. La excepción a la regla es cuando hay una dependencia funcional
; el GROUP BY
cláusula determina los no agregados. Piense en una unión entre tablas A y B agrupados por A la clave principal de . Independientemente de la fila de un grupo, el sistema elegiría los valores para A Las columnas de , serían las mismas ya que la agrupación se realizó en función de la clave principal.
Para abordar el enfoque general previsto de 3 puntos, una forma sería seleccionar una unión de entradas y salidas, ordenadas por sus marcas de tiempo. Dado que no existe una jerarquía de herencia configuración, ya que puede que ni siquiera haya una, no estoy familiarizado con la contabilidad, volver a usar Core y tuplas de resultados simples facilita las cosas en este caso:
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)
Luego, para formar la estructura anidada itertools.groupby()
se usa:
date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]
El resultado final es una lista de 2 tuplas de fecha y una lista de diccionarios de entradas en orden ascendente. No es exactamente la solución ORM, pero hace el trabajo. Un ejemplo:
In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [57]: session.commit()
In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
...: where(Incoming.accountID == 1)
...:
...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
...: where(Outgoing.accountID == 1)
...:
...: all_entries = incoming.union(outgoing)
...: all_entries = all_entries.order_by(all_entries.c.timestamp)
...: all_entries = db_session.execute(all_entries)
In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]:
[(datetime.date(2019, 9, 1),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 5,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 2),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 3),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 2,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
'type': 'outgoing'}])]
Como se mencionó, Postgresql puede producir prácticamente el mismo resultado que usar una matriz de JSON:
from sqlalchemy.dialects.postgresql import aggregate_order_by
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing).alias('all_entries')
day = func.date_trunc('day', all_entries.c.timestamp)
stmt = select([day,
func.array_agg(aggregate_order_by(
func.row_to_json(literal_column('all_entries.*')),
all_entries.c.timestamp))]).\
group_by(day).\
order_by(day)
db_session.execute(stmt).fetchall()
Si de hecho Incoming
y Outgoing
se pueden considerar como hijos de una base común, por ejemplo Entry
, el uso de uniones se puede automatizar un poco con herencia de tablas concretas
:
from sqlalchemy.ext.declarative import AbstractConcreteBase
class Entry(AbstractConcreteBase, Base):
pass
class Incoming(Entry):
__tablename__ = 'incoming'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="incomings")
__mapper_args__ = {
'polymorphic_identity': 'incoming',
'concrete': True
}
class Outgoing(Entry):
__tablename__ = 'outgoing'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="outgoings")
__mapper_args__ = {
'polymorphic_identity': 'outgoing',
'concrete': True
}
Desafortunadamente usando AbstractConcreteBase
requiere una llamada manual a configure_mappers()
cuando se han definido todas las clases necesarias; en este caso, la primera posibilidad es después de definir User
, porque Account
depende de él a través de las relaciones:
from sqlalchemy.orm import configure_mappers
configure_mappers()
Luego, para obtener todos los Incoming
y Outgoing
en una sola consulta ORM polimórfica use Entry
:
session.query(Entry).\
filter(Entry.accountID == accountID).\
order_by(Entry.timestamp).\
all()
y proceda a usar itertools.groupby()
como arriba en la lista resultante de Incoming
y Outgoing
.