Хотя тема секционирования уже поднималась ранее , хочу вернуться к ней, чтобы рассказать о своем опыте решения этой задачи, возникшей в связи с необходимостью аналитической обработки больших объемов данных.
Помимо секционирования я рассмотрю крайне упрощенную реализацию «снимков» агрегированных запросов, которые автоматически обновляются при изменении исходных данных.
Одним из основных требований к разрабатываемой системе было использование свободного программного обеспечения, поэтому выбор пал на PostgreSQL. На момент начала работы над проектом я знал PostgreSQL достаточно поверхностно, но был вполне знаком с возможностями Oracle Database. Поскольку речь шла об аналитической обработке, хотелось иметь аналоги таких вариантов Oracle, как Разделение И Материализованные представления .
После ознакомления с возможностями PostgreSQL , стало понятно, что этот функционал так или иначе придется писать вручную.
Разумеется, речь не шла ни о какой полноценной реализации Materialized Views, обеспечивающей переписывание запросов .
Для моих нужд возможности создавать автоматически обновляемые агрегированные однотабличные выборки оказалось вполне достаточно (поддержка объединения таблиц, скорее всего, будет добавлена в ближайшее время).
Для разбиения я планировал использовать неоднократно описанный подход, используя унаследованный таблицы, вставка данных которых контролируется триггером.
У меня была идея использовать его для управления разделами Правила , но я от него отказался, поскольку в моем случае преобладала вставка данных в одиночные записи.
Начал я, конечно, с таблиц для хранения метаданных: ps_tables.sql
Здесь все совершенно очевидно.create sequence ps_table_seq; create table ps_table ( id bigint default nextval('ps_table_seq') not null, name varchar(50) not null unique, primary key(id) ); create sequence ps_column_seq; create table ps_column ( id bigint default nextval('ps_column_seq') not null, table_id bigint not null references ps_table(id), name varchar(50) not null, parent_name varchar(50), type_name varchar(8) not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')), unique (table_id, name), primary key(id) ); create table ps_range_partition ( table_id bigint not null references ps_table(id), type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')), start_value date not null, end_value date not null, primary key(table_id, start_value) ); create table ps_snapshot ( snapshot_id bigint not null references ps_table(id), table_id bigint not null references ps_table(id), type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')), primary key(snapshot_id) );
Единственное, что стоит упомянуть, это типы столбцов:
Тип | Описание |
дата | Столбец, содержащий календарную дату, используемый при секционировании и агрегировании данных (поддерживаются типы даты и времени PostgreSQL).
|
ключ | Ключ, используемый в предложении group by при агрегировании данных (поддерживаются все целочисленные типы PostgreSQL).
|
обнуляемый | Ключ, используемый при агрегировании данных, возможно, содержащий нулевое значение.
|
сумма | Сумма значений |
мин | Минимальное значение |
Макс | Максимальное значение |
CNT | Подсчет количества ненулевых значений |
create or replace function ps_trigger_regenerate(in p_table bigint) returns void
as $$
declare
l_sql text;
l_table_name varchar(50);
l_date_column varchar(50);
l_flag boolean;
tabs record;
columns record;
begin
select name into l_table_name
from ps_table where id = p_table;
l_sql :=
'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' ||
'as $'|| '$ ' ||
'begin ';
for tabs in
select a.snapshot_id as id,
b.name as table_name,
a.type_name as snapshot_type
from ps_snapshot a, ps_table b
where a.table_id = p_table
and b.id = a.snapshot_id
loop
l_flag = FALSE;
l_sql := l_sql ||
'update ' || tabs.table_name || ' set ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and not type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'sum' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';
end if;
if columns.type_name = 'min' then
l_sql := l_sql ||
columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
end if;
if columns.type_name = 'max' then
l_sql := l_sql ||
columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
end if;
end loop;
l_flag = FALSE;
l_sql := l_sql || 'where ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || 'and ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql ||
columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
end if;
if columns.type_name = 'key' then
l_sql := l_sql ||
columns.name || ' = NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql ||
columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
end if;
end loop;
l_sql := l_sql || '; ' ||
'if not FOUND then ' ||
'insert into ' || tabs.table_name || '(';
l_flag = FALSE;
for columns in
select name, type_name
from ps_column
where table_id = tabs.id
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql || ') values (';
l_flag = FALSE;
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')';
elsif columns.type_name = 'cnt' then
l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end';
elsif columns.type_name in ('nullable', 'sum') then
l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';
else
l_sql := l_sql || 'NEW.' || columns.parent_name;
end if;
end loop;
l_sql := l_sql || '); ' ||
'end if; ';
end loop;
select name into l_date_column
from ps_column
where table_id = p_table
and type_name = 'date';
for tabs in
select to_char(start_value, 'YYYYMMDD') as start_value,
to_char(end_value, 'YYYYMMDD') as end_value,
type_name
from ps_range_partition
where table_id = p_table
order by start_value desc
loop
l_sql := l_sql ||
'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' ||
'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' ||
'return null; ' ||
'end if; ';
end loop;
l_sql := l_sql ||
'return NEW; '||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;
l_sql :=
'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' ||
'as $'|| '$ ' ||
'begin ' ||
'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP;' ||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;
l_sql :=
'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' ||
'as $'|| '$ ' ||
'begin ';
for tabs in
select a.snapshot_id as id,
b.name as table_name,
a.type_name as snapshot_type
from ps_snapshot a, ps_table b
where a.table_id = p_table
and b.id = a.snapshot_id
loop
l_flag = FALSE;
l_sql := l_sql ||
'update ' || tabs.table_name || ' set ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('sum', 'cnt')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'sum' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ';
end if;
end loop;
l_flag = FALSE;
l_sql := l_sql || 'where ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || 'and ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql ||
columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
end if;
if columns.type_name = 'key' then
l_sql := l_sql ||
columns.name || ' = NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql ||
columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
end if;
end loop;
l_sql := l_sql || '; ';
end loop;
l_sql := l_sql ||
'return null; '||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;
l_sql :=
'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' ||
'as $'|| '$ ' ||
'begin ';
for tabs in
select a.snapshot_id as id,
b.name as table_name,
a.type_name as snapshot_type
from ps_snapshot a, ps_table b
where a.table_id = p_table
and b.id = a.snapshot_id
loop
l_flag = FALSE;
l_sql := l_sql ||
'update ' || tabs.table_name || ' set ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('sum', 'cnt')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'sum' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name ||
' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ' ||
' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
end if;
end loop;
l_flag = FALSE;
l_sql := l_sql || 'where ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || 'and ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql ||
columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
end if;
if columns.type_name = 'key' then
l_sql := l_sql ||
columns.name || ' = NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql ||
columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
end if;
end loop;
l_sql := l_sql || '; ';
end loop;
l_sql := l_sql ||
'return null; '||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;
end;
$$ language plpgsql;
Несмотря на свой устрашающий внешний вид, эта функция достаточно проста.
Его задача — сгенерировать (на основе доступных метаданных) четыре функции, используемые при построении триггеров:
- ps_TABLE_insert_trigger() — Функция, управляющая вставкой данных
- ps_TABLE_update_trigger() — Функция, управляющая обновлением данных.
- ps_TABLE_delete_trigger() — Функция, управляющая удалением данных
- ps_TABLE_raise_trigger() — Функция, запрещающая обновление и удаление данных
Типичное определение функции ps_TABLE_insert_trigger() будет выглядеть следующим образом: create or replace function ps_data_insert_trigger() returns trigger
as $$
begin
update data_month set
sum_field = sum_field + NEW.sum_field
, min_field = least(min_field, NEW.min_field)
where date_field = date_trunc('month', NEW.date_field)
and key_field = NEW.key_field;
if not FOUND then
insert into data_month(date_field, key_field, sum_field, min_field)
values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);
end if;
if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and
NEW.date_field < to_date('20130201', 'YYYYMMDD') then
insert into data_20130101 values (NEW.*);
return null;
end if;
return NEW;
end;
$$ language plpgsql;
На самом деле функция выглядит немного сложнее, поскольку нулевые значения обрабатываются особым образом.
Но в качестве иллюстрации приведенный пример вполне адекватен.
Логика этого кода очевидна:
- При вставке данных в исходную таблицу мы пытаемся обновить счетчики в агрегированном представлении data_month.
- Если это не помогло (в data_month запись не найдена), добавьте новую запись.
- Далее проверяем, что каждый раздел находится в диапазоне дат (в примере один раздел), и в случае успеха вставляем запись в соответствующий раздел (поскольку раздел наследуется от основной таблицы, можно смело использовать звездочка) и верните ноль, чтобы предотвратить вставку записи в основную таблицу.
- Если ни один из разделов не соответствует, верните NEW, разрешив вставку в основную таблицу.
На практике это весьма удобно.
Даже если мы не создадим раздел заранее или получим данные с неверной датой, вставка данных пройдет успешно.
Впоследствии вы сможете проанализировать содержимое основной таблицы, выполнив запрос: select * from only data
После этого создайте недостающие разделы (как будет показано ниже, данные будут автоматически перенесены из основной таблицы в созданный раздел).
В таких случаях количество записей, не попадающих в свой раздел, обычно невелико и затраты на передачу данных незначительны.
Теперь осталось сделать обвязку.
Начнем с функции создания нового раздела: ps_add_range_partition(varchar, varchar, varchar, дата) create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar,
in p_type varchar, in p_start date) returns void
as $$
declare
l_sql text;
l_end date;
l_start_str varchar(10);
l_end_str varchar(10);
l_table bigint;
l_flag boolean;
columns record;
begin
perform 1
from ps_table a, ps_column b
where a.id = b.table_id and lower(a.name) = lower(p_table)
and b.type_name = 'date' and lower(b.name) <> lower(p_column);
if FOUND then
raise EXCEPTION 'Conflict DATE columns';
end if;
l_end := p_start + ('1 ' || p_type)::INTERVAL;
perform 1
from ps_table a, ps_range_partition b
where a.id = b.table_id and lower(a.name) = lower(p_table)
and (( p_start >= b.start_value and p_start < b.end_value ) or
( b.start_value >= p_start and b.start_value < l_end ));
if FOUND then
raise EXCEPTION 'Range intervals intersects';
end if;
perform 1
from ps_table
where lower(name) = lower(p_table);
if not FOUND then
insert into ps_table(name) values (lower(p_table));
end if;
select id into l_table
from ps_table
where lower(name) = lower(p_table);
perform 1
from ps_column
where table_id = l_table and type_name = 'date'
and lower(name) = lower(p_column);
if not FOUND then
insert into ps_column(table_id, name, type_name)
values (l_table, lower(p_column), 'date');
end if;
insert into ps_range_partition(table_id, type_name, start_value, end_value)
values (l_table, p_type, p_start, l_end);
l_start_str = to_char(p_start, 'YYYYMMDD');
l_end_str = to_char(l_end, 'YYYYMMDD');
l_sql :=
'create table ' || p_table || '_' || l_start_str || '(' ||
'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' ||
'primary key (';
l_flag := FALSE;
for columns in
select f.name as name
from ( select ps_array_to_set(a.conkey) as nn
from pg_constraint a, pg_class b
where b.oid = a.conrelid
and a.contype = 'p'
and b.relname = p_table ) c,
( select d.attname as name, d.attnum as nn
from pg_attribute d, pg_class e
where e.oid = d.attrelid
and e.relname = p_table ) f
where f.nn = c.nn
order by f.nn
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql ||
')) inherits (' || p_table || ')';
execute l_sql;
l_sql :=
'create index ' || p_table || '_' || l_start_str || '_date on ' || p_table || '_' || l_start_str || '(' || p_column || ')';
execute l_sql;
perform ps_trigger_regenerate(l_table);
execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
l_sql :=
'insert into ' || p_table || '_' || l_start_str || ' ' ||
'select * from ' || p_table || ' where ' ||
p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
execute l_sql;
l_sql :=
'delete from only ' || p_table || ' where ' ||
p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_before_insert ' ||
'before insert on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_insert_trigger()';
execute l_sql;
perform 1
from ps_snapshot a, ps_column b
where b.table_id = a.snapshot_id and a.table_id = l_table
and b.type_name in ('min', 'max');
if FOUND then
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
else
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
end if;
end;
$$ language plpgsql;
Здесь после проверки корректности входных данных мы добавляем необходимые метаданные, после чего создаем наследуемую таблицу.
Затем пересоздаем триггерные функции вызовом ps_trigger_regenerate, после чего данные, попадающие под условие секционирования, переносим в созданный раздел с помощью динамического запроса и пересоздаем сами триггеры.
Трудности возникли с двумя моментами.
- Мне пришлось немного повозиться с добавлением месяца, дня или года к дате начала (в зависимости от входного параметра p_type:
l_end := p_start + ('1 ' || p_type)::INTERVAL;
- Поскольку первичный ключ не наследуется, мне пришлось написать запрос к Системные каталоги , чтобы получить список столбцов первичного ключа исходной таблицы (я посчитал нецелесообразным также хранить в своих метаданных описание первичного ключа):
select f.name as name from ( select ps_array_to_set(a.conkey) as nn from pg_constraint a, pg_class b where b.oid = a.conrelid and a.contype = 'p' and b.relname = p_table ) c, ( select d.attname as name, d.attnum as nn from pg_attribute d, pg_class e where e.oid = d.attrelid and e.relname = p_table ) f where f.nn = c.nn order by f.nn
Функция удаления раздела значительно проще и не требует особых комментариев: ps_del_range_partition (varchar, дата) create or replace function ps_del_range_partition(in p_table varchar, in p_start date)
returns void
as $$
declare
l_sql text;
l_start_str varchar(10);
l_table bigint;
begin
select id into l_table
from ps_table
where lower(name) = lower(p_table);
l_start_str = to_char(p_start, 'YYYYMMDD');
delete from ps_range_partition
where table_id = l_table
and start_value = p_start;
perform ps_trigger_regenerate(l_table);
l_sql :=
'insert into ' || p_table || ' ' ||
'select * from ' || p_table || '_' || l_start_str;
execute l_sql;
perform 1
from ( select 1
from ps_range_partition
where table_id = l_table
union all
select 1
from ps_snapshot
where table_id = l_table ) a;
if not FOUND then
execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
execute 'drop function ps_' || p_table || '_insert_trigger() cascade';
execute 'drop function ps_' || p_table || '_raise_trigger() cascade';
execute 'drop function ps_' || p_table || '_update_trigger() cascade';
execute 'drop function ps_' || p_table || '_delete_trigger() cascade';
delete from ps_column where table_id = l_table;
delete from ps_table where id = l_table;
end if;
perform 1
from ps_range_partition
where table_id = l_table;
if not FOUND then
delete from ps_column
where table_id = l_table
and type_name = 'date';
end if;
execute 'drop table ' || p_table || '_' || l_start_str;
end;
$$ language plpgsql;
При удалении раздела данные, конечно, не теряются, а переносятся в основную таблицу (сначала удаляются триггеры, так как, как оказалось, в операторе вставки не работает единственное ключевое слово).
Остаётся только добавить функции для управления «живыми» снимками данных: ps_add_snapshot_column (varchar, varchar, varchar, varchar) create or replace function ps_add_snapshot_column(in p_snapshot varchar,
in p_column varchar, in p_parent varchar, in p_type varchar) returns void
as $$
declare
l_table bigint;
begin
perform 1
from ps_table
where lower(name) = lower(p_snapshot);
if not FOUND then
insert into ps_table(name) values (lower(p_snapshot));
end if;
select id into l_table
from ps_table
where lower(name) = lower(p_snapshot);
insert into ps_column(table_id, name, parent_name, type_name)
values (l_table, lower(p_column), lower(p_parent), p_type);
end;
$$ language plpgsql;
ps_add_snapshot(varchar, varchar, varchar) create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar,
in p_type varchar) returns void
as $$
declare
l_sql text;
l_table bigint;
l_snapshot bigint;
l_flag boolean;
columns record;
begin
select id into l_snapshot
from ps_table
where lower(name) = lower(p_snapshot);
perform 1
from ps_column
where table_id = l_snapshot
and type_name in ('date', 'key');
if not FOUND then
raise EXCEPTION 'Key columns not found';
end if;
perform 1
from ps_column
where table_id = l_snapshot
and not type_name in ('date', 'key', 'nullable');
if not FOUND then
raise EXCEPTION 'Aggregate columns not found';
end if;
perform 1
from ps_table
where lower(name) = lower(p_table);
if not FOUND then
insert into ps_table(name) values (lower(p_table));
end if;
select id into l_table
from ps_table
where lower(name) = lower(p_table);
insert into ps_snapshot(table_id, snapshot_id, type_name)
values (l_table, l_snapshot, p_type);
perform ps_trigger_regenerate(l_table);
l_sql := 'create table ' || p_snapshot || ' (';
l_flag := FALSE;
for columns in
select name, type_name
from ps_column
where table_id = l_snapshot
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || columns.name || ' date not null';
else
l_sql := l_sql || columns.name || ' bigint not null';
end if;
end loop;
l_sql := l_sql || ', primary key (';
l_flag := FALSE;
for columns in
select name
from ps_column
where table_id = l_snapshot
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql || '))';
execute l_sql;
execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
l_sql :=
'create trigger ps_' || p_table || '_before_insert ' ||
'before insert on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_insert_trigger()';
execute l_sql;
perform 1
from ps_snapshot a, ps_column b
where b.table_id = a.snapshot_id and a.table_id = l_table
and b.type_name in ('min', 'max');
if FOUND then
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
else
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
end if;
l_sql := 'insert into ' || p_snapshot || '(';
l_flag := FALSE;
for columns in
select name
from ps_column
where table_id = l_snapshot
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
Теги: #разделение #olap #postgresql #postgresql
-
Важность Управления Поступлением В Школу
19 Oct, 24 -
Формула Расчета Ипотеки В Excel
19 Oct, 24 -
Решения
19 Oct, 24 -
Как Мы Создали Новый Рынок В России
19 Oct, 24 -
Почему В Мозгу Возникают Ошибки?
19 Oct, 24