HBase (2)底层与参数调优

HBase(2)底层架构、配置文件参数

底层原理

Master架构

image-20220729140115810
  • master读写zk,region server读写zk,实现master对region server的管理

  • master与hdfs系统交互:获取meta和wal数据

  • MasterProcWAL:在完成一个任务前,将任务内容记录到wal,当操作完成后,再将wal删除(避免执行过程中master宕机);每1h或者日志文件到达32M时,清除过去日志,重新开始记录

  • 图里的路径,可以根据hadoop网页版来看

  • Meta表格:

    • 全称:hbase:meta,保存region的分配情况
    • rowkey: [table],[region start key],[region id]——表名,region起始位置和 regionID(table指的是保存了实际数据的table,起始位置即为该region的第一个key)
    • column:
      • info:
        • regioninfo:region 信息,存储一个序列化的 HRegionInfo 对象
        • server:管理当前 region 的 RegionServer 信息,包含端口号(master做负载均衡时,查看该字段即可)
        • serverstartcode:当前 region 被分到 RegionServer 的起始时间
      • 如果一个表处于region切分过程,会多出两列info:splitA、info:splitB,存储值 HRegionInfo 对象,拆分结束后,删除这两列
  • 客户端对元数据进行操作的时候才连接 master,读写数据直接连接zookeeper读取目录/hbase/meta-region-server节点信息,不需要访问master——master 专注 meta 表的写操作

  • HBase 的 2.3 版本的新模式:Master Registry。客户端访问 master 来读取 meta 表信息,加大 master 的压力,减轻 zookeeper 的压力

  • 源码:(需要导入hbase-server的包)

    • HMaster类:

      1
      2
      3
      4
      5
      6
      7
      public class HMaster extends HRegionServer implements MasterServices {
      private static final Logger LOG = LoggerFactory.getLogger(HMaster.class);
      public static final String MASTER = "master";
      private final ActiveMasterManager activeMasterManager;
      private RegionServerTracker regionServerTracker;
      private DrainingServerTracker drainingServerTracker;
      LoadBalancerTracker loadBalancerTracker; // 负载均衡器
    • 负载均衡器(zknodetracker)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      public abstract class ZKNodeTracker extends ZKListener {
      protected static final Logger LOG = LoggerFactory.getLogger(ZKNodeTracker.class);
      protected final String node;
      private byte[] data;
      protected final Abortable abortable;
      private boolean stopped = false;

      public ZKNodeTracker(ZKWatcher watcher, String node, Abortable abortable) {
      super(watcher);
      this.node = node;
      this.abortable = abortable;
      this.data = null;
      }

      public synchronized void start() {
      this.watcher.registerListener(this);

      try {
      if (ZKUtil.watchAndCheckExists(this.watcher, this.node)) {
      byte[] data = ZKUtil.getDataAndWatch(this.watcher, this.node);
      if (data != null) {
      this.data = data;
      } else {
      LOG.debug("Try starting again because there is no data from {}", this.node);
      this.start();
      }
      }
      } catch (KeeperException var2) {
      this.abortable.abort("Unexpected exception during initialization, aborting", var2);
      }

      }

      public synchronized void stop() {
      this.stopped = true;
      this.notifyAll();
      }

      public synchronized byte[] blockUntilAvailable() throws InterruptedException {
      return this.blockUntilAvailable(0L, false);
      }

      public synchronized byte[] blockUntilAvailable(long timeout, boolean refresh) throws InterruptedException {
      if (timeout < 0L) {
      throw new IllegalArgumentException();
      } else {
      boolean notimeout = timeout == 0L;
      long startTime = System.currentTimeMillis();
      long remaining = timeout;
      if (refresh) {
      try {
      this.data = ZKUtil.getDataAndWatch(this.watcher, this.node);
      } catch (KeeperException var13) {
      LOG.warn("Unexpected exception handling blockUntilAvailable", var13);
      this.abortable.abort("Unexpected exception handling blockUntilAvailable", var13);
      }
      }

      for(boolean nodeExistsChecked = !refresh || this.data != null; !this.stopped && (notimeout || remaining > 0L) && this.data == null; remaining = timeout - (System.currentTimeMillis() - startTime)) {
      if (!nodeExistsChecked) {
      try {
      nodeExistsChecked = ZKUtil.checkExists(this.watcher, this.node) != -1;
      } catch (KeeperException var12) {
      LOG.warn("Got exception while trying to check existence in ZooKeeper of the node: " + this.node + ", retrying if timeout not reached", var12);
      }

      if (nodeExistsChecked) {
      LOG.debug("Node {} now exists, resetting a watcher", this.node);

      try {
      this.data = ZKUtil.getDataAndWatch(this.watcher, this.node);
      } catch (KeeperException var11) {
      LOG.warn("Unexpected exception handling blockUntilAvailable", var11);
      this.abortable.abort("Unexpected exception handling blockUntilAvailable", var11);
      }
      }
      }

      this.wait(100L);
      }

      return this.data;
      }
      }

      public synchronized byte[] getData(boolean refresh) {
      if (refresh) {
      try {
      this.data = ZKUtil.getDataAndWatch(this.watcher, this.node);
      } catch (KeeperException var3) {
      this.abortable.abort("Unexpected exception handling getData", var3);
      }
      }

      return this.data;
      }

      public String getNode() {
      return this.node;
      }

      public synchronized void nodeCreated(String path) {
      if (path.equals(this.node)) {
      try {
      byte[] data = ZKUtil.getDataAndWatch(this.watcher, this.node);
      if (data != null) {
      this.data = data;
      this.notifyAll();
      } else {
      this.nodeDeleted(path);
      }
      } catch (KeeperException var3) {
      this.abortable.abort("Unexpected exception handling nodeCreated event", var3);
      }

      }
      }

      public synchronized void nodeDeleted(String path) {
      if (path.equals(this.node)) {
      try {
      if (ZKUtil.watchAndCheckExists(this.watcher, this.node)) {
      this.nodeCreated(path);
      } else {
      this.data = null;
      }
      } catch (KeeperException var3) {
      this.abortable.abort("Unexpected exception handling nodeDeleted event", var3);
      }
      }

      }

      public synchronized void nodeDataChanged(String path) {
      if (path.equals(this.node)) {
      this.nodeCreated(path);
      }

      }

      public boolean checkIfBaseNodeAvailable() {
      try {
      if (ZKUtil.checkExists(this.watcher, this.watcher.getZNodePaths().baseZNode) == -1) {
      return false;
      }
      } catch (KeeperException var2) {
      this.abortable.abort("Exception while checking if basenode (" + this.watcher.getZNodePaths().baseZNode + ") exists in ZooKeeper.", var2);
      }

      return true;
      }

      public String toString() {
      return "ZKNodeTracker{node='" + this.node + ", stopped=" + this.stopped + '}';
      }
      }

RegionServer架构

image-20220729140458726
  • MemStore:写缓存
    • HFile 中数据是有序的,数据先存储在 MemStore 中根据rowkey排序,到达刷写时机才刷写到 HFile
    • 每次刷写会形成一个新的 HFile,写入对应的文件夹 store
  • WAL:预写日志
    • 数据要经 MemStore 排序后才刷写到 HFile,而数据保存在内存中可能会丢失
    • 数据先写入 Write-Ahead logfile 文件中,再写入 MemStore——系统故障时,数据通过该日志文件重建
  • BlockCache: 读缓存,每次查询出的数据缓存在 BlockCache,方便下次查询
  • 一个region server只有一个读缓存,而每个store有自己的写缓存

写流程

image-20220729152756587
  • 步骤(先创建了HBase连接)
    1. 访问 zookeeper,获取 hbase:meta 所在的位置(Region Server)
    2. 访问该 Region Server,获取 hbase:meta 表,客户端缓存,作为连接属性 MetaCache。Meta 表格含有数据,因此创建连接比较慢
    3. 调用 Table 的 put 方法写入数据,需要解析 RowKey,对照缓存的MetaCache,查看具体向哪个 RegionServer 写入
    4. 数据顺序写入(追加)到 WAL,此处写入直接写入磁盘,需要设置专门的线程控制 WAL 预写日志的滚动(类似 Flume)
    5. 根据写入命令的 RowKey 和 ColumnFamily,查看具体写入的Memstore,在 MemStore 中排序;
    6. 向客户端发送 ack;
    7. 到达 MemStore 的刷写时机后,数据刷写到对应的 store(要记住,hbase一直在追加!)

MemStore Flush

  • MemStore 刷写由多个线程控制,条件互相独立(关键在于,控制刷写文件的大小)
    • memstore大小
      • 某个 memstore 的大小达到hbase.hregion.memstore.flush.size(默认 128M),其所在 region 的所有 memstore 都会刷写
      • 当 memstore 的大小达到hbase.hregion.memstore.flush.size(默认值 128M) * hbase.hregion.memstore.block.multiplier(默认值 4) ,刷写并阻止继续往该 memstore 写数据
    • 低水位线、高水位线
      • HRegionServer 中属性 MemStoreFlusher 内部线程 FlushHandler 控制,标准为 LOWER_MARK(低水位线)和 HIGH_MARK(高水位线)——避免写缓存使用过多的内存,造成out of memory
      • 当 region server 中 memstore 的总大小达到低水位线 java_heapsize*hbase.regionserver.global.memstore.size(0.4) *hbase.regionserver.global.memstore.size.lower.limit(0.95), region 会按照其所有 memstore 的大小顺序(由大到小)依次进行刷写。直到 region server 中所有 memstore 的总大小小于上值
      • 当 region server 中 memstore 的总大小达到高水位线java_heapsize *hbase.regionserver.global.memstore.size(0.4)时,阻止继续往所有的 memstore 写数据
    • 到达刷写时间
      • 避免数据过长时间处于内存之中
      • HRegionServer 的属性 PeriodicMemStoreFlusher 控制进行,重要性较低,5min会执行一次
      • 自动刷新的时间间隔由该属性进行配置:hbase.regionserver.optionalcacheflushinterval(默认1小时)
    • WAL文件数目
      • WAL 文件的数量超过hbase.regionserver.max.logs,region按时间顺序依次刷写,直到 WAL 文件数量减小到 hbase.regionserver.max.log 以下(最大值为 32)

读流程

image-20220729152937264
  • HFile结构

    • HFile 是 HDFS 上每个 store 文件夹下实际存储数据的文件
    • 存储内容包括数据本身(key-value)、元数据记录、文件信息、数据索引(某个key-value数据对的位置)、元数据索引和一个固定长度的尾部信息(记录文件的修改情况,即版本)
    • 键值对按照块大小(默认 64K)保存,数据索引按照块创建(一个索引对应一个块),块越多,索引越大
    • 每个 HFile 维护一个布隆过滤器(读取时可以大致判断要 get 的 key 是否在该 HFile 中)
    • KeyValue内容:
      • rowlength:key 的长度
      • row:key 的值
      • columnfamilylength:列族长度
      • columnfamily:列族
      • columnqualifier:列名
      • timestamp:时间戳(默认系统时间)
      • keytype:Put
  • HFile 存储经过序列化,无法直接查看。通过 HBase 的命令查看元数据内容:hbase hfile -m -f /hbase/data/命名空间/表名/regionID/列族/HFile名

  • 步骤(连接创建同写流程)

    1. 创建 Table 对象发送 get 请求

    2. 优先访问 Block Cache,缓存中有大量的元数据

    3. 读缓存中的数据可能过期,因此需要再次读取写缓存和 store 中的文件

    4. 将读取到的数据合并版本,按照 get 的要求返回(如果只要一个,则高版本覆盖低版本)

  • 合并读取数据的优化(读取数据都需要读取三个位置,再合并不同版本的数据)

    • Block Cache 缓存之前读取的内容和元数据,如果 HFile 没有发生变化(记录在 HFile 尾信息中),则不需要读取HFile
    • HFile 带有索引文件,较快地读取对应 RowKey 数据
    • 使用布隆过滤器快速判断当前 HFile 是否存在需要读取的 RowKey——过滤器如果判断没有,则一定没有,如果有,可能没有(hash相同),但也只需要多读一次

storefile compaction

image-20220729161308619
  • memstore 每次刷写都会生成一个新的 HFile,需要合并 HFile,清理过期和已删除的数据
  • 合并分为:
    • Minor Compaction:将临近的若干个较小的 HFile 合并成一个较大的 HFile,清理部分过期和删除的数据,由系统使用一组参数自动控制
    • Major Compaction:将一个 Store 下所有 HFile 合并成一个 HFile,清理所有过期和删除的数据,由参数hbase.hregion.majorcompaction控制,默认 7 天。
  • Minor Compaction 控制机制:
    • 参与合并的文件需要通过符合参数:
      • hbase.hstore.compaction.ratio(默认1.2F):合并文件选择算法中使用的比率
      • hbase.hstore.compaction.min(默认3):Minor Compaction 最少合并文件个数。
      • hbase.hstore.compaction.max(默认10):Minor Compaction 最大合并文件数
      • hbase.hstore.compaction.min.size(默认128M):单个 Hfile 文件大小的最小值,小于该值会被合并
      • hbase.hstore.compaction.max.size(默认Long.MAX_VALUE):单个 Hfile 文件大小的最大值,大于该值的不会被合并
    • 拉取 store 中的所有文件为一个集合,按照从旧到新的顺序遍历。 判断条件:
      • 过小(128M)合并,过大不合并
      • 文件大小/hbase.hstore.compaction.ratio < 剩余文件大小和 则参与合并。比值设置过大,则最终合并为1个特别大的文件,设置过小,则产生多个HFile(剩余文件指的是,比该文件新的文件)
      • 满足合并条件的文件个数达不到个数要求(3 <= count <= 10),则不合并

region spit

  • 自定义分区:每一个 region 维护 startRow、endRowKey,若加入的数据符合某个 region 维护的 rowKey 范围,则该数据由该 region 维护。具体方法略
  • 系统分区:
    • HRegionServer 完成,操作之前通过 ZK 向 master 汇报,修改对应的 Meta 表信息,添加两列 info:splitA 和 info:splitB
    • 操作 HDFS 上面对应的文件,创建文件引用,不挪动数据
    • 刚完成拆分两个 Region 都由原先的 RegionServer 管理,之后汇报给 Master, Master 将修改后的信息写入到 Meta 表,下一次触发负载均衡机制才修改 Region 的管理服务者,数据等到下一次压缩时才实际进行移动
    • 拆分策略(父类为 RegionSplitPolicy):SteppingSplitPolicy,如果当前 RegionServer 上该表只有一个 Region, 按照2 * hbase.hregion.memstore.flush.size分裂,否则按照hbase.hregion.max.filesize分裂

调优

RowKey的设计

  • 设计rowkey目的是让数据均匀分布于所有的 region,一定程度上防止数据倾斜

    • 生成随机数、hash、散列值
    • 时间戳反转
    • 字符串拼接
  • hbase 设计 rowKey 的特点为: 适用性强、泛用性差,能够实现一个需求但不能同时实现多个需要

    • 例如:统计张三在 2021 年 12 月份消费的总金额和统计所有人在 2021 年 12 月份消费的总金额

    • 可枚举的放在前面(时间放在前面):date(yyyy-MM)^A^Auserdate(-dd hh:mm:ss ms)

      1
      2
      3
      4
      scan: startRow => 2021-12^A^Azhangsan
      stopRow => 2021-12^A^Azhangsan.
      scan: startRow => 2021-12
      stopRow => 2021-12.
    • ^A:避免扫描数据混乱,解决字段长度不一致的问题,HBase使用ascii为1的符号进行填充——此时,假设人名的长度为定长10,需要填充两个符号

    • .:如果没有.,则只能扫描rowkey为2021-12,因此选择比-的ascii大的符号.

  • 预分区优化:略

参数调优

  • 下面参数均位于hbase-site.xml中

  • Zookeeper 会话超时时间:

    • zookeeper.session.timeout
    • 默认值为 90000 毫秒(90s)——某个 RegionServer 挂掉,90s 后 Master 才察觉到
  • RPC 监听数量

    • hbase.regionserver.handler.count
    • 默认值为 30,指定 RPC 监听的数量,根据客户端的请求数进行调整,读写请求较多时增加此值
  • 手动控制 Major Compaction

    • hbase.hregion.majorcompaction
    • 默认值:604800000 秒(7 天), Major Compaction的周期,若关闭自动 Major Compaction可将其设为 0,此时需要手动合并
  • 优化 HStore 文件大小

    • hbase.hregion.max.filesize
    • 默认值 10737418240(10GB),如果 HFile 的大小达到这个数值,region 被切分为两 个 Hfile
  • 优化 HBase 客户端缓存

    • hbase.client.write.buffer
    • 默认值 2097152bytes(2M),HBase 客户端缓存,增大该值可以减少 RPC 调用次数,但会消耗更多内存
  • 指定 scan.next 扫描 HBase 所获取的行数

    • hbase.client.scanner.caching
    • 指定 scan.next 方法获取的默认行数
  • BlockCache 占用 RegionServer 堆内存的比例

    • hfile.block.cache.size
    • 默认 0.4,读请求比较多的情况下可适当调大
  • MemStore 占用 RegionServer 堆内存的比例

    • hbase.regionserver.global.memstore.size
    • 默认 0.4,写请求较多的情况下可适当调大

JVM调优

HBase的经验原则

  • Region 大小在 10-50G
  • cell 大小不超过 10M
  • 1 张表有 1 到 3 个列族,最好 1 个,如果使用多个尽量保证不同时读取多个列族
  • 1 到 2 个列族的表格,设计 50-100 个 Region
  • 列族名称尽量短,不需要像关系型数据库具有准确的名称和描述
  • 如果 RowKey 中时间在最前面,会有大量旧数据存储在不活跃的 Region 中,使用时仅会操作少数的活动 Region,此时需要增加 Region 个数
  • 如果只有一个列族用于写入数据,可以减小写缓存