手把手教你:将ClickHouse集群迁至云上

1. 前言

随着云上ClickHouse服务完善,越来越多的用户将自建ClickHouse服务迁移至云上。对于不同数据规模,我们选择不同的方案:

  • 对于数据量比较小的表,通常小于10GB情况下,可以将数据到处为CSV格式,在云上集群重新写入数据;
  • 使用clickhouse发行版自带工具clickhouse-copier 来完成。

本文详解clickhouse-copier 完成跨ClickHouse集群数据迁移(当然也可以用户集群内部数据不同表间数据迁移)。

2. Zookeeper集群准备

如果已经有Zookeeper集群,请忽略本章节。

由于clickhouse-copier 需要Zookeeper存储数据迁移任务信息,需要部署一个Zookeeper集群。

Zookeeper集群到源ClickHouse集群与目标ClickHouse集群之间的网络是正常的。

在本文中,我们部署一个单节点的Zookeeper集群。

步骤1: 准备可执行文件

代码语言:txt
复制
$ wget http://apache.is.co.za/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1.tar.gz
$ tar -xvf zookeeper-3.6.1.tar.gz
$ chown hadoop:hadoop -R  zookeeper-3.6.1

步骤2:切换到hadoop账号

代码语言:txt
复制
su hadoop

步骤3: 准备配置文件 conf/zoo.cfg,填写配置,举例如下:

代码语言:txt
复制
tickTime=2000
dataDir=/var/data/zookeepe
clientPort=2181

步骤4:增加myid文件

代码语言:txt
复制
echo 1 > /var/data/zookeeper/myid

步骤5:启动Zookeeper进程

代码语言:txt
复制
$ bin/zkServer.sh start

后续,我们可以用该Zookeeper存储数据迁移任务信息。

3. 定义迁移任务

在任务迁移数据前,需要定义迁移任务。迁移任务信息定义在xml文件中。具体包含如下信息:

  • 源集群,包含数据分片信息
  • 目的集群,包含数据分片信息
  • 执行数据迁移任务的线程数量
  • 定义待迁移的表信息,有tables字段指定,包括:
    • 数据源集群名称,由cluster_pull指定
    • 数据源数据库名称,由database_pull指定
    • 数据源表名称,由table_pull指定
    • 目的集群名称,由cluster_push指定
    • 目的数据库名称,由database_push指定
    • 目的表名称,由table_push指定
    • 目的表引擎定义,由engine指定
    • 待迁移的partition列表,由enabled_partitions指定。未指定,则全表迁移

如果目标集群数据库不存在,则不会自动创建。故迁移数据前,确保目标集群数据库存在。源表和目标表的Schema相同,表引擎可以不相同。

举例如下:

代码语言:txt
复制
<yandex>
    <!-- Configuration of clusters as in an ordinary server config -->
    <remote_servers>
        <source_cluster>
            <shard>
                <internal_replication>false</internal_replication>
                    <replica>
                        <host>172.16.0.72</host>
                        <port>9000</port>
                    </replica>
            </shard>
        </source_cluster>
    &lt;destination_cluster&gt;
        &lt;shard&gt;
            &lt;internal_replication&gt;false&lt;/internal_replication&gt;
                &lt;replica&gt;
                    &lt;host&gt;172.16.0.115&lt;/host&gt;
                    &lt;port&gt;9000&lt;/port&gt;
                &lt;/replica&gt;
                &lt;replica&gt;
                    &lt;host&gt;172.16.0.47&lt;/host&gt;
                    &lt;port&gt;9000&lt;/port&gt;
                &lt;/replica&gt;
        &lt;/shard&gt;
        &lt;shard&gt;
            &lt;internal_replication&gt;false&lt;/internal_replication&gt;
                &lt;replica&gt;
                    &lt;host&gt;172.16.0.138&lt;/host&gt;
                    &lt;port&gt;9000&lt;/port&gt;
                &lt;/replica&gt;
                &lt;replica&gt;
                    &lt;host&gt;172.16.0.49&lt;/host&gt;
                    &lt;port&gt;9000&lt;/port&gt;
                &lt;/replica&gt;
        &lt;/shard&gt;
    &lt;/destination_cluster&gt;
&lt;/remote_servers&gt;

&lt;!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. --&gt;
&lt;max_workers&gt;8&lt;/max_workers&gt;

&lt;!-- Setting used to fetch (pull) data from source cluster tables --&gt;
&lt;settings_pull&gt;
    &lt;readonly&gt;1&lt;/readonly&gt;
&lt;/settings_pull&gt;

&lt;!-- Setting used to insert (push) data to destination cluster tables --&gt;
&lt;settings_push&gt;
    &lt;readonly&gt;0&lt;/readonly&gt;
&lt;/settings_push&gt;

&lt;settings&gt;
    &lt;connect_timeout&gt;300&lt;/connect_timeout&gt;
    &lt;!-- Sync insert is set forcibly, leave it here just in case. --&gt;
    &lt;insert_distributed_sync&gt;1&lt;/insert_distributed_sync&gt;
&lt;/settings&gt;

&lt;tables&gt;
    &lt;!-- A table task, copies one table. --&gt;
    &lt;table_lineorder&gt;
        &lt;!-- Source cluster name (from &lt;remote_servers/&gt; section) and tables in it that should be copied --&gt;
        &lt;cluster_pull&gt;source_cluster&lt;/cluster_pull&gt;
        &lt;database_pull&gt;default&lt;/database_pull&gt;
        &lt;table_pull&gt;lineorder&lt;/table_pull&gt;

        &lt;!-- Destination cluster name and tables in which the data should be inserted --&gt;
        &lt;cluster_push&gt;destination_cluster&lt;/cluster_push&gt;
        &lt;database_push&gt;default&lt;/database_push&gt;
        &lt;table_push&gt;lineorder_7&lt;/table_push&gt;

        &lt;engine&gt;
        ENGINE=ReplicatedMergeTree(&#39;/clickhouse/tables/{shard}/lineorder_7&#39;,&#39;{replica}&#39;)
        PARTITION BY toYear(LO_ORDERDATE)
        ORDER BY (LO_ORDERDATE, LO_ORDERKEY)
        &lt;/engine&gt;

        &lt;!-- Sharding key used to insert data to destination cluster --&gt;
        &lt;sharding_key&gt;rand()&lt;/sharding_key&gt;

        &lt;!-- Optional expression that filter data while pull them from source servers --&gt;
        &lt;!-- &lt;where_condition&gt;&lt;/where_condition&gt; --&gt;
       &lt;!--
        &lt;enabled_partitions&gt;
        &lt;/enabled_partitions&gt;
       --&gt;
    &lt;/table_lineorder&gt;
&lt;/tables&gt;

</yandex>

准备完成配置文件后,在Zookeeper上准备路径,并将定义任务文件上传到Zookeeper中。假设配置文件为task.xml, 执行如下指令:

代码语言:txt
复制
$ bin/zkCli.sh create /clickhouse/copytasks ""
$ bin/zkCli.sh create /clickhouse/copytasks/task ""
$ bin/zkCli.sh create /clickhouse/copytasks/task/description "cat ./task.xml"

4. 启动任务

定义好迁移任务后,就可以启动clickhouse-copier来迁移数据了。在此之前,需要准备的配置文件, 配置文件中描述了Zookeeper地址,以及日志配置。举例如下:

代码语言:txt
复制
<yandex>
<logger>
<level>trace</level>
<size>100M</size>
<count>3</count>
</logger>

&lt;zookeeper&gt;
    &lt;node index=&#34;1&#34;&gt;
        &lt;host&gt;172.16.0.139&lt;/host&gt;
        &lt;port&gt;2181&lt;/port&gt;
    &lt;/node&gt;
&lt;/zookeeper&gt;

</yandex>

假设该文件命名为config.xml


可以使用如下命令启动clickhouse-copier:

代码语言:txt
复制
 $ clickhouse-copie
--config ./config.xml
--task-path /clickhouse/copytasks/task
--base-dir ./clickhouse </code>

其中,--task-path指定数据迁移任务在Zookeeper上的路径,即第3节中创建的路径。需要主要的是,路径下必现包含description文件。

如果数据量比较多,可以部署多个clickhouse-copier并发执行迁移任务。

5. 总结

clickhouse-copier是ClickHouse发行版自带的工具,在稳定性可靠性上是有保证的。在使用过程中,需要注意的问题:

  • 在迁移过程中,源集群的表需要停止写入;
  • 在迁移过程中,占用源,目的集群网络带宽,需要仔细评估;
  • clickhouse-copier提供了较多灵活性,包括数据分片算法,指定迁移表的partitions;