当用户发出查询时,Citus
coordinator 将其划分为更小的查询片段,其中每个查询片段可以在工作分片上独立运行。这允许Citus
将每个查询分布在集群中。
但是,将查询划分为片段的方式(以及传播哪些查询)因查询类型而异。 在某些高级情况下,手动控制此行为很有用。Citus
提供实用函数来将SQL
传播到workers
、shards
或placements
。
手动查询传播绕过coordinator
逻辑、锁定和任何其他一致性检查。 这些函数可作为最后的手段,以允许 Citus 否则不会在本机运行的语句。小心使用它们以避免数据不一致和死锁。
最小的执行级别是广播一条语句以在所有worker
上执行。这对于查看整个工作数据库的属性很有用。
-- List the work_mem setting of each worker database SELECT run_command_on_workers($cmd$ SHOW work_mem; $cmd$);
注意:
不应使用此命令在worker
上创建数据库对象,因为这样做会使以自动方式添加worker
节点变得更加困难。
注意:
本节中的run_command_on_workers
函数和其他手动传播命令只能运行返回单列单行的查询。
下一个粒度级别是在特定分布式表的所有分片上运行命令。例如,在直接在worker
上读取表的属性时,它可能很有用。 在worker
节点上本地运行的查询可以完全访问元数据,例如表统计信息。
run_command_on_shards
函数将SQL
命令应用于每个分片,其中提供分片名称以在命令中进行插值。 这是一个估计分布式表行数的示例,通过使用每个worker
上的pg_class
表来估计每个分片的行数。 请注意将替换为每个分片名称的%s
。
-- Get the estimated row count for a distributed table by summing the -- estimated counts of rows for each shard. SELECT sum(result::bigint) AS estimated_count FROM run_command_on_shards( 'my_distributed_table', $cmd$ SELECT reltuples FROM pg_class c JOIN pg_catalog.pg_namespace n on n.oid=c.relnamespace WHERE (n.nspname || '.' || relname)::regclass = '%s'::regclass AND n.nspname NOT IN ('citus', 'pg_toast', 'pg_catalog') $cmd$ );
最精细的执行级别是在所有分片及其副本(也称为放置)上运行命令。它对于运行数据修改命令很有用,这些命令必须应用于每个副本以确保一致性。
例如,假设一个分布式表有一个updated_at
字段,我们想要“触摸”所有行,以便在某个时间将它们标记为已更新。coordinator
上的普通UPDATE
语句需要按分布列进行过滤,但我们可以手动将更新传播到所有分片和副本:
-- note we're using a hard-coded date rather than -- a function such as "now()" because the query will -- run at slightly different times on each replica SELECT run_command_on_placements( 'my_distributed_table', $cmd$ UPDATE %s SET updated_at = '2017-01-01'; $cmd$ );
run_command_on_placements
的一个有用伴侣是run_command_on_colocated_placements
。 它将位于共置的分布式表的两个位置的名称插入到查询中。放置对总是被选择为本地的同一个worker
,其中完整的SQL
覆盖是可用的。因此,我们可以使用触发器等高级SQL
功能来关联表:
-- Suppose we have two distributed tables CREATE TABLE little_vals (key int, val int); CREATE TABLE big_vals (key int, val int); SELECT create_distributed_table('little_vals', 'key'); SELECT create_distributed_table('big_vals', 'key'); -- We want to synchronize them so that every time little_vals -- are created, big_vals appear with double the value -- -- First we make a trigger function, which will -- take the destination table placement as an argument CREATE OR REPLACE FUNCTION embiggen() RETURNS TRIGGER AS $$ BEGIN IF (TG_OP = 'INSERT') THEN EXECUTE format( 'INSERT INTO %s (key, val) SELECT ($1).key, ($1).val*2;', TG_ARGV[0] ) USING NEW; END IF; RETURN NULL; END; $$ LANGUAGE plpgsql; -- Next we relate the co-located tables by the trigger function -- on each co-located placement SELECT run_command_on_colocated_placements( 'little_vals', 'big_vals', $cmd$ CREATE TRIGGER after_insert AFTER INSERT ON %s FOR EACH ROW EXECUTE PROCEDURE embiggen(%L) $cmd$ );
Lebanon Address 版权所有
Powered by WordPress