Hi, I have the following example insert query.
I took it from here:
class E2E:
    """Run a fixed MySQL "insert ... on duplicate key update" statement.

    The statement inserts three rows into ``upsert_table`` and, on a key
    collision, only overwrites ``val1``/``val2``/``lastmodified`` when the
    incoming ``lastmodified`` is greater than the stored one.
    """

    def __init__(self):
        """Create the loggers and open the MySQL connection with autocommit on."""
        # Both attribute names are kept so existing readers of either one work;
        # logging.getLogger returns the same logger object for the same name.
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self.logger = self._logger
        # NOTE(review): connection credentials are hard-coded here — consider
        # loading them from configuration.
        self._db_client = MySQLdb.connect(
            host='127.0.0.1', user='root', passwd='local1234', db='innodb')
        self._db_client.autocommit(True)
        self._fields: Set[str] = {'rowId', 'sessionId', 'request', 'taskId', 's3Pointer'}

    def execute_insert(self) -> None:
        """Execute the upsert statement and commit it.

        Raises:
            Exception: if the statement reports zero affected rows.
        """
        query = self._get_query()
        cursor = self._db_client.cursor(cursorclass=None)
        try:
            docs_affected = cursor.execute(query)
            self._db_client.commit()
        finally:
            # Always release the cursor, even when execute/commit raises.
            cursor.close()
        if docs_affected == 0:
            raise Exception("A newer version has already been added to the database")

    def _get_query(self) -> str:
        """Return the literal upsert SQL text sent by ``execute_insert``."""
        # `lastmodified` is assigned last so that the val1/val2 comparisons
        # are written before the timestamp assignment in the statement.
        return (
            "insert into upsert_table\n"
            "(unkey1, unkey2, val1, val2, lastmodified)\n"
            "values\n"
            "('A', '20', 'Updated Meeting B', 'room 1-10', UNIX_TIMESTAMP('2018-08-25 20:00:13')),\n"
            "('B', '22', 'Should not update Meeting B', 'room 2-20', UNIX_TIMESTAMP('2017-08-25 19:00:02')),\n"
            "('F', '66', 'New Meeting F', 'room 6-6', UNIX_TIMESTAMP('2017-08-25 20:00:06'))\n"
            "on duplicate key update\n"
            "val1 = if (lastmodified<values(lastmodified), values(val1), val1),\n"
            "val2 = if (lastmodified< values(lastmodified), values(val2), val2),\n"
            "lastmodified = if (lastmodified<values(lastmodified), values(lastmodified), lastmodified);\n"
        )
# Script entry for the raw-SQL example: build an E2E object (its constructor
# opens the MySQL connection) and send the upsert statement once.
# NOTE(review): despite its name, `cls` is bound to an E2E instance, not a class.
cls = E2E()
cls.execute_insert()
I would like to implement the same behavior with SQLAlchemy. So I saw this,
but I could not find how to put a condition inside a column's update expression.
Here is an example of the code that I have tried to write:
# Engine for the local MySQL `innodb` database, created with echo=True.
# NOTE(review): the `encoding` argument was deprecated in SQLAlchemy 1.4 and
# removed in 2.0 — confirm the installed version still accepts it.
engine = create_engine("mysql://root:local1234@127.0.0.1:3306/innodb",
encoding='latin1', echo=True)
# Session bound to the engine above; it is not used in the code visible here.
session = Session(engine)
class UpsertTable2(Base):
    """Declarative mapping for the ``upsert_table2`` table.

    Rows are identified for upsert purposes by the unique pair
    (``unkey1``, ``unkey2``), enforced by the ``unkey1_2_uix`` constraint.
    """

    __tablename__ = 'upsert_table2'
    __table_args__ = (
        UniqueConstraint('unkey1', 'unkey2', name='unkey1_2_uix'),
    )

    # Unsigned integer primary key.
    id = Column(INTEGER(unsigned=True), primary_key=True)

    # The two halves of the unique key; both are required.
    unkey1 = Column(String(32), nullable=False)
    unkey2 = Column(String(32), nullable=False)

    # Payload columns.
    val1 = Column(String(32))
    val2 = Column(String(32))

    # Unsigned integer compared by the upsert to decide whether to overwrite.
    lastmodified = Column(INTEGER(unsigned=True))

    def __repr__(self):
        """Return a short text form showing the id and the unique-key parts."""
        return f"id:{self.id}, unkey1:{self.unkey1}, unkey2:{self.unkey2}"
def sqlalchemy_to_dict(model: "Base") -> Dict[str, Any]:
    """Return the attributes set on ``model`` as a new plain dictionary.

    The ``_sa_instance_state`` entry is left out of the result. The instance's
    own ``__dict__`` is not modified, so the object stays usable afterwards
    and the function can be called on it more than once.

    Args:
        model: the object whose instance attributes are read.

    Returns:
        A new dict mapping attribute name to value.
    """
    return {
        name: value
        for name, value in vars(model).items()
        if name != '_sa_instance_state'
    }
# Local import: the conditional update is built from SQL function expressions.
from sqlalchemy import func

# Create the mapped tables if they do not exist yet.
Base.metadata.create_all(engine)

example1 = UpsertTable2(unkey1='A', unkey2='20', val1="Updated Meeting A", val2="room 1-10", lastmodified=1)
stmt1 = insert(UpsertTable2)
as_dict = sqlalchemy_to_dict(example1)
stmt2 = stmt1.values(as_dict)
print(stmt2)

# Plain Python strings passed to on_duplicate_key_update() are sent as literal
# string values, so the condition must be built from column expressions.
existing = UpsertTable2.__table__.c   # values currently stored in the row
incoming = stmt2.inserted             # values proposed by this INSERT
is_newer = existing.lastmodified < incoming.lastmodified

# A list of 2-tuples fixes the assignment order: `lastmodified` goes last so
# the val1/val2 comparisons come before the timestamp is overwritten.
on_duplicate_key_stmt = stmt2.on_duplicate_key_update([
    ('val1', func.IF(is_newer, incoming.val1, existing.val1)),
    ('val2', func.IF(is_newer, incoming.val2, existing.val2)),
    ('lastmodified', func.IF(is_newer, incoming.lastmodified, existing.lastmodified)),
])
print(on_duplicate_key_stmt)

# engine.begin() commits on success, rolls back on error, and releases the
# connection when the block exits.
with engine.begin() as connection:
    connection.execute(on_duplicate_key_stmt)