"""
mbed SDK
Copyright (c) 2011-2014 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author: Przemyslaw Wirkus
"""

import re
import MySQLdb as mdb

# Imports from TEST API
from workspace_tools.test_db import BaseDBAccess


class MySQLDBAccess(BaseDBAccess):
    """ Wrapper for MySQL DB access for common test suite interface
    """
    def __init__(self):
        BaseDBAccess.__init__(self)
        self.DB_TYPE = 'mysql'

    def detect_database(self, verbose=False):
        """ Detect database and return VERSION data structure
            or a human readable string (verbose=True)

            Runs 'SHOW VARIABLES LIKE "%version%"'. With verbose=False the
            fetched rows are returned unchanged; with verbose=True each row is
            rendered as a tab-indented 'Variable_name: Value' line.
        """
        query = 'SHOW VARIABLES LIKE "%version%"'
        rows = self.select_all(query)
        if not verbose:
            return rows
        return "\n".join("\t%s: %s"% (row['Variable_name'], row['Value'])
                         for row in rows)

    def parse_db_connection_string(self, str):
        """ Parsing SQL DB connection string. String should contain:
            - DB type, user name, password, URL (DB host), DB name

            Delegates parsing to BaseDBAccess and returns its result, which
            callers unpack as (db_type, username, password, host, db_name).
            Returns None if parsing failed or the DB type is not this
            backend's type.

            E.g. connection string: 'mysql://username:password@127.0.0.1/db_name'
        """
        # Delegate on this instance, the same way __init__ calls the base
        # class, instead of constructing a throwaway BaseDBAccess object.
        result = BaseDBAccess.parse_db_connection_string(self, str)
        if result is not None and result[0] != self.DB_TYPE:
            result = None
        return result

    def is_connected(self):
        """ Returns True if we are connected to database
        """
        return self.db_object is not None

    def connect(self, host, user, passwd, db):
        """ Connects to DB and stores the connection in self.db_object

            On success the credentials are remembered so reconnect() can
            reuse them. On mdb.Error the error is printed and the connection
            object and all stored credentials are reset to None.
            Nothing is returned; use is_connected() to check the outcome.
        """
        try:
            self.db_object = mdb.connect(host=host, user=user, passwd=passwd, db=db)
            # Let's remember connection credentials
            self.db_type = self.DB_TYPE
            self.host = host
            self.user = user
            self.passwd = passwd
            self.db = db
        except mdb.Error as e:
            print("Error %d: %s"% (e.args[0], e.args[1]))
            self.db_object = None
            self.db_type = None
            self.host = None
            self.user = None
            self.passwd = None
            self.db = None

    def connect_url(self, db_url):
        """ Connects to database using db_url (database url parsing),
            store host, username, password, db_name

            Does nothing if db_url cannot be parsed or names another DB type.
        """
        result = self.parse_db_connection_string(db_url)
        if result is not None:
            (db_type, username, password, host, db_name) = result
            if db_type == self.DB_TYPE:
                self.connect(host, username, password, db_name)

    def reconnect(self):
        """ Reconnects to DB using stored host name, database name and
            credentials (user name and password)
        """
        self.connect(self.host, self.user, self.passwd, self.db)

    def disconnect(self):
        """ Close DB connection

            Stored host / user / password / db name are left untouched,
            only the connection object and db_type are cleared.
        """
        if self.db_object:
            self.db_object.close()
        self.db_object = None
        self.db_type = None

    def escape_string(self, str):
        """ Escapes string so it can be put in SQL query between quotes

            Returns '' when the escaped value is empty / falsy.
        """
        con = self.db_object
        result = con.escape_string(str)
        return result if result else ''

    def select_all(self, query):
        """ Execute SELECT query and get all results

            Rows are fetched through a DictCursor, so each row is addressed
            by column name.
        """
        con = self.db_object
        cur = con.cursor(mdb.cursors.DictCursor)
        try:
            cur.execute(query)
            rows = cur.fetchall()
        finally:
            # Release the cursor even if the query fails
            cur.close()
        return rows

    def insert(self, query, commit=True):
        """ Execute INSERT query, define if you want to commit

            Returns the cursor's lastrowid for the executed statement.
        """
        con = self.db_object
        cur = con.cursor()
        try:
            cur.execute(query)
            # Read the id before the cursor is closed
            last_row_id = cur.lastrowid
            if commit:
                con.commit()
        finally:
            cur.close()
        return last_row_id

    def get_next_build_id(self, name, desc='', location='', type=None, status=None):
        """ Insert new build_id (DB unique build like ID number to send all test results)

            status defaults to self.BUILD_ID_STATUS_STARTED and type to
            self.BUILD_ID_TYPE_TEST. Returns the primary key of the inserted
            record.
        """
        if status is None:
            status = self.BUILD_ID_STATUS_STARTED

        if type is None:
            type = self.BUILD_ID_TYPE_TEST

        query = """INSERT INTO `%s` (%s_name, %s_desc, %s_location, %s_type_fk, %s_status_fk)
                        VALUES ('%s', '%s', '%s', %d, %d)"""% (self.TABLE_BUILD_ID,
                                                               self.TABLE_BUILD_ID,
                                                               self.TABLE_BUILD_ID,
                                                               self.TABLE_BUILD_ID,
                                                               self.TABLE_BUILD_ID,
                                                               self.TABLE_BUILD_ID,
                                                               self.escape_string(name),
                                                               self.escape_string(desc),
                                                               self.escape_string(location),
                                                               type,
                                                               status)
        index = self.insert(query) # Provide inserted record PK
        return index

    def get_table_entry_pk(self, table, column, value, update_db=True):
        """ Looks up the '<table>_pk' value of the row whose column equals value

            Returns the primary key when exactly one row matches.
            If no row matches and update_db is True the value is inserted via
            update_table_entry() and that call's result is returned.
            In every other case (several matches, or no match with
            update_db=False) None is returned.
        """
        # TODO: table buffering
        result = None
        table_pk = '%s_pk'% table
        query = """SELECT `%s`
                     FROM `%s`
                    WHERE `%s`='%s'"""% (table_pk,
                                         table,
                                         column,
                                         self.escape_string(value))
        rows = self.select_all(query)
        if len(rows) == 1:
            result = rows[0][table_pk]
        elif not rows and update_db:
            # Update DB with new value
            result = self.update_table_entry(table, column, value)
        return result

    def update_table_entry(self, table, column, value):
        """ Updates table entry if value in specified column doesn't exist
            Locks table to perform atomic read + update

            Returns the primary key of the row holding value: the freshly
            inserted row, or the already existing one when exactly one row
            matches (e.g. another writer inserted it before the lock was
            taken). Returns None when several rows match.
            The table lock is always released and the cursor always closed,
            also when a statement fails.
        """
        result = None
        con = self.db_object
        table_pk = '%s_pk'% table
        escaped_value = self.escape_string(value)
        cur = con.cursor()
        try:
            cur.execute("LOCK TABLES `%s` WRITE"% table)
            try:
                query = """SELECT `%s`
                             FROM `%s`
                            WHERE `%s`='%s'"""% (table_pk,
                                                 table,
                                                 column,
                                                 escaped_value)
                cur.execute(query)
                rows = cur.fetchall()
                if not rows:
                    query = """INSERT INTO `%s` (%s)
                                    VALUES ('%s')"""% (table,
                                                       column,
                                                       escaped_value)
                    cur.execute(query)
                    result = cur.lastrowid
                    con.commit()
                elif len(rows) == 1:
                    # Row appeared between the caller's lookup and our lock:
                    # hand back its key instead of None (plain cursor, so the
                    # only selected column is at index 0).
                    result = rows[0][0]
            finally:
                # Never leave the table locked, whatever happened above
                cur.execute("UNLOCK TABLES")
        finally:
            cur.close()
        return result

    def update_build_id_info(self, build_id, **kw):
        """ Update additional data inside build_id table

            Each keyword name is a column suffix appended to the build_id
            table name; its value is converted with str() and escaped.
            Does nothing when no keyword arguments are given.

            Examples:
            db.update_build_id_info(build_id, _status_fk=self.BUILD_ID_STATUS_COMPLETED, _shuffle_seed=0.0123456789):
        """
        if not kw:
            return

        # Prepare UPDATE query
        # ["`mtest_build_id_pk`=[value-1]", "`mtest_build_id_name`=[value-2]", "`mtest_build_id_desc`=[value-3]"]
        set_list = ["`%s%s`='%s'"% (self.TABLE_BUILD_ID,
                                    col_sufix,
                                    self.escape_string(str(kw[col_sufix])))
                    for col_sufix in kw]
        set_str = ', '.join(set_list)
        query = """UPDATE `%s`
                      SET %s
                    WHERE `mtest_build_id_pk`=%d"""% (self.TABLE_BUILD_ID,
                                                      set_str,
                                                      build_id)
        con = self.db_object
        cur = con.cursor()
        try:
            cur.execute(query)
            con.commit()
        finally:
            cur.close()

    def insert_test_entry(self, build_id, target, toolchain, test_type, test_id, test_result, test_output, test_time, test_timeout, test_loop, test_extra=''):
        """ Inserts test result entry to database. All checks regarding existing
            toolchain names in DB are performed.
            If some data is missing DB will be updated
        """
        # Get all table FK and if entry is new try to insert new value
        target_fk = self.get_table_entry_pk(self.TABLE_TARGET, self.TABLE_TARGET + '_name', target)
        toolchain_fk = self.get_table_entry_pk(self.TABLE_TOOLCHAIN, self.TABLE_TOOLCHAIN + '_name', toolchain)
        test_type_fk = self.get_table_entry_pk(self.TABLE_TEST_TYPE, self.TABLE_TEST_TYPE + '_name', test_type)
        test_id_fk = self.get_table_entry_pk(self.TABLE_TEST_ID, self.TABLE_TEST_ID + '_name', test_id)
        test_result_fk = self.get_table_entry_pk(self.TABLE_TEST_RESULT, self.TABLE_TEST_RESULT + '_name', test_result)

        query = """ INSERT INTO `%s` (`mtest_build_id_fk`,
                                      `mtest_target_fk`,
                                      `mtest_toolchain_fk`,
                                      `mtest_test_type_fk`,
                                      `mtest_test_id_fk`,
                                      `mtest_test_result_fk`,
                                      `mtest_test_output`,
                                      `mtest_test_time`,
                                      `mtest_test_timeout`,
                                      `mtest_test_loop_no`,
                                      `mtest_test_result_extra`)
                         VALUES (%d, %d, %d, %d, %d, %d, '%s', %.2f, %.2f, %d, '%s')"""% (self.TABLE_TEST_ENTRY,
                                                                                          build_id,
                                                                                          target_fk,
                                                                                          toolchain_fk,
                                                                                          test_type_fk,
                                                                                          test_id_fk,
                                                                                          test_result_fk,
                                                                                          self.escape_string(test_output),
                                                                                          test_time,
                                                                                          test_timeout,
                                                                                          test_loop,
                                                                                          self.escape_string(test_extra))
        con = self.db_object
        cur = con.cursor()
        try:
            cur.execute(query)
            con.commit()
        finally:
            cur.close()