0

Hi I have the following example insert query:

Took it from here:

class E2E:
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self._db_client = MySQLdb.connect(
        host='127.0.0.1', user='root', passwd='local1234', db='innodb')
        self._db_client.autocommit(True)

        self._fields: Set[str] = {'rowId', 'sessionId', 'request', 'taskId', 's3Pointer'}

    def execute_insert(self) -> None:
        query = self._get_query()
        cursor = self._db_client.cursor(cursorclass=None)

        docs_affected = cursor.execute(query)
        self._db_client.commit()
        cursor.close()
        if docs_affected == 0:
            raise Exception("A newer version has already been added to the database")

    def _get_query(self) -> str:
        return """insert into upsert_table
                (unkey1, unkey2, val1, val2, lastmodified)
            values
                ('A', '20', 'Updated Meeting B', 'room 1-10', UNIX_TIMESTAMP('2018-08-25 20:00:13')),
                ('B', '22', 'Should not update Meeting B', 'room 2-20', UNIX_TIMESTAMP('2017-08-25 19:00:02')),
                ('F', '66', 'New Meeting F', 'room 6-6', UNIX_TIMESTAMP('2017-08-25 20:00:06'))
            on duplicate key update
                val1 = if (lastmodified<values(lastmodified), values(val1), val1),
                val2 = if (lastmodified< values(lastmodified), values(val2), val2),
                lastmodified = if (lastmodified<values(lastmodified), values(lastmodified), lastmodified);
                """


cls = E2E()
cls.execute_insert()

I would like to implement the same behavior with SQLalchemy. So I saw this

But I failed to find how to have a condition inside a column,

Example of code that I have tried to write:

engine = create_engine("mysql://root:local1234@127.0.0.1:3306/innodb",
                                    encoding='latin1', echo=True)

session = Session(engine)



class UpsertTable2(Base):
     __tablename__ = 'upsert_table2'

     id = Column(INTEGER(unsigned=True), primary_key=True)
     unkey1 = Column(String(32), nullable=False)
     unkey2 = Column(String(32), nullable=False)
     val1 = Column(String(32))
     val2 = Column(String(32))
     lastmodified = Column(INTEGER(unsigned=True))
     __table_args__ = (UniqueConstraint('unkey1', 'unkey2', name='unkey1_2_uix'),)


     def __repr__(self):
         return f"id:{self.id}, unkey1:{self.unkey1}, unkey2:{self.unkey2}"


def sqlalchemy_to_dict(model: Base) -> Dict[String,Any]:
    model_fields_and_state = model.__dict__
    model_fields_and_state.pop('_sa_instance_state')
    return model_fields_and_state


Base.metadata.create_all(engine)

example1 = UpsertTable2(unkey1='A', unkey2='20', val1="Updated Meeting A", val2="room 1-10", lastmodified=1)

stmt1 = insert(UpsertTable2)
as_dict = sqlalchemy_to_dict(example1)
stmt2 = stmt1.values(as_dict)
print(stmt2)

on_duplicate_key_stmt = stmt2.on_duplicate_key_update(
    val1=f"if (lastmodified<{stmt2.inserted.lastmodified},{stmt2.inserted.val1}, val1)",
    val2=f"if (lastmodified<{stmt2.inserted.lastmodified},{stmt2.inserted.val2}, val2)",
    lastmodified=f"if (lastmodified<{stmt2.inserted.lastmodified},{stmt2.inserted.lastmodified}, lastmodified)"
)
print(on_duplicate_key_stmt)

connection = engine.connect()

connection.execute(on_duplicate_key_stmt)
Ehud Lev
  • 1,809
  • 19
  • 31

0 Answers0