Data warehouse in MySQL
A new challenge – a data warehouse built for a specific customer's purposes (not sales aggregation).
Before I started reading up on data warehouse know-how, I prepared my own plan. The resulting basic structure turned out surprisingly well; I only had to borrow the names for its elements from official sources.
The type of data warehouse is obviously ROLAP (Relational OLAP).
In short, the basic table containing the aggregated data is the fact table, and the dictionary tables (kept separately for the data warehouse's needs) are called dimension tables.
The typical data structure is a “star” or a “snowflake” – the sketch below illustrates the difference between them.
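A minimal sketch with hypothetical table and column names (these are not the customer's tables) shows the idea: in a star schema the fact table references flat dimension tables, while in a snowflake the dimensions are further normalized into their own lookup tables.
-- hypothetical star schema: one fact table plus flat dimension tables
CREATE TABLE dim_product (
product_key INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(200) NOT NULL,
category_name VARCHAR(200) NOT NULL -- denormalized: the category is kept inside the product dimension
);
CREATE TABLE dim_date (
date_key INT UNSIGNED NOT NULL PRIMARY KEY,
full_date DATE NOT NULL
);
CREATE TABLE fact_product_activity (
product_key INT UNSIGNED NOT NULL, -- points to dim_product
date_key INT UNSIGNED NOT NULL, -- points to dim_date
amount INT NOT NULL, -- the aggregated measure
PRIMARY KEY (product_key, date_key)
);
-- in the snowflake variant the denormalized category_name column would instead
-- become a separate dim_category table referenced by dim_product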
ETL for Fact Tables
Because the source and destination environments are homogeneous, I decided to use triggers to register changes and save them in two tables representing the old and the new data (this cannot be done with one procedure for all fact tables, because it would require executing dynamic queries, which is impossible in triggers).
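To illustrate the limitation: even a trivial hypothetical trigger like the one below is refused by MySQL with ERROR 1336 (0A000): Dynamic SQL is not allowed in stored function or trigger.
-- hypothetical trigger, for illustration only – refused because of PREPARE/EXECUTE
CREATE TRIGGER example_bu BEFORE UPDATE ON `product_to_category`
FOR EACH ROW
BEGIN
SET @q = 'INSERT INTO `etl_product_to_category_old` SELECT ...';
PREPARE MyStmt FROM @q; -- dynamic SQL is not allowed inside a trigger
EXECUTE MyStmt;
END//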
I work with table `product_to_category`:
mysql> CREATE TABLE IF NOT EXISTS `product_to_category` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`product_id` int(11) NOT NULL,
`category_id` int(11) NOT NULL,
PRIMARY KEY (`product_id`,`category_id`),
UNIQUE KEY `id_unique` (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 AUTO_INCREMENT=6 ;
mysql> desc product_to_category;
+-------------+------------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+------------------+------+-----+---------+----------------+
| id | int(10) unsigned | NO | UNI | NULL | auto_increment |
| product_id | int(11) | NO | PRI | NULL | |
| category_id | int(11) | NO | PRI | NULL | |
+-------------+------------------+------+-----+---------+----------------+
The procedure used to create the tables that store the old and the new data:
DROP PROCEDURE IF EXISTS `OLAP_DDL_SHADOW_TABLE_MAKE`//
CREATE PROCEDURE `OLAP_DDL_SHADOW_TABLE_MAKE`(IN SourceTable VARCHAR(250), IN ShadowType VARCHAR(10))
MAIN_BLOCK: BEGIN
DECLARE _ShadowNameOld VARCHAR(255);
DECLARE _ShadowNameNew VARCHAR(255);
DECLARE _table VARCHAR(200);
DECLARE _schema VARCHAR(32);
DECLARE DestinationSchema VARCHAR(20) DEFAULT 'mms_olap';
-- Split "schema.table"; when no schema is given, fall back to the current database
IF LOCATE('.', TRIM(SourceTable)) = 0 THEN
SET _schema = DATABASE();
SET _table = TRIM(SourceTable);
ELSE
SELECT SUBSTRING_INDEX(TRIM(SourceTable), ".", 1) INTO _schema;
SELECT SUBSTRING_INDEX(TRIM(SourceTable), ".", -1) INTO _table;
END IF;
IF (CHAR_LENGTH(_schema)) < 4 OR (CHAR_LENGTH(_table)) < 4 THEN
SELECT "Unknown table or schema. At least 4 characters are required." FROM DUAL;
LEAVE MAIN_BLOCK;
END IF;
IF ShadowType NOT IN ('fact', 'dimension') THEN
SELECT "Error. Uknown shadow type (fact|dimension)" FROM DUAL;
LEAVE MAIN_BLOCK;
END IF;
SET _ShadowNameOld = CONCAT("etl_", _table, "_old");
SET _ShadowNameNew = CONCAT("etl_", _table, "_new");
SET @DeleteQuery = CONCAT("DROP /* OLAP_DDL_SHADOW_TABLE_MAKE */ TABLE IF EXISTS `", DestinationSchema, "`.`", _ShadowNameNew, "`;");
PREPARE MyStmt FROM @DeleteQuery;
EXECUTE MyStmt;
SET @DeleteQuery = CONCAT("DROP /* OLAP_DDL_SHADOW_TABLE_MAKE */ TABLE IF EXISTS `", DestinationSchema, "`.`", _ShadowNameOld, "`;");
PREPARE MyStmt FROM @DeleteQuery;
EXECUTE MyStmt;
IF ShadowType = 'fact' THEN
SET @CreateQuery = CONCAT("CREATE TABLE /* OLAP_DDL_SHADOW_TABLE_MAKE */ `", DestinationSchema, "`.`", _ShadowNameNew, "`
(olap_id INT UNSIGNED NOT NULL COMMENT 'Internal OLAP Key', `olap_timestamp` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Insert timestamp',
PRIMARY KEY(`olap_id`, `olap_timestamp`), INDEX `prsn_timestamp_idx` ( `olap_timestamp` )) partition by hash(olap_id) partitions 4 SELECT * FROM `", _schema,"`.`", _table,"` LIMIT 0;");
PREPARE MyStmt FROM @CreateQuery;
EXECUTE MyStmt;
SET @CreateQuery = CONCAT("CREATE TABLE /* OLAP_DDL_SHADOW_TABLE_MAKE */ `", DestinationSchema, "`.`", _ShadowNameOld, "`
(olap_id INT UNSIGNED NOT NULL COMMENT 'Internal OLAP Key', `olap_timestamp` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Insert timestamp',
PRIMARY KEY(`olap_id`, `olap_timestamp`), INDEX `prsn_timestamp_idx` ( `olap_timestamp` )) partition by hash(olap_id) partitions 4 SELECT * FROM `", _schema,"`.`", _table,"` LIMIT 0;");
PREPARE MyStmt FROM @CreateQuery;
EXECUTE MyStmt;
ELSEIF ShadowType = 'dimension' THEN
-- we need only new data
SET @CreateQuery = CONCAT("CREATE TABLE /* OLAP_DDL_SHADOW_TABLE_MAKE */ `", DestinationSchema, "`.`", _ShadowNameNew, "` SELECT * FROM `", _schema,"`.`", _table,"` LIMIT 0;");
PREPARE MyStmt FROM @CreateQuery;
EXECUTE MyStmt;
SET @CreateQuery = CONCAT("ALTER TABLE /* OLAP_DDL_SHADOW_TABLE_MAKE */ `", DestinationSchema, "`.`", _ShadowNameNew, "` ADD COLUMN olap_id VARCHAR(200) NOT NULL COMMENT 'Internal OLAP Key', ADD COLUMN `olap_timestamp` DATE NOT NULL COMMENT 'Insert/Change date', ADD PRIMARY KEY(`olap_id`(200), `olap_timestamp`), ADD INDEX `prsn_timestamp_idx` ( `olap_timestamp` ) ;");
PREPARE MyStmt FROM @CreateQuery;
EXECUTE MyStmt;
END IF;
END MAIN_BLOCK//
Now, if I want to create shadow tables for my fact table source, I issue the following call:
CALL OLAP_DDL_SHADOW_TABLE_MAKE('product_to_category', 'fact');
and I obtain two self-describing tables (etl_product_to_category_old, etl_product_to_category_new).
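For reference, the new shadow table (created in the DestinationSchema, `mms_olap`) should look more or less like this – the olap_* columns are added in front of the columns copied from the source table:
mysql> desc mms_olap.etl_product_to_category_new;
+----------------+------------------+------+-----+-------------------+-------+
| Field          | Type             | Null | Key | Default           | Extra |
+----------------+------------------+------+-----+-------------------+-------+
| olap_id        | int(10) unsigned | NO   | PRI | NULL              |       |
| olap_timestamp | timestamp        | NO   | PRI | CURRENT_TIMESTAMP |       |
| id             | int(10) unsigned | NO   |     | NULL              |       |
| product_id     | int(11)          | NO   |     | NULL              |       |
| category_id    | int(11)          | NO   |     | NULL              |       |
+----------------+------------------+------+-----+-------------------+-------+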
In order to capture data changes I use two very similar triggers on UPDATE, one for each timing:
BEFORE:
DROP TRIGGER IF EXISTS `PRODUCT_TO_CATEGORY_BU`//
CREATE DEFINER = root@localhost TRIGGER PRODUCT_TO_CATEGORY_BU BEFORE UPDATE ON `product_to_category`
FOR EACH ROW
BEGIN
SET @OLAP_TIMESTAMP = CURRENT_TIMESTAMP;
-- REPLACE instead of INSERT, because the newer values are more important
REPLACE INTO /* OLAP_ShadowTableFill */ `etl_product_to_category_old` SELECT OLD.Id, @OLAP_TIMESTAMP, `product_to_category`.* FROM `product_to_category` WHERE Id = OLD.Id ;
END//
AFTER:
DROP TRIGGER IF EXISTS `PRODUCT_TO_CATEGORY_AU`//
CREATE DEFINER = root@localhost TRIGGER PRODUCT_TO_CATEGORY_AU AFTER UPDATE ON `product_to_category`
FOR EACH ROW
BEGIN
-- [update] Use INSERT IGNORE here, not REPLACE: when one update changes the data and a further update follows, REPLACE could leave no difference between the new and the old record.
INSERT IGNORE INTO /* OLAP_ShadowTableFill */ `etl_product_to_category_new` SELECT OLD.Id, @OLAP_TIMESTAMP /* set in the BEFORE UPDATE trigger */, `product_to_category`.* FROM `product_to_category` WHERE Id = NEW.Id ;
END//
How does it work? I insert three records:
INSERT INTO `product_to_category` (`product_id`, `category_id`) VALUES (1, 1), (2, 1), (3, 2) ;
Next, I update one of them:
UPDATE `product_to_category` SET `category_id` = 2 WHERE `product_id` =1;
and now I can read changes from shadow tables as follows:
SELECT `OLD`.Id, `NEW`.category_id `New category`, `OLD`.category_id `Old category`
FROM `etl_product_to_category_new` `NEW`
INNER JOIN `etl_product_to_category_old` `OLD` ON `OLD`.`olap_id`=`NEW`.`olap_id` AND `OLD`.`olap_timestamp` = `NEW`.`olap_timestamp`;
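With the sample data above, the query should return a single row like this – product 1 moved from category 1 to category 2 (I assume the three inserted rows received ids 6–8, continuing from the AUTO_INCREMENT value shown in the dump):
+----+--------------+--------------+
| Id | New category | Old category |
+----+--------------+--------------+
|  6 |            2 |            1 |
+----+--------------+--------------+
1 row in set (0.00 sec)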
As you can see, I allow at most one registered change per second, which is sufficient for me. I used REPLACE in order to ensure that the last change is registered (alternatively I could have used INSERT IGNORE).
This is the source used to populate three fact tables, according to my needs. Such data extraction also has its own name: Change Data Capture (CDC).
My pattern is described in the above-mentioned link as “Triggers on tables”. The only difference is that I use a common procedure to log changes into partitioned tables named “etl_*_old/new”, which simultaneously serve as queue tables.
The event that I wrote is tailored to my specific purpose (it includes quasi-transactions implemented on MyISAM, etc.), so there is no need to present it here. I would like to add that I use one more shadow table for the fact table, which holds the relation between the last olap_id and the fact_id – thanks to this I know which record should be decremented.
ETL for Dimension Tables
The situation where the data of dictionary tables also changes has its own name: Slowly Changing Dimension. As the name suggests, this type of data should change slowly, so I introduced a limitation – one day as the minimum interval for change registration.
I create the corresponding shadow table for `category` with the procedure OLAP_DDL_SHADOW_TABLE_MAKE:
CALL OLAP_DDL_SHADOW_TABLE_MAKE('category', 'dimension');
Now I receive the following table:
mysql> desc etl_category_new;
+----------------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------------+--------------+------+-----+---------+-------+
| id | int(11) | NO | | 0 | |
| name | varchar(200) | NO | | NULL | |
| olap_id | varchar(200) | NO | PRI | NULL | |
| olap_timestamp | date | NO | PRI | NULL | |
+----------------+--------------+------+-----+---------+-------+
4 rows in set (0.02 sec)
The only difference is the data type of olap_timestamp – it stems from the fact that I am not interested in changes made more frequently than once a day (it is a slowly changing dimension).
I add triggers for INSERT and UPDATE, both with the AFTER timing, e.g.:
DROP TRIGGER IF EXISTS `OLAP_DIMENSION_CATEGORY_AI`//
CREATE DEFINER = root@localhost TRIGGER `OLAP_DIMENSION_CATEGORY_AI` AFTER INSERT ON `category`
FOR EACH ROW
BEGIN
SET @OLAP_TIMESTAMP:=CURRENT_DATE;
REPLACE INTO /* OLAP */ `etl_category_new` SELECT *, NEW.Id, @OLAP_TIMESTAMP FROM `category` WHERE id = NEW.id ;
END//
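The UPDATE counterpart is almost identical; the sketch below is my reconstruction rather than the original trigger:
DROP TRIGGER IF EXISTS `OLAP_DIMENSION_CATEGORY_AU`//
CREATE DEFINER = root@localhost TRIGGER `OLAP_DIMENSION_CATEGORY_AU` AFTER UPDATE ON `category`
FOR EACH ROW
BEGIN
SET @OLAP_TIMESTAMP:=CURRENT_DATE;
-- REPLACE keeps only the latest value registered for a given day
REPLACE INTO /* OLAP */ `etl_category_new` SELECT *, NEW.Id, @OLAP_TIMESTAMP FROM `category` WHERE id = NEW.id ;
END//
Thanks to the (olap_id, olap_timestamp) primary key and the DATE granularity, two renames of the same category on the same day collapse into a single queued row holding the latest name.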
The next common ETL step is to Transform and Load the data; I use the Event Scheduler for this purpose.
I used Type 4 of SCD and added a field which contains the number of versions.
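The event below works with two SCD tables whose definitions are not shown in this post; my best guess of their structure (column types and keys are assumptions – the unique key on source_id is what lets the INSERT IGNORE / REPLACE pair in the event detect an already known category):
CREATE TABLE IF NOT EXISTS `olap_category_as_dimension` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT,
`source_id` INT NOT NULL COMMENT 'id in the source category table',
`name` VARCHAR(200) NOT NULL,
`valid_from` DATE NOT NULL,
`version` INT UNSIGNED NOT NULL DEFAULT 0 COMMENT 'number of historical versions',
PRIMARY KEY (`id`),
UNIQUE KEY `source_idx` (`source_id`)
) ENGINE=MyISAM;
CREATE TABLE IF NOT EXISTS `olap_category_as_dimension_history` (
`id` INT UNSIGNED NOT NULL,
`source_id` INT NOT NULL,
`name` VARCHAR(200) NOT NULL,
`valid_from` DATE NOT NULL,
`valid_to` DATE NOT NULL,
UNIQUE KEY `source_from_idx` (`source_id`, `valid_from`) -- assumed target of the ON DUPLICATE KEY UPDATE below
) ENGINE=MyISAM;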
This is an example of my event:
SET NAMES utf8//
DROP EVENT IF EXISTS OLAP_DIMENSION_CATEGORY//
CREATE DEFINER=root@localhost EVENT OLAP_DIMENSION_CATEGORY
ON SCHEDULE EVERY 15 SECOND
STARTS '2011-01-17 15:20:00' DO
BEGIN
DECLARE LeaveProcedure BOOLEAN DEFAULT FALSE;
DECLARE done INT DEFAULT 0;
DECLARE EventId TINYINT UNSIGNED DEFAULT 1;
DECLARE EventStatus TINYINT DEFAULT 0;
DECLARE `MyTimestamp` TIMESTAMP;
DECLARE `OlapTimestamp` DATE;
DECLARE CategoryId INT UNSIGNED DEFAULT 0;
DECLARE CategoryName VARCHAR(200) DEFAULT '';
DECLARE OlapId VARCHAR(200) DEFAULT '';
DECLARE MyQueue CURSOR FOR Select olap_id, olap_timestamp, name, id From `etl_category_new`; /* WHERE olap_timestamp > '0000-01-01' It is myisam and lock for this event */
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
DECLARE CONTINUE HANDLER FOR SQLWARNING BEGIN END;
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION
BEGIN
END;
SELECT `status`, `update_timestamp` INTO EventStatus, MyTimestamp From `sys_event_lock` WHERE `id` = 1;
IF LeaveProcedure = FALSE AND ((EventStatus IS NULL OR EventStatus = 0) OR (MyTimestamp < DATE_ADD(NOW(), INTERVAL -5 MINUTE /* Max execution time */ ))) THEN
-- START TRANSACTION;
UPDATE `sys_event_lock` SET `status` = 1 WHERE `id` = 1;
-- COMMIT;
MAIN_BLOCK: BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
-- ROLLBACK;
-- START TRANSACTION;
UPDATE `sys_event_lock` SET error_timestamp=CURRENT_TIMESTAMP, error_messages = CONCAT_WS("|", error_messages, @LAST_COMMAND) WHERE `id` = 1;
-- COMMIT;
END;
-- START TRANSACTION;
SET done = 0;
OPEN MyQueue;
REPEAT FETCH MyQueue INTO OlapId, OlapTimestamp, CategoryName, CategoryId;
IF done = 0 THEN
SET @LAST_COMMAND := CONCAT(
"INSERT IGNORE /* OLAP */ INTO olap_category_as_dimension (`id`, `source_id`, `name`, `valid_from`) VALUES(", CONCAT_WS(",", 'NULL', CategoryId, QUOTE(CategoryName), QUOTE(OlapTimestamp) ), ");"
);
PREPARE MyStmt FROM @LAST_COMMAND;
EXECUTE MyStmt;
-- was anything changed?
IF (ROW_COUNT() = 0) THEN
-- Move or update record in history
SET @LAST_COMMAND := CONCAT("INSERT INTO olap_category_as_dimension_history (id, source_id, `name`, valid_from, valid_to) SELECT id, source_id, name, valid_from, ", QUOTE(date_sub(CURRENT_DATE, INTERVAL 1 DAY)) /* valid till yesterday*/ ," FROM olap_category_as_dimension
WHERE source_id= ", CategoryId," ON DUPLICATE KEY UPDATE /* Only category attributes */ `name`=VALUES(`name`); ");
PREPARE MyStmt FROM @LAST_COMMAND;
EXECUTE MyStmt;
-- Replace record with new values
SET @LAST_COMMAND := CONCAT(
"REPLACE /* OLAP */ INTO olap_category_as_dimension (`id`, `source_id`, `name`, `valid_from`, version)
VALUES(", CONCAT_WS(",", 'NULL', CategoryId, QUOTE(CategoryName), QUOTE(OlapTimestamp)), ", (SELECT COUNT(*) FROM olap_category_as_dimension_history WHERE source_id=", CategoryId, "));" );
PREPARE MyStmt FROM @LAST_COMMAND;
EXECUTE MyStmt;
END IF;
SET @LAST_COMMAND := CONCAT(
"DELETE From `category_shadow_new` /* OLAP */ WHERE olap_id=", QUOTE(OlapId), " AND olap_timestamp= ", QUOTE(OlapTimestamp), " ;"
);
PREPARE MyStmt FROM @LAST_COMMAND;
EXECUTE MyStmt;
END IF;
UNTIL done = 1 END REPEAT;
CLOSE MyQueue;
-- COMMIT;
END MAIN_BLOCK;
-- START TRANSACTION;
UPDATE `sys_event_lock` SET `status` = 0, `duration` = UNIX_TIMESTAMP(CURRENT_TIMESTAMP) - UNIX_TIMESTAMP(update_timestamp) WHERE `id` = EventId;
UPDATE `sys_event_lock` SET `duration_total` = `duration` + `duration_total` WHERE `id` = EventId;
-- COMMIT;
END IF;
END//
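The quasi-locking in the event relies on a small `sys_event_lock` table which is not shown either; a minimal sketch of how it could look (my assumption):
CREATE TABLE IF NOT EXISTS `sys_event_lock` (
`id` TINYINT UNSIGNED NOT NULL,
`status` TINYINT NOT NULL DEFAULT 0 COMMENT '1 = event is currently running',
`update_timestamp` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`duration` INT UNSIGNED NOT NULL DEFAULT 0 COMMENT 'last run time in seconds',
`duration_total` INT UNSIGNED NOT NULL DEFAULT 0,
`error_timestamp` TIMESTAMP NULL DEFAULT NULL,
`error_messages` TEXT,
PRIMARY KEY (`id`)
) ENGINE=MyISAM;
INSERT IGNORE INTO `sys_event_lock` (`id`) VALUES (1);
The event sets status to 1 while it runs and back to 0 when it finishes; a lock older than the five-minute limit is treated as a crashed run and taken over.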
In order to fill the fact table with historical data I have to prepare a separate process. It can also be useful in the case of disaster and for weekly data reconstruction.