Implementing a "distributed" reporting server using Postgres 10 features.
Today I will try to show how powerful Postgres 10 is by combining several of its features to create a "distributed" reporting server. The features I will be using are:
- Logical Replication
- Partitioning
- Foreign Data Wrappers
- Table Inheritance
The scenario we want to implement is the following:
- 10.0.0.2, bucket: receives all inserts / updates / deletes
- 10.0.0.3, node2016: data holder for 2016
- 10.0.0.4, node2017: data holder for 2017
- 10.0.0.5, node2018: data holder for 2018
- 10.0.0.6, reporting proxy: main entry point for selects
On the bucket server we create a partitioned table, its yearly partitions, per-partition indexes, and one publication per partition:
CREATE TABLE data_bucket (
id int ,
data text,
insert_time timestamp without time zone DEFAULT now())
PARTITION BY RANGE (insert_time);
CREATE TABLE data_p2016 PARTITION OF data_bucket
FOR VALUES FROM ('2016-01-01 00:00:00') TO ('2017-01-01 00:00:00');
CREATE TABLE data_p2017 PARTITION OF data_bucket
FOR VALUES FROM ('2017-01-01 00:00:00') TO ('2018-01-01 00:00:00');
CREATE TABLE data_p2018 PARTITION OF data_bucket
FOR VALUES FROM ('2018-01-01 00:00:00') TO ('2019-01-01 00:00:00');
create unique index data_p2016_uniq on data_p2016 (id);
create unique index data_p2017_uniq on data_p2017 (id);
create unique index data_p2018_uniq on data_p2018 (id);
create index data_p2016_time on data_p2016 (insert_time);
create index data_p2017_time on data_p2017 (insert_time);
create index data_p2018_time on data_p2018 (insert_time);
CREATE PUBLICATION pub_data_p2016 FOR TABLE data_p2016
WITH (publish='insert,update');
CREATE PUBLICATION pub_data_p2017 FOR TABLE data_p2017
WITH (publish='insert,update');
CREATE PUBLICATION pub_data_p2018 FOR TABLE data_p2018
WITH (publish='insert,update');
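At this point we can run a quick sanity check (my own addition, not part of the original setup) to confirm which table each publication carries, using the pg_publication_tables catalog view:

```sql
-- run on the bucket server; lists one row per published table
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname LIKE 'pub_data_p%';
```

Each publication should report exactly one partition; if a partition is missing here, its data will never reach the corresponding node.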
-- node 2016
CREATE TABLE data_p2016 (
id int,
data text,
insert_time timestamp without time zone );
create unique index data_p2016_uniq on data_p2016 (id);
create index data_p2016_time on data_p2016 (insert_time);
CREATE SUBSCRIPTION sub_data_p2016
CONNECTION 'dbname=monkey host=10.0.0.2 user=postgres port=5432'
PUBLICATION pub_data_p2016;
-- node 2017
CREATE TABLE data_p2017 (
id int,
data text,
insert_time timestamp without time zone ) ;
create unique index data_p2017_uniq on data_p2017 (id);
create index data_p2017_time on data_p2017 (insert_time);
CREATE SUBSCRIPTION sub_data_p2017
CONNECTION 'dbname=monkey host=10.0.0.2 user=postgres port=5432'
PUBLICATION pub_data_p2017;
-- node 2018
CREATE TABLE data_p2018 (
id int,
data text,
insert_time timestamp without time zone ) ;
create unique index data_p2018_uniq on data_p2018 (id);
create index data_p2018_time on data_p2018 (insert_time);
CREATE SUBSCRIPTION sub_data_p2018
CONNECTION 'dbname=monkey host=10.0.0.2 user=postgres port=5432'
PUBLICATION pub_data_p2018;
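Once a subscription is created, we can verify that its apply worker is running and caught up by querying the pg_stat_subscription monitoring view on each node (a sanity check I am adding here, not part of the original post):

```sql
-- run on a data node; received_lsn / latest_end_lsn show replication progress
SELECT subname, received_lsn, latest_end_lsn, last_msg_receipt_time
FROM pg_stat_subscription;
```

If no row shows up, or the LSN columns are null, the subscription worker is not connected and the node will silently fall behind the bucket.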
Here, for each node we create the data table and its indexes, plus a subscription pointing to the bucket server.
From now on, every row that reaches the bucket is transferred to the appropriate node. One last thing is missing: putting everything together. To aggregate all the nodes we have the reporting proxy container, where we need to run the following SQL statements:
create extension if not exists postgres_fdw;
CREATE SERVER data_node_2016
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '10.0.0.3',port '5432',dbname 'monkey');
CREATE SERVER data_node_2017
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '10.0.0.4',port '5432',dbname 'monkey');
CREATE SERVER data_node_2018
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '10.0.0.5',port '5432',dbname 'monkey');
CREATE USER MAPPING FOR postgres SERVER data_node_2016 OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER data_node_2017 OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER data_node_2018 OPTIONS(user 'postgres');
CREATE TABLE reporting_table (
id int,
data text,
insert_time timestamp without time zone);
CREATE FOREIGN TABLE data_node_2016 (
CHECK ( insert_time >= DATE '2016-01-01' AND insert_time < DATE '2017-01-01' ))
INHERITS (reporting_table) SERVER data_node_2016
options (table_name 'data_p2016');
CREATE FOREIGN TABLE data_node_2017 (
CHECK ( insert_time >= DATE '2017-01-01' AND insert_time < DATE '2018-01-01' ))
INHERITS (reporting_table) SERVER data_node_2017 options (table_name 'data_p2017');
CREATE FOREIGN TABLE data_node_2018 (
CHECK ( insert_time >= DATE '2018-01-01' AND insert_time < DATE '2019-01-01' ))
INHERITS (reporting_table) SERVER data_node_2018 options (table_name 'data_p2018');
We first create the postgres_fdw extension, then the remote server definitions and user mappings for each data node, then the main reporting table, and finally three foreign tables, one per node, using table inheritance. The CHECK constraints let the planner exclude nodes that cannot possibly match a query.
The structure is ready and everything is now connected, so we should be good for testing. But before we test, let's describe what to expect: rows inserted into data_bucket will be routed into the yearly partitions, these partitions will be replicated to their data nodes, and the reporting proxy will aggregate all the nodes using foreign scans.
Let's insert some randomly generated data into data_bucket:
insert into data_bucket
select generate_series(1,1000000),
md5(random()::text),
timestamp '2016-01-01 00:00:00' + random() *
(timestamp '2019-01-01 00:00:00' - timestamp '2016-01-01 00:00:00');
Data should now be distributed across all three nodes. From the reporting_table we created on the reporting proxy we should be able to see everything; notice the explain plans:
monkey=# select count (*) from reporting_table ;
count
---------
1000000
(1 row)
monkey=# select min (insert_time),max(insert_time) from reporting_table;
min | max
----------------------------+----------------------------
2016-01-01 00:03:17.062862 | 2018-12-31 23:59:39.671967
(1 row)
monkey=# explain analyze select min (insert_time),max(insert_time) from reporting_table;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=598.80..598.81 rows=1 width=16) (actual time=1708.333..1708.334 rows=1 loops=1)
-> Append (cost=0.00..560.40 rows=7681 width=8) (actual time=0.466..1653.186 rows=1000000 loops=1)
-> Seq Scan on reporting_table (cost=0.00..0.00 rows=1 width=8) (actual time=0.002..0.002 rows=0 loops=1)
-> Foreign Scan on data_node_2016 (cost=100.00..186.80 rows=2560 width=8) (actual time=0.464..544.597 rows=334088 loops=1)
-> Foreign Scan on data_node_2017 (cost=100.00..186.80 rows=2560 width=8) (actual time=0.334..533.149 rows=332875 loops=1)
-> Foreign Scan on data_node_2018 (cost=100.00..186.80 rows=2560 width=8) (actual time=0.323..534.776 rows=333037 loops=1)
Planning time: 0.220 ms
Execution time: 1709.252 ms
(8 rows)
monkey=# select * from reporting_table where insert_time = '2016-06-21 17:59:44';
id | data | insert_time
----+------+-------------
(0 rows)
monkey=# select * from reporting_table where insert_time = '2016-06-21 17:59:44.154904';
id | data | insert_time
-----+----------------------------------+----------------------------
150 | 27da6c5606ea26d4ca51c6b642547d44 | 2016-06-21 17:59:44.154904
(1 row)
monkey=# explain analyze select * from reporting_table where insert_time = '2016-06-21 17:59:44.154904';
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Append (cost=0.00..125.17 rows=7 width=44) (actual time=0.383..0.384 rows=1 loops=1)
-> Seq Scan on reporting_table (cost=0.00..0.00 rows=1 width=44) (actual time=0.002..0.002 rows=0 loops=1)
Filter: (insert_time = '2016-06-21 17:59:44.154904'::timestamp without time zone)
-> Foreign Scan on data_node_2016 (cost=100.00..125.17 rows=6 width=44) (actual time=0.381..0.381 rows=1 loops=1)
Planning time: 0.172 ms
Execution time: 0.801 ms
(6 rows)
Some might say: OK, but now we have all the data in two places, which is true. But do we actually need the data in the bucket? No, we don't; we only need it there in case we have to update it. Remember that we configured logical replication to publish only inserts and updates? This means we can delete whatever we want from the bucket or its partitions without affecting the data nodes, so we can implement any custom retention policy; we can even truncate the partitions if we want to remove data fast.
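As a sketch of such a retention policy (my own illustrative example, not from the original setup), we could periodically purge old rows from the bucket partitions on 10.0.0.2. Because the publications only publish inserts and updates, the data nodes keep their copies:

```sql
-- run on the bucket server; the delete is NOT replicated to node2016
DELETE FROM data_p2016 WHERE insert_time < '2016-07-01';
-- or empty a whole partition at once (truncate is not replicated in Postgres 10)
TRUNCATE TABLE data_p2016;
```

Either way, a subsequent count on the reporting proxy should be unchanged, since the foreign tables read from the data nodes, not from the bucket.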
Now, is this solution perfect? No, it's not: foreign data wrappers are obviously slower than local access and they can't push down every operation, but they are getting better with each Postgres release.
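If foreign scan performance becomes an issue, postgres_fdw exposes a few server-level options worth experimenting with; the values below are illustrative, not taken from the original setup:

```sql
-- ask the remote node for row estimates (extra round trips, better plans)
-- and pull rows from the remote cursor in larger batches
ALTER SERVER data_node_2016
    OPTIONS (ADD use_remote_estimate 'true', ADD fetch_size '10000');
```

The same ALTER SERVER would be applied to data_node_2017 and data_node_2018; whether use_remote_estimate pays off depends on how complex the pushed-down queries are.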
Thanks for reading.
Source: eVOL Monkey