Is there an elegant way to do an INSERT ... ON DUPLICATE KEY UPDATE in SQLAlchemy? I mean something with a syntax similar to inserter.insert().execute(list_of_dictionaries) ?
9 Answers
ON DUPLICATE KEY UPDATE post version-1.2 for MySQL
This functionality is now built into SQLAlchemy for MySQL only. somada141's answer below has the best solution: https://stackoverflow.com/a/48373874/319066
ON DUPLICATE KEY UPDATE in the SQL statement
If you want the generated SQL to actually include ON DUPLICATE KEY UPDATE, the simplest way is to use a @compiles decorator.
Example code (linked from a good Reddit thread on the subject) can be found on GitHub:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if 'append_string' in insert.kwargs:
        return s + " " + insert.kwargs['append_string']
    return s

my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)
But note that in this approach, you have to manually create the append_string. You could probably change the append_string function so that it automatically changes the insert string into an insert with 'ON DUPLICATE KEY UPDATE' string, but I'm not going to do that here due to laziness.
ON DUPLICATE KEY UPDATE functionality within the ORM
SQLAlchemy does not provide an interface to ON DUPLICATE KEY UPDATE or MERGE or any other similar functionality in its ORM layer. Nevertheless, it has the session.merge() function that can replicate the functionality only if the key in question is a primary key.
session.merge(ModelObject) first checks if a row with the same primary key value exists by sending a SELECT query (or by looking it up locally). If it does, it sets a flag somewhere indicating that ModelObject is in the database already, and that SQLAlchemy should use an UPDATE query. Note that merge is quite a bit more complicated than this, but it replicates the functionality well with primary keys.
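As a rough illustration of the primary-key behaviour described above, here is a minimal sketch of session.merge(). The `User` model, the `users` table and the in-memory SQLite engine are assumptions made only so the snippet runs on its own (it also assumes SQLAlchemy 1.4 or later); they are not from the original answer.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    # Hypothetical model, used only for this illustration.
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# In-memory SQLite so no database server is needed (an assumption of this sketch).
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # No row with id=1 exists yet, so merge() results in an INSERT.
    session.merge(User(id=1, name="alice"))
    session.commit()

    # A row with id=1 now exists, so merge() copies the new state onto it
    # and the flush emits an UPDATE instead of a second INSERT.
    session.merge(User(id=1, name="bob"))
    session.commit()

    names = [user.name for user in session.query(User).all()]

print(names)  # ['bob']
```

Only the primary key is consulted here; a clash on some other unique column would still raise an integrity error.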
But what if you want ON DUPLICATE KEY UPDATE functionality with a non-primary key (for example, another unique key)? Unfortunately, SQLAlchemy doesn't have any such function. Instead, you have to create something that resembles Django's get_or_create(). Another StackOverflow answer covers it, and I'll just paste a modified, working version of it here for convenience.
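from sqlalchemy.sql.expression import ClauseElement

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        # Skip SQL expressions; only plain values can be passed to the model constructor.
        params = dict((k, v) for k, v in kwargs.items() if not isinstance(v, ClauseElement))
        if defaults:
            params.update(defaults)
        instance = model(**params)
        # The new instance is not added to the session here; the caller must call session.add().
        return instance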
- Note that the `append_string` code is non-functional on postgres (with its new `ON CONFLICT [IGNORE|UPDATE]` feature in 9.5), as the ORM automatically appends a `RETURNING {primary key}` to inserts, and that results in invalid SQL. – Fake Name Aug 10 '15 at 03:03
- What is the `foo=foo` part doing here, and what would I replace `foo` with in my own table? – nhinkle Jan 07 '17 at 00:53
- `append_string` does not work; I get `SAWarning: Can't validate argument 'append_string'; can't locate any SQLAlchemy dialect named 'append' % (k, dialect_name)` – wyx May 27 '19 at 14:33
- Note that the get_or_create example is exposed to race conditions on concurrent systems. Instead you should try to insert first, catch an exception on key duplication and query the result. – Korenz Feb 15 '21 at 14:27
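The insert-first approach suggested in the last comment could be sketched roughly as follows. This is one possible reading of that advice, not code from the thread: `create_or_get`, the `Tag` model and the in-memory SQLite engine are hypothetical names chosen for illustration, and the sketch assumes SQLAlchemy 1.4 or later. For simplicity it commits (or rolls back) the whole transaction on each call; wrapping the insert in session.begin_nested() would limit the rollback to a savepoint instead.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Tag(Base):
    # Hypothetical model with a non-primary unique key.
    __tablename__ = "tags"
    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)


def create_or_get(session, model, **kwargs):
    """Try the INSERT first; if the unique key already exists, fetch that row.

    Returns (instance, created).
    """
    instance = model(**kwargs)
    session.add(instance)
    try:
        session.commit()
        return instance, True
    except IntegrityError:
        # The database rejected the duplicate, so undo the failed INSERT
        # and read the row that is already there.
        session.rollback()
        return session.query(model).filter_by(**kwargs).one(), False


engine = create_engine("sqlite://")  # in-memory SQLite, assumed for the demo
Base.metadata.create_all(engine)

with Session(engine) as session:
    first, created_first = create_or_get(session, Tag, name="python")
    second, created_second = create_or_get(session, Tag, name="python")
    same_row = first.id == second.id

print(created_first, created_second, same_row)  # True False True
```

Because the uniqueness check is done by the database rather than by a preceding SELECT, two concurrent callers cannot both believe they created the row.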
I should mention that ever since the v1.2 release, the SQLAlchemy 'core' has a built-in solution to the above, which can be seen here (snippet copied below):
from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value')

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,
    status='U'
)

conn.execute(on_duplicate_key_stmt)
- Yeah, I should've clarified. The above only works for MySQL *but* Postgres, for example, has had such functionality for some time now with http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#sqlalchemy.dialects.postgresql.dml.Insert.on_conflict_do_update and http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#sqlalchemy.dialects.postgresql.dml.Insert.on_conflict_do_nothing – somada141 Jun 04 '18 at 21:00
- This also works for arrays of values, if anyone ever needs that functionality. That means that `values` also accepts `list`s of `dict` objects. – sheba Nov 28 '18 at 14:47
- Could you please give a solid example of how `data` is populated? Is it like `data={'field_1'='value1'}`? Thanks – Houman Sep 04 '19 at 17:57
- @Houman yeah, that's how you define the values and then you use `mysql_noop_upsert_columns=[OrmClass.column]` to define the update columns. – somada141 Sep 05 '19 at 07:13
- Does this work for bulk upserts too? I have not managed to get it to work. – Kailegh Feb 11 '20 at 11:43
- @Kailegh the above example is an UPSERT (since it INSERTs and UPDATEs on a dupe). Did you mean something else? – somada141 Feb 11 '20 at 18:03
- Yeah, I mean does this work on a list of rows at the same time, or should I do it one by one? Using an array of dicts in values, I have not found it in their documentation. – Kailegh Feb 11 '20 at 18:54
- @Kailegh yes it does. The `values` argument can be a `list` of `dict` with the field names you want inserted and then you can use the `mysql_upsert_columns` argument to define which columns to be upserted in the event of a dupe. SQLAlchemy will then compile the appropriate query to only update the right columns on the dupes – somada141 Feb 12 '20 at 06:21
- I have a unique index and an auto-increment id primary key. In my case the id keeps incrementing. How can I resolve this issue? – M.Abulsoud Apr 22 '20 at 15:14
- @somada141 I couldn't get it working with a list of dict. I assume that you'd pass the list of dict to insert statement first. What would be the required arguments to `on_duplicate_key_update` in this case? – atuljangra Sep 02 '21 at 18:17
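For the bulk case asked about in the comments, a sketch along these lines may help. It only compiles the statement against the MySQL dialect and prints the SQL, so no MySQL server or driver is needed; the `my_table` definition and its columns are assumptions made up for the example. The list of dicts goes to `values()`, and `on_duplicate_key_update()` takes column names as keyword arguments, using `stmt.inserted` to refer to the value each row tried to insert.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert

metadata = MetaData()

# Hypothetical table, defined only so the statement can be compiled.
my_table = Table(
    "my_table",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String(50)),
    Column("status", String(1)),
)

rows = [
    {"id": 1, "data": "first value"},
    {"id": 2, "data": "second value"},
]

# A list of dicts produces a single multi-row INSERT.
stmt = insert(my_table).values(rows)

# On a duplicate key, overwrite `data` with the value from the attempted
# insert and set `status` to a fixed literal.
stmt = stmt.on_duplicate_key_update(
    data=stmt.inserted.data,
    status="U",
)

# Render the SQL without connecting to a database.
sql = str(stmt.compile(dialect=mysql.dialect()))
print(sql)
```

With a live connection, `conn.execute(stmt)` would send the same statement; every dict in the list should have the same keys.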
Based on phsource's answer, and for the specific use-case of using MySQL and completely overriding the data for the same key without performing a DELETE statement, one can use the following @compiles decorated insert expression:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if insert.kwargs.get('on_duplicate_key_update'):
        fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
        generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
        return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
    return s
- This example doesn't escape the field values very well. You should probably use the built-in escape methods: https://stackoverflow.com/a/25107658/319066 – phsource Dec 25 '18 at 20:06
- @phsource notice that in this example we override fields with their values from the original `INSERT` (referring to the field names and not the values), so no escaping is needed. Obviously, using the now-part-of-the-ORM functionality is better (unless using it with `INSERT FROM SELECT` which does not work as expected) – sheba Dec 28 '18 at 13:27
It depends on what you want. The code below skips duplicates with the OR IGNORE prefix; if you want to replace the existing row instead, pass OR REPLACE as the prefix (both are SQLite syntax).
def bulk_insert(self, objects, table):
    # table: your table class; objects: a list of dictionaries [{col1: val1, col2: val2}]
    for counter, row in enumerate(objects):
        inserter = table.__table__.insert().prefix_with('OR IGNORE').values(row)
        try:
            self.db.execute(inserter)
        except Exception as E:
            print(E)
        if counter % 100 == 0:
            self.db.commit()
    self.db.commit()
The commit interval (every 100 rows here) can be raised or lowered to tune speed.
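To see the prefix approach in isolation, here is a small self-contained sketch. The `users` table and the in-memory SQLite engine are assumptions for the demo (SQLAlchemy 1.4 or later is assumed as well); `prefix_with()` is the Insert method that places text between INSERT and INTO.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select

metadata = MetaData()

# Hypothetical table used only for this demonstration.
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)

engine = create_engine("sqlite://")  # in-memory SQLite
metadata.create_all(engine)

# Renders as "INSERT OR IGNORE INTO users ...", which SQLite understands.
stmt = users.insert().prefix_with("OR IGNORE")

with engine.begin() as conn:
    # The second row clashes on the primary key and is silently skipped.
    conn.execute(
        stmt,
        [
            {"id": 1, "name": "alice"},
            {"id": 1, "name": "duplicate"},
        ],
    )
    names = [row.name for row in conn.execute(select(users.c.name))]

print(names)  # ['alice']
```

Swapping the prefix for "OR REPLACE" would keep the later row instead of the earlier one.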
Got a simpler solution:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def replace_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    # Note: this rewrites every compiled INSERT, not only those passing replace_string.
    s = s.replace("INSERT INTO", "REPLACE INTO")
    return s

my_connection.execute(my_table.insert(replace_string=""), my_values)
- Take care. `REPLACE INTO` and `INSERT ... ON DUPLICATE KEY UPDATE` do different things. – Dennis S Hennen Mar 26 '14 at 14:21
- Notably, it *deletes* the row, so this solution is usually terribly useless on `InnoDB` (or any other transactional engine) tables, as it chokes on most any `FOREIGN KEY` constraint – Naltharial Mar 31 '14 at 12:07
- It works fine with MySQL. Having said that, I do not have any foreign keys on that table. – algarecu Oct 15 '15 at 09:59
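The difference the commenters point out can be seen with a few lines of plain `sqlite3`. This is only a sketch under the assumption that SQLite's `REPLACE INTO` is a fair stand-in: like MySQL's, it removes the conflicting row before inserting the new one, so any column not listed in the statement falls back to its default. The table `t` and its columns are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, a TEXT, b TEXT)")
conn.execute("INSERT INTO t (id, a, b) VALUES (1, 'old-a', 'old-b')")

# REPLACE deletes the existing row with id=1 and inserts a fresh one.
# Column b is not mentioned, so its old value is lost rather than kept.
conn.execute("REPLACE INTO t (id, a) VALUES (1, 'new-a')")

row = conn.execute("SELECT id, a, b FROM t").fetchone()
print(row)  # (1, 'new-a', None)
```

An `INSERT ... ON DUPLICATE KEY UPDATE a = ...` would instead have left `b` untouched, and it would not trigger the delete that foreign keys object to.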
I just used plain SQL:
from sqlalchemy import text

insert_stmt = text("REPLACE INTO tablename (column1, column2) VALUES (:column_1_bind, :column_2_bind)")
session.execute(insert_stmt, data)
My way
import typing
from datetime import datetime
from sqlalchemy.dialects import mysql

class MyRepository:
    def model(self):
        return MySqlAlchemyModel

    def upsert(self, data: typing.List[typing.Dict]):
        if not data:
            return
        model = self.model()
        if hasattr(model, 'created_at'):
            for item in data:
                item['created_at'] = datetime.now()
        stmt = mysql.insert(getattr(model, '__table__')).values(data)
        for_update = []
        for k, v in data[0].items():
            for_update.append(k)
        dup = {k: getattr(stmt.inserted, k) for k in for_update}
        stmt = stmt.on_duplicate_key_update(**dup)
        self.db.session.execute(stmt)
        self.db.session.commit()
Usage:
myrepo.upsert([
    {
        "field1": "value11",
        "field2": "value21",
        "field3": "value31",
    },
    {
        "field1": "value12",
        "field2": "value22",
        "field3": "value32",
    },
])
The other answers have this covered, but I figured I'd reference another good example for MySQL that I found in this gist. It also includes the use of LAST_INSERT_ID, which may be useful depending on your InnoDB auto-increment settings and whether your table has a unique key. Lifting the code here for easy reference, but please give the author a star if you find it useful.
from app import db
from sqlalchemy import func
from sqlalchemy.dialects.mysql import insert

def upsert(model, insert_dict):
    """model can be a db.Model or a table(), insert_dict should contain a primary or unique key."""
    inserted = insert(model).values(**insert_dict)
    upserted = inserted.on_duplicate_key_update(
        id=func.LAST_INSERT_ID(model.id), **{k: inserted.inserted[k]
                                             for k, v in insert_dict.items()})
    res = db.engine.execute(upserted)
    return res.lastrowid
None of these solutions seems all that elegant. A brute-force way is to query to see if the row exists. If it does, delete the row and then insert; otherwise just insert. Obviously there is some overhead involved, but it does not rely on modifying the raw SQL and it works on non-ORM stuff.
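A rough sketch of that brute-force idea with SQLAlchemy Core might look like this. The `items` table, the `replace_row` helper and the in-memory SQLite engine are all hypothetical, chosen so the snippet runs by itself (SQLAlchemy 1.4 or later is assumed). Running the check, the delete and the insert inside one transaction keeps the row from disappearing halfway, although concurrent writers could still race between the check and the insert.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select

metadata = MetaData()

# Hypothetical table for the demonstration.
items = Table(
    "items",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String(50)),
)

engine = create_engine("sqlite://")  # in-memory SQLite
metadata.create_all(engine)


def replace_row(conn, table, row):
    """Delete any existing row with the same id, then insert the new row."""
    existing = conn.execute(
        select(table.c.id).where(table.c.id == row["id"])
    ).first()
    if existing is not None:
        conn.execute(table.delete().where(table.c.id == row["id"]))
    conn.execute(table.insert().values(**row))


# engine.begin() wraps everything in a single transaction.
with engine.begin() as conn:
    replace_row(conn, items, {"id": 1, "data": "first"})
    replace_row(conn, items, {"id": 1, "data": "second"})
    rows = [tuple(r) for r in conn.execute(select(items.c.id, items.c.data))]

print(rows)  # [(1, 'second')]
```

As with REPLACE, the delete step can fail or cascade if other tables reference the row through foreign keys.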