raft协议的实现

背景

一致性协议(共识机制) 的理论近年来借助于大数据、分布式系统、区块链等技术的发展,从理论到实践都有了很多成果,
比如Zab的Zookeeper,Paxos的ElasticSearch,Raft的Etcd,还有各种区块链的共识一致性机制,比如 Pow , Pos , Dpos等等,
这些协议和技术都是为了解决一个核心问题: 在分布式环境中,如何保证数据的一致性和可靠性。

什么Raft

Raft 的主要思想和实现在许多篇文章中已经有阐述,这里不再复述,可以参阅
raft-paper 等文章

什么是Etcd

Etcd 分布式的kv存储系统,当然对比一些诸如HBase/Cassandra 甚至Redis的 Nosql ,cache系统,
etcd更多的优势是在于它的可靠性,可靠性的保证就是以raft为核心的一致性协议所提供

Java实现

作者用Java语言,复刻了Golang etcd/raft的代码,用以学习和测试Raft协议,后续会不断添加新功能
raft-java

Raft Core的主要模块和架构

RaftCore

RaftCore是raft协议的核心

  1. 逻辑控制
    接受来自外部的Message请求并处理,是通过方法 Step 来处理。 不同的角色(Leader,Candidate,Follower)有各自的处理方法。
    也包括了,是否发起自身竞争成为Leader的机制以及投票给其他Leader的机制。
  2. RaftStatus
    每一个节点都有存储了本节点关于Raft协议下的各种必要状态。 包括当前的term,leader信息,各类超时信息,以及一些配置参数。
  3. Node Progress
    如果该节点是Leader,会通过 Map 来存储其他节点的状态,包括每一个节点本地commit日志的index等,用来同步数据。
  4. Messages
    再控制流转流程中,需要对外部发送的数据都会存放在Msg数组中

Message

定义了消息的状态信息,包括数据的信息,用以在不同节点中传输。消息是以Protbuf作为序列化

Storeage

定义数据的存储方式。代码中以MemoryStorage来存储,用以测试核心流程
Unstable 和 Snapshot配合存在。

通信模块

通信模块也是可以自由实现,测试代码


作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments

Akka集群对于脑裂的策略

背景

一直以为,Akka的文档,是开源产品中写得较为优秀的,不但全面,而且有一定的深度。
在使用Akka Cluster的过程中,有几次关于脑裂的问题的发生,下面,就简单的介绍这篇文档:
Split Brain Resolver

什么是脑裂

1
2
3
4
A fundamental problem in distributed systems is that network partitions (split brain scenarios) and machine crashes are indistinguishable for the observer,
i.e. a node can observe that there is a problem with another node, but it cannot tell if it has crashed and will never be available again or
if there is a network issue that might or might not heal again after a while. Temporary and permanent failures are indistinguishable because decisions must be made in finite time,
and there always exists a temporary failure that lasts longer than the time limit for the decision.

简单的解释一下
脑裂常常发身在网络故障的时候,把一个分布式的系统,分成了至少2个partitions。 并且这两个partitions不能识别出到底谁才是正常的,因为集群需要再一个有限的短时间内做出一个判断,而网络的故障的时间是不定。
最终造成了整个集群处于一种不可用的状态。

策略

所有策略的前提是,集群处于一个稳定的(stable)状态。 也就是说,在这个时间点,已经发生了脑裂,但是,不会有反复 (back and force )节点的 up/down。
基于这个前提,akka提供了一些策略,来应付脑裂的情况

1 Static Quorum

配置文件中,固定一个数值(static quorum) 当集群发生脑裂的时候,任何一个partition中的node >= 过这个值,则认定是有效的。
数值和集群有一关系 总节点数,必须小于 quorum-size * 2 - 1 , 当然,也必须大于 quorum。

这个策略的有点是, 在任何脑裂发生的时候,当产生两个 partition的时候,效果是非常显著的, 因为必然会推举出一个 partition 有效。 另一个无效。
他的缺点是 :
a) quorum的数值是静态配置的,如果集群动态增加node,这个值必须随之而调整。
b) 如果partitions的数量超过2。 比如 10 Nodes,static quorum最小的值是6 ,如果发生脑裂时,将集群划分为3个partition,分别是 (3,3,4) ,那么任何partition都无法运作。

总的来说, Static Quorum相对简单粗放一些,可以应对大部分的异常,也有不少分布式系统使用的是这个策略, 比如 Elasticsearch。

2 Keep Majority

基本规则是保留整个 partition中,拥有majority nodes的partition。
他和 static quorum相比的好处是, 如果整个 cluster 的nodes数量是动态调整的,那么majority也是一个动态的值,这样会避免 static quorum的问题。
但此策略依旧不能避免的是,如果两个 partitions所拥有的node数量是相同的,那么,整个 cluster 将会被终止。

此外,akka提供了基于 keep majority,额外的一个配置项

1
2
3
4
akka.cluster.split-brain-resolver.keep-majority {
# if the 'role' is defined the decision is based only on members with that 'role'
role = ""
}

可以对于某些node定义他的价值,如果在多个partition中,拥有这些 valuable的节点数量较多的话,就会保留这个partition。
这个选项对于一个多角色的集群,某些node是无状态的(stateless)的工作节点,他们相对而言价值较低,而某些节点可能是数据持久化节点,或者是Master节点。
这样就可以区分出来多个Partition的价值,而保留价值高的部分。

3 Keep Oldest

顾名思义,整个集群只保留一个拥有最老节点的partition
当然,有一个特例是,如果某一个partition,只有这一个oldest node,则这个partition会关闭它自己。
这个策略适用于一些 Singleton 的场景,即某些特殊的服务只存在于 oldest的节点上。

4 Keep Referee

这个策略和 Keepr oldest类似,只是把 oldest节点替换成了 referee节点,referee节点可以是cluster中的任意一台。

Stable

最后,回顾一个前提,所有的策略都必须基于集群稳定的情况下,才做出决定。
在Cluster Singleton 和 Cluster Sharding 的过程中,新的Instance必须等到老的instance关闭。
为了减少集群同时存在两个instance的情况, 有一个 duration 来保持集群(脑裂)状态的稳定。
以下是akka的文档给出的建议:


作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments

新一代分布式调度系统--雷峰塔架构

背景

分布式任务调度系统工具很多,有Oozie ,Azkaban, airflow… ,大多数,都只能满足部分的需求。对于一个成熟的企业来说,只能当做工具,
不能称之为产品。 对于一些特殊的定制化的需求,是无法满足的。
因此,我们决定自主开发一个任务调度系统,定义为新一代调度系统,吸收各类调度系统的优点,并加以细化,基于Hadoop,不局限于Hadoop,提供完整的工作模式和工作机制。

什么是雷峰塔

雷峰塔是新一代分布式任务调度系统。 可以通过扩展任务类型来丰富调度系统,同时也可以自定义WebUI的部分插件功能。
其中WebUI的部分功能设计是参考了 Azkaban 的设计风格。

对比 Azkaban ?

更灵活的插件框架,满足更深的定制化需求。
多个工作节点在不同机器上协同调度,可以通过自定义负载均衡来实现任务的分发。
增加执行用户概念,一个执行器上可以用不同用户执行 Hadoop/Spark任务。

Overview

雷峰塔是一个分布式的任务调度系统。
其特点是 :
便于部署
插件灵活,扩展性感
Web UI 自定义

整体架构

雷峰塔架构是基于Master/Worker的架构模式

调度系统集群有三个角色

  1. Master
    Master作为任务协调节点,对外Rest API节点。整个集群只能存在一个。
  2. Worker
    Worker节点作为具体任务执行节点。
  3. Web
    界面操作入口

Java + Scala 混合服务

整个系统混合了 Java + Scala的代码编程
Java 主要处理对外接口逻辑层
Scala 主要应用于服务底层
其中,Akka 作为分布式的基础组件
插件模块核心用scala编程,也可以用Java/Scala实现自定义插件扩展

HDFS + Mysql

雷峰塔需要HDFS和Mysql (基本服务,如果有插件使用 Spark等还需要Yarn/Spark集群)的支持

  1. Mysql
    作为整个调度系统的 工作流、任务、调度、权限等使用的数据库。
  2. HDFS
    作为日志记录、调度资源共享的目录

工作流架构

以下是工作流提交的运行流程和机制

Flow /Job / Scheduler /Dependence 说明

  1. Flow (工作流)
    Flow定义为工作流,由其中字段DAG来 描述一个工作的执行流程,由一组Job组成。

  2. Job (任务)
    Job定义为任务,描述具体任务是什么。

  3. Scheduler (调度)
    调度定义,描述了调度的定义

  4. Dependence (资源)
    定义了一种资源的表达,某一个任务所需要的一些 jars,配置文件,脚本,都可以放在Dependence 中,
    其和任务是”多对一”的关系, 一个Dependence可以被多个Job所使用

  5. Execution (执行)
    Flow/Job/Scheduler 是静态概念, 一旦生成可执行的内容,生成与之对应的 FlowExecution,JobExecution和 SchedulerExecution

任务插件框架

核心部分是 Job Framework , 包含了 任务的解析,产生,到分发。
Job FrameWork : 开发者可以根据自己的使用场景,定制自己的任务解析器,包括任务的执行规则,流程等。

Job Framework

部分组件说明 :

  1. Job Plugin Conf 和 Job Plugin Jars
    Job Plugin打包的类,必须在所有的Master/Worker节点都部署相同的内容,如果更新,需要重启整个集群 。

  2. Job Serialized
    Job 描述方式,以Json为明文的存储,JobDefine(Object)为序列化对象生成
    Master根据配置文件、数据库内容、参数等,构建了一个 Job Define,并经过序列化后生成ExecutionRequestContent发送到 Worker,由Worker解析、执行

  3. Job Generator
    Job 生成器 ,在Master端执行,负责生成 Job Define

  4. Job Executor
    Job 执行器,在Worker端执行,负责解析 Job Define,并且执行具体的job

Load Balance

任务分发策略,也支持自定义。
目前支持的策略为:

  1. round-robin : 轮训选择一个节点执行
  2. one-manually : 指定任务在某一个节点执行

作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments

项目中,使用akka的一些经验和总结

背景

Akka 是scala中,应该说不仅仅是scala,整个jvm语言领域,较为优秀的并发框架。 但是,由于国内使用的同僚少之又少,所以相关的资料较少。
大部分的资料和经验来自于 官网, akka.io ,还有一些奇奇乖乖的问题通过stackoverflow得意解决,最后,是在是解决不了的问题,需要多尝试,debug,等解决。
总之,最终不论是多少坑也好,多少弯路也好,算是把akka用了起来,构建了一个以akka为分布式集群核心的任务调度集群。每天的 job数量在 3000+,系统也是相对稳定,深感欣慰。

一些关键点

  1. Akka的Cluster,路由发现目前测试是有点问题的,路由的配置个人感觉也是有点繁琐,而且中间环节控制的不够好,所以不推荐使用。
  2. 在定义 Actor的时候,因为构建一个 Actor的时候,和初始化对象是不一样的。 所以如果再初始化 actor中发生异常,是无法在代码中捕获,只能通过parent的message来捕获
    可以参考 question
    比较好的方式是,尽量保证在 初始化 Actor的时候, 减少异常发生的可能性(对数据库的操作)等。
  3. Actor中对于 Future的使用,取决于并发和串行。 该并发的时候用 Future,不该用的时候,就串行。 因为和 Java的线程模型的抽象程度不一样,所有不要用Java中的线程思维理解Akka中Message和Actor
  4. Akka的失败机制。在任何一个 Actor中发生异常,如何处理,默认情况是交给 Parent的supervisor,但是策略也有多种,根据具体使用场景,可以自定义。
  5. 最后,也是一个关键点,理解actor线程的模型,并发和串行,以及scala的异步线程池等概念,是设计复杂系统的基础。


作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments

使用Zookeeper过程中遇到客户端stuck的问题和解决建议

背景

最近在查询一个和zookeeper相关的case。追查结果是一个称之为”Reverse DNS lookup”的问题。
可以称之为:反向DNS查找。 https://en.wikipedia.org/wiki/Reverse_DNS_lookup

隐患

zookeeper 3.5以下的版本,会在客户端并发链接的时候,有一定几率发生stuck,随着并发连接的增多,发生概率越大。尤其是第一次创建连接的时候。
(因为zookeeper的client版本和server的版本兼容性,可以先升级客户端,客户端版本>服务端版本,是支持的)

原因:
1) 归根结底,是因为DNS解析的问题。 可参考:
issue 1666

2) 1666 issue在3.4.6已经fix,但是和它相关的另一个issue,只有在3.5才fix
issue 1891

案例与解释

简单的来说,问题出在 “通过IP找HostName”,上, 例如: 通过”10.x.x.x”找到”hadoopXXX” 这个方法上。
在计算机网络中,我们可以用ip也可以用HostName来表明一台唯一的机器,我们也知道有一个叫做DNS的服务来帮助我们将IP转化为hostName. 但是,这一个转换是有开销的
是需要通过上层的DNS服务器或者IP机器本身的网卡来达到映射。
至于细节,可以参考wiki

接下来,我们可以本地来模拟这个过程:
我们用Java可以启动以下代码,其中,注释的两行是两种构建方式。

1
2
3
4
5
6
7
8
9
10
for(int i=0;i<100;i++){
new Thread(new Runnable() {
@Override
public void run() {
// 1 InetSocketAddress ia = new InetSocketAddress( "10.x.x.x",2181);;
// 2 InetSocketAddress ia = new InetSocketAddress( "hadoopXXX/10.x.x.x",2181);
System.out.println(ia.getHostName());
}
}).start();
}
  • 当使用注释第一行的时候,线程是有很大的几率发生stuck.
  • 当使用注释第二行的时候,是不会有这个问题。
    而不凑巧对的是,zookeeper3.5以下的版本中,是存在第一行注释的代码的。因此,问题就产生了。

作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments

推荐算法与协同过滤

(注: 只是个人半年前的一些研究总结,在此仅此记录。)

1 背景资料:

推荐系统基于两种策略:
a) 基于内容的过滤(Content-based filtering)
b) 协同过滤 (Collaborative filtering) (CF)
模型:
) 内容过滤需要根据内容创建模板,诸如用物品的title,名字。等等。
当然也可以根据用户建立模板。用户的画像等等。
b) 基于协同过滤。会从历史记录中,找出item-User之间的关系,并建立相关模型。
差异:
从某些场景来看,内容过滤更为优秀,比如一个新用户(没有任何的历史)。或是一个新商品,没有任何的关联数据。这个时候基于Content-Based的过滤,显得更恰当。
然而,在大多数的场景下,协同过滤会变得更为优秀。原因在于,他更充分的考虑到User-Item之间的关系。当然,该方法也使用的更为广泛。

2 协同过滤的集中分类

2.1 基于物品的协同过滤

构建 Item -> User的向量: V(I->U)
计算 各个Verctor之间的相似度
直观理解: 把和你之前喜欢的物品近似的物品推荐给你

2.2 基于用户的的协同过滤

构建 User -> Item的向量: V(U->I)
计算 各个Verctor之间的相似度
直观理解:把就是把与你有相同爱好的用户所喜欢的物品,推荐给你

2.3 比Item-CF和User-CF

ItemCF: 多样性不足 (覆盖率)
UserItem: 长尾性不足。
综合:兴趣点不同。

2.4 于评分的协同过滤 (Slope One)

3 SVD与协同过滤

3.1 传统的SVD

SVD (singular value decomposition) 奇异值分解。

低阶近似:

  1. 给定一个矩阵C,对其奇异值分解:
  2. 构造,它是将的第k+1行至M行设为零,也就是把的最小的r-k个(the r-k smallest)奇异值设为零。
  3. 计算
    特征值数值的大小对矩阵-向量相乘影响的大小成正比,而奇异值和特征值也是正比关系,因此这里选取数值最小的r-k个特征值设为零合乎情理,即我们所希望的C-Ck尽可能的小

3.2 传统的SVD能否应用于协同过滤(基于评分)的模型?

首先有几个问题 :
1. 传统的SVD对于高维稀疏矩阵实惠丢失一些特征信息。
2. 当矩阵不完全时,SVD是无法定义的。 (完全我的理解是有些值是没有定义 ,缺省,用latent factor 可以解)
3. 如果只对已知的信息求解,会非常容易过度拟合。 (我的理解:没有加入Lambda*规则化)
另外,要计算SVD的特征值,对于高维数据来说,也是代价巨大的。

3.3 SVD因式分解,实现协同过滤

公式1 : 因式分解(plain)


设定有那么一个转换,能够将已知的评分Matrix分解成两个Factor矩阵相乘。一个和Item相关,一个和User相关

公式2: 规则化*Lambda (L2)

规则化的目的,是防止过度拟合。
其中Lambda太大,容易低拟合。Lambda太小,容易过拟合。
(这里的Lambda在Spark中需要根据实际的数据样本分布,维度的大小调整)
当然,这里用的是L2拟合。(区别于L1),L2是二次函数拟合。
(L2拟合的目的是使得W的元素很小,接近于0,但不等于0,会使得模型简单。这样使得个别值对全局的印象降低,越简单的模型越不容易产生过拟合)

公式3 基线公式(偏好)


其中:
U是所有投票的均值
bu是用户打分相对于均值的偏差。
bi 是该item被打分相对于均值的偏差
(该公式是ALS算法的最原始形态)

公式4 单个打分


该公式是由 (公式1 ) + (公式3) 推得
单条记录的打分

公式5 整体打分求最小化


由以上同时推得。

公式6 外部数据

为了避免冷启动问题,需要加入外部数据。否则会导致新用户的数据非常少。
那么外部数据时什么? 是基于该用户的一些其他特征(画像)

公式7 动态时间


将时间作为权重考虑进去。
目的是:用户的倾向会随着时间而发生变化,使得更好的捕捉最新的趋势。

公式8 自信度(Implicit , latent factor)


该公式和之后Spark中使用trainImplict的理论基本一致。
输入矩阵是不同的Confidence Levels

几种优化结果:

4 Implict Feedback

显然,如果用户对于物品已经有的评分,这些数据称之为explicit。
但是,有些数据,诸如物品的点击数等,这些并不能直接反应用户对该物品的评分。只是表明了一种倾向,但是,究竟是正向的倾向,还是负向的倾向,是不确定的。

Implicit feedback 的数据和 explicit data的数据是非常不同的:
a) 矩阵是完整定义了的。 (explicit的数据会有Miss)
b) 没有负向的反馈。 (没有明显的倾向)
c) 在0值的时候,是没有交互定义的。
d) 对于这部分数据,必然没有十足的把握(相对于显示的评分,信心不足)
e) 当然,用标准的SVD显然是无法求得这种关系的。

以下几张图可以一目了然其计算法则:

但,Collaborative Filtering for Implicit Feedback Dataset 这篇论文暂时没有搜索到原文。
只能通过代码来分析其计算公式。

1
val confidence = 1 + alpha * abs(rs(i))

所以,在Spark中,参数为alpha的值,是用来调节自信率的。

5 ALS和SGD

即便我们已经有了上述的许多公式,我们如何求解?

5.1 随机梯度下降(SGD)

Stochastic Gradient Descent

在数学上,我们知道。 一个符合convex的函数是可以求得最小值的。
这样一来,我们就有了梯度下降的求解方法,其梯度下降的速度,取决于convex函数的凸性强不强。


这就是梯度下降的迭代公式,当然,该算法已经应用于Mahout中实现。

5.2 ALS算法

Alternating Least Squares (交替最小方差算法)
该算法有两个优点:
1) ALS更容易并行化。
2) ALS对处理隐式数据更方便。

根据5.1的公式中:

这两共公式各自有两个变量,我们可以固定其中一个变量,这样使得其编程一个二次函数,能够被最优化求解。
而ALS就是不断的交替固定Pu和Qi,然后求解另一个变量的最优值。
当然,通常来说,SGD会比ALS更加简单和快速,但是ALS的并行性比较好。如果矩阵是稀疏矩阵,那么ALS更有优势

6 算法的检验

RMSE/MSE/MAPK
待续….

7 Reference

spark,als-ws 基础
http://www2.research.att.com/~volinsky/papers/ieeecomputer.pdf

协同过滤部分:
http://www.ibm.com/developerworks/cn/web/1103_zhaoct_recommstudy2/index.html
http://www.52ml.net/297.html

svd 部分:
http://blog.csdn.net/wangran51/article/details/7408414
http://blog.csdn.net/qianhen123/article/details/40077873

规则化:
http://blog.csdn.net/u012162613/article/details/44261657

Latent-factor-models
http://www.slideshare.net/sscdotopen/latent-factor-models-for-collaborative-filtering


作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments

使用sqoop生成ORC格式的文件,并给Hive加载

背景

在当前版本: sqoop-1.4.5(CDH540)中的sqoop并不支持导出格式为orcfile的格式。因此,如果我们想将数据从DB=>HDFS,并生成ORC格式,只有两个办法。

-> a) 使用HCatalog服务,目前没有测试过,看文档是可以生成ORC的格式。
-> b) 原始办法,通过Hive的insert命令将一张原本不是ORC格式的数据,导入到另一个ORC格式中,这样就有了ORC格式的文件。 但该方法会造成时间、资源消耗翻倍。

DIY

既然如此,自己动手,丰衣足食。介于近年来对于sqoop的使用经验,很方便的定位到了几个地方,并加入了一些类和方法,最终达到了目的。当然,因为我们的需求目前仅仅是从DB = > HDFS,即只是增加了import方法中的参数,并没有在export => DB中增加。

具体代码可以参考我已经提供的版本 link
使用ant编译后,替换原来的sqoop jar即可。

关键点

-> 1) 使用hive的提供的TypeInfoUtils 构建ORC file的Schema.并传递到每一个Map中
-> 2) 使用 org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat 作为输出格式


作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments

HBase Coprocessor问题引起异常Balance

问题背景

最近发现HBase集群的文件compaction次数明显增加。经常到时M/R job扫描snapshot文件的时候发生文件找不到,为此trace了一下日志,大致发现了问题。
balance统计

问题大致流程Trace

  • 1) 全量索引M/R job 失败
  • 2) HFile文件不存在
  • 3) HFile发生了compaction,文件被代替
  • 4) HFile文件数量达到默认数量(3个) 触发Minor compaction
  • 5) Memstore被flush,生成了HFile
  • 6) HRegion 被close之前会触发flush (包括snapshot,倒到Memory的阀值也会触发),但close是主要原因。
  • 7) Cluster的Balance操作需要 Close & Move HRegion
  • 8) StochasticLoadBalancer HBase的Balance策略机制类,触发条件为:

    • a) 达到Balance触发条件 Max Region > AVG(Region数量) +1 OR Min Region <AVG(Region数量) -1
    • b) 调整后的Cost Function值比原来的小(策略更为优秀)
      附: CF的值:
      new ReadRequestCostFunction(conf),
      new WriteRequestCostFunction(conf),
      new MemstoreSizeCostFunction(conf),
      new StoreFileCostFunction(conf)
  • 9) 新增的Region数量并不多。不会经常触发balance

  • 10) Balance的大部分Region都是旧的Region,并且 无法调整到最优状态,导致不断调整,死循环。如下某一个时刻的Region分布:
    region分布

只要Region数小于73或者大于76,就会满足条件a。
并且,如果机会调整之后的cost比较小,就会满足b。

  • 11) 调整不到平衡点,可能是因为某些调整失败。
  • 12) 没有加载Phoenix coprocessor的机器如果balance 一些有该coprocessor的Region就会失败。
  • 13) 大规模调整从9/3 18:00开始第一个balance周期。
  • 14) 9/3 17:57 分执行了truncate的操作。

Balance Trace

因为日志涉及公司信息,不再给出,大体步骤如下:
需要调整的Region号: XXXXX

  • Step 1: 将该Region从server12迁移到server24 (这台机器已dead)
  • Step 2: 因为迁移server24失败,所以选择另一台机器进行迁移,选择了server34
  • Step 3: server34加载该Region失败,因为coprocessor的原因。
  • Step 4 : 选择server32迁移,也失败了,最终选择了server19成功

Balance的总结:

  • a) 我们可以看到,Balance的策略是失败的,因为最初是想将这个Region迁移到server34,而最终却是迁移到了server19。
  • b) 这样的balance,不仅仅是失败,而且会对server19增加了负担。
  • c) 从HBase代码我没有看到对这种情况的处理方式。但理论上一次失败是没有关系的,只要第二次Balance成功即可。
  • d) 但是,某些机器注定无法收容某些Region,所以,注定永远无法调整到一个平衡的balance掉。

遗留问题:
从日志中有一个问题: server24这台机器在9/3 15:29已经dead,为何balance策略仍然会选择这台机器。是不是又bug…


作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments

hbase bulkload 相关整理

由于近期Team内有同学用到bulkload,整理了一下过往我用到的bulkload的相关使用经验
基于hbase 0.900.94版本做一些简单的分析,可供参考。

1. 概述

官方说明
[0.94官方说明] (http://hbase.apache.org/0.94/book/arch.bulk.load.html)

简单的说,bluk load 是能够批量加载hfile => hbase的技术。
服务端,RegionServer处理客户端的bulkload请求。
客户端,向RegionServer发起blukload请求。

1.1. 案例

在2013年的maillog项目中,我们使用了bulk load技术。批量加载了从M/R生成的HFile数据。
因为是预先生成的HFile和处理过的,所以是一个HFile对应1个Region

加载数据量:

HFile(Region) 总数: 100个
占用磁盘大小 : 450G (snappy压缩后100G)
消耗时间 : 在数据已经存在与HDFS的情况下,每一个HFile加载时间 小于 1秒

1.2 特性

a) bulkload 在加载的同时,并不会影响正常读写。(如果写文件造成分区变化,对bulkload有影响)
b) bulkload 的加载是秒级的,在同一个HDFS cluster中,会将数据move。
c) bulkload 也支持从不同的hdfs cluster copy数据(不同版本没有测试过),同一个版本可行,它会把原来的move操作替换为copy,效率会降低。
(注意: 如果你的数据没有move而是copy,请check一下hdfs的URI写的是否正确一致)

1.3. 空表和非空表的策略

场景定义

bulkload 适用于初始化整个HBase表(空表), 也适用于对于增量数据的导入(非空表)

策略

显然,对于这两种场景的操作手法和策略是不同的。
对于任何的bulkload操作策略来说,评判目标应该是一致的: 效率和影响
效率 指的是如何在短时间内将数据load。
影响 指的是如何避免在做bulkload的时候,减少对集群的影响,避免hbase触发split/merge等heavy的操作。

操作手法一般包括:

a) HFile的预处理 通过M/R Job或者其他手段预先处理HFile,可以定义HFile大小,splitkey规则,可以在非空表的情况下,极大避免对集群的影响。
b) 预分区 通常来说,对于空表的bulkload应该是不会触发集群的split、merge等操作的。通过预分区可以有效的规避这一点。和Hbase 的split size结合使用。

2. Bulkload客户端流程

主要围绕LoadIncrementalHFiles 类来说明基本的流程。
可参考我上传的代码,对比以下链接的代码
[0.90版本官方提供的LoadIncrementalHFiles类] (http://trgit2/wz68/importdb_blukload/blob/master/src/main/java/com/newegg/email/importdb/bulkload/tool/LoadIncrementalHFiles.java)
[基于0.90版本修改后支持客户端并发的LoadIncrementalHFilesConcurrency类] (http://trgit2/wz68/importdb_blukload/blob/master/src/main/java/com/newegg/email/importdb/bulkload/tool/LoadIncrementalHFilesConcurrency.java)
总的来说,这部分代码比较简单,客户端代码很容易读懂。

Step 1. main entry

LoadIncrementalHFiles的entry方法,会接收两个参数,一个是表的名字,另一个是hfile文件或文件夹路径(此处可传递文件夹)
这里的LoadIncrementalHFiles是implements于org.apache.hadoop.util.Tool,但是因为这里只是借助于tool这个工具,实质上并没有启动任何M/R job,因此我们看到的只是一个local的app启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// entry 方法
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
System.exit(ret);
}
// tool的默认加载方法run
public int run(String[] args) throws Exception {
if (args.length != 2) {
usage();
return -1;
}
String dirPath = args[0]; // ---> 路径(文件、文件夹)
String tableName = args[1]; // ---> table name
boolean tableExists = this.doesTableExist(tableName);
if (!tableExists)
this.createTable(tableName, dirPath);
Path hfofDir = new Path(dirPath);
HTable table = new HTable(conf,tableName);
doBulkLoad(hfofDir, table); // ---> 调用bulkload方法
return 0; // ---> Run方法中并没有调用M/R job client,因此只是一个local app,不能算是M/R的job
}

Step 2 . 串行化和并行化 doBulkLoad

串行化和并行化目前已经在高版本应该解决了。

a) 0.90版本的串行化: 以下这段是官方提供的代码,我们可以看到tryLoad是循环的。所以无法并发去做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void doBulkLoad(Path hfofDir, HTable table) throws TableNotFoundException, IOException {
HConnection conn = table.getConnection();
if (!conn.isTableAvailable(table.getTableName())) {
throw new TableNotFoundException("Table " + Bytes.toStringBinary(table.getTableName()) + "is not currently available.");
}
Deque<LoadQueueItem> queue = null;
try {
queue = discoverLoadQueue(hfofDir);
while (!queue.isEmpty()) { // ----> 串行化消费队列,提交bulkload
LoadQueueItem item = queue.remove();
tryLoad(item, conn, table.getTableName(), queue);
}
} finally {
if (queue != null && !queue.isEmpty()) {
StringBuilder err = new StringBuilder();
err.append("-------------------------------------------------\n");
err.append("Bulk load aborted with some files not yet loaded:\n");
err.append("-------------------------------------------------\n");
for (LoadQueueItem q : queue) {
err.append(" ").append(q.hfilePath).append('\n');
}
LOG.error(err);
}
}
}

b) 基于0.90版本修改的并行化方案,通过构建线程池来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public boolean doBulkLoad(Path hfofDir, HTable table) throws TableNotFoundException, IOException {
boolean isSuccess=false;
HConnection conn = table.getConnection();
if (!conn.isTableAvailable(table.getTableName())) {
throw new TableNotFoundException("Table " + Bytes.toStringBinary(table.getTableName()) + "is not currently available.");
}
List<Deque<LoadQueueItem>> queueList = null;
// Deque<LoadQueueItem> queue = null;
queueList = discoverLoadQueue(hfofDir);
int threadNums=conf.getInt(CONCURRENCY_NUMS, 1);
LOG.info("********* thread nums = "+threadNums);
ExecutorService executorService = Executors.newFixedThreadPool(threadNums); // -----> 并行化解决方案
List<Future<Boolean>> resultList = new ArrayList<Future<Boolean>>();
try{
for (Deque<LoadQueueItem> queue : queueList) {
Future<Boolean> future =executorService.submit(new LoadThread(queue, conn, table.getTableName()));
resultList.add(future);
}
for (Future<Boolean> result : resultList) {
if (!result.get(100, TimeUnit.MINUTES)) {
LOG.error("validatorThread return false");
return false;
}
}
isSuccess = true;
}catch(Exception e){
LOG.error(e);
}finally{
executorService.shutdown();
return isSuccess;
}
}

c) 官方0.94的版本的代码片段,已经提供了并行化,并且大幅修改了代码,开启并行化,需要手动设置参数: hbase.loadincremental.threads.max,默认值是机器的cpu cores相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void doBulkLoad(Path hfofDir, final HTable table)
throws TableNotFoundException, IOException
{
final HConnection conn = table.getConnection();
if (!conn.isTableAvailable(table.getTableName())) {
throw new TableNotFoundException("Table " +
Bytes.toStringBinary(table.getTableName()) +
"is not currently available.");
}
// initialize thread pools ----> 并行化处理加载HFile
int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
Runtime.getRuntime().availableProcessors());
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("LoadIncrementalHFiles-%1$d");
ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
builder.build());
((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
......

Step 3. 查看hfile边界,以决定是否需要split

这个地方就是是否会在客户端发生split的原因
直接参考0.94的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* Attempt to assign the given load queue item into its target region group.
* If the hfile boundary no longer fits into a region, physically splits
* the hfile such that the new bottom half will fit and returns the list of
* LQI's corresponding to the resultant hfiles.
*
* protected for testing
*/
protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final HTable table,
final Pair<byte[][], byte[][]> startEndKeys)
throws IOException {
final Path hfilePath = item.hfilePath;
final FileSystem fs = hfilePath.getFileSystem(getConf());
HFile.Reader hfr = HFile.createReader(fs, hfilePath,
new CacheConfig(getConf()));
final byte[] first, last;
try {
hfr.loadFileInfo();
first = hfr.getFirstRowKey();
last = hfr.getLastRowKey();
} finally {
hfr.close();
}
LOG.info("Trying to load hfile=" + hfilePath + ----> 从hfile中得到first 和 last的信息
" first=" + Bytes.toStringBinary(first) +
" last=" + Bytes.toStringBinary(last));
if (first == null || last == null) {
assert first == null && last == null;
// TODO what if this is due to a bad HFile?
LOG.info("hfile " + hfilePath + " has no entries, skipping");
return null;
}
if (Bytes.compareTo(first, last) > 0) {
throw new IllegalArgumentException(
"Invalid range: " + Bytes.toStringBinary(first) +
" > " + Bytes.toStringBinary(last));
}
int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, ---> 从整个集群的statEndKeys的多列中搜索是否满足该HFile的Region
Bytes.BYTES_COMPARATOR);
if (idx < 0) {
// not on boundary, returns -(insertion index). Calculate region it
// would be in.
idx = -(idx + 1) - 1;
}
final int indexForCallable = idx;
boolean lastKeyInRange =
Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
if (!lastKeyInRange) {
List<LoadQueueItem> lqis = splitStoreFile(item, table, ---> 如果不满足,就会触发客户端split,调用splitStoreFile方法这里就时效率慢的原因!!!!
startEndKeys.getFirst()[indexForCallable],
startEndKeys.getSecond()[indexForCallable]);
return lqis;
}
// group regions.
regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item); ---> 如果HFile满足区间,或者是已经将不满足区间的HFile切分满足,那么就加到RegionGroup中,等待加载.
return null;
}

Step 4. 客户端split的两个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 决定split的范围,数量,分区
protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
final HTable table, byte[] startKey,
byte[] splitKey) throws IOException {
final Path hfilePath = item.hfilePath;
// We use a '_' prefix which is ignored when walking directory trees
// above.
final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
"region. Splitting...");
String uniqueName = getUniqueName(table.getTableName());
HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
Path botOut = new Path(tmpDir, uniqueName + ".bottom");
Path topOut = new Path(tmpDir, uniqueName + ".top");
splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
botOut, topOut);
// Add these back at the *front* of the queue, so there's a lower
// chance that the region will just split again before we get there.
List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
lqis.add(new LoadQueueItem(item.family, botOut));
lqis.add(new LoadQueueItem(item.family, topOut));
LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); // 如果split成功,我们会看到这些日志...
return lqis;
}
// 真正调用执行split,copy....
/**
* Split a storefile into a top and bottom half, maintaining
* the metadata, recreating bloom filters, etc.
*/
static void splitStoreFile(
Configuration conf, Path inFile,
HColumnDescriptor familyDesc, byte[] splitKey,
Path bottomOut, Path topOut) throws IOException
{
// Open reader with no block cache, and not in-memory
Reference topReference = new Reference(splitKey, Range.top);
Reference bottomReference = new Reference(splitKey, Range.bottom);
copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); -----> 客户端split的真正copy的方法
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
}

3. Q&A

1) Q: 目前的工具是否是并行化加载?

通过上面的分析,我们可以看到,从0.94开始就已经是有并行化处理了,因此我们不需要再像我之前的那种方式去做了。只要定义好自己的线程池大小就ok.   
但我们也需要注意的是,由于**LoadIncrementalHFiles** 工具并不是M/R job,所以不要指望能直接通过M/R来提高效率。  

2) Q: 如果避免split?

split有两个定义。一个是客户端的split,另一个是Region Server的split。  
在空表情况下,通过预分区的方式,只要key分的准确,就不会触发客户端的split。结合hbase 原本的 split size 参数,使HFile对应的HRegion大小合适,不会造成RSr的split.

3) Q: 对空表的bulkload的过程中,可否正常读写?

读肯定是没有问题的,写的话,如果数据量不大,不触发Region的split,则也没有关系。  

4) Q: 为什么bulkload的过程中,在数据已经准备的情况下,效率那么快,秒级别加载,并且不会disable Region?

前面只分析了Bulkload的客户端代码,如果去翻阅一下服务端代码我们可以知道。对于RS来说,bulkload就两件事情,第一件是mv文件,第二件是在RS的memory里加上新HFile的meta信息。  

5) Q: 对于非空表的增量数据bulkload,基本策略有哪些?

参考步骤如下:  
a)  对新增HFiles大小做预估计,如果当前cluster可以容纳,则不需要预分区,否则,可以先预分区。  
b)  通过M/R对HFile做符合当前集群分区情况进行切分。  
c)  加载。  

对此,官方设计中有一段说明:
If the region boundaries have changed during the course of bulk load preparation, or between the preparation and completion steps, the completebulkloads utility will automatically split the data files into pieces corresponding to the new boundaries. This process is not optimally efficient, so users should take care to minimize the delay between preparing a bulk load and importing it into the cluster, especially if other clients are simultaneously loading data through other means.

大致意思是,即使客户端工具能够完成split的工作,但并不是最有效的方式,用户应该注意边界问题。


作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments

为ElasticSearch 添加支持跨DC的特性

背景

无意间发现,去年年初提到社区的改动,在去年8月得到回复,虽然没有被采纳…
https://github.com/elastic/elasticsearch/pull/4651#issuecomment-51578376
对方提出两点:

  1. 不赞成ElasticSearch跨越数据中心构建。
  2. 认为同步和延迟会造成比较大的问题。

不过,目前为止,我认为这个特性有它的价值,所以在此分享一些当初的设计思想和改造方法。同时,透露一下,这个改造在公司内部也正常稳定运行了近一年多

改进点

Elastic Search的节点设计思想是,每一组索引分片,由一个primary做write操作,而前天的replica做读操作。
那么,如果对于一个跨数据中心的elastic Search设计,将会因为以下的场景,带来一些问题:

  1. 如果所有的写操作是由一个数据中心完成(事实上,大多数情况都是如此),由一个主的数据中心处理数据,然后分发到其他的数据中心(跨location)
  2. 而ES的Primary的规则,并不支持将Primary都分配在同属于一个location的Node,而是会根据其自身的集中策略来分配
  3. 根据以上两点,最终的结果是: 如果由两个数据中心,Primary可能分配到任何的数据中心,造成write操作会分散在不同的location

而我们要做的是,提供一种策略,使得primary分配到同一个location下的index分片

改进效果

分布

这种策略可以支持任何的情况:

  1. primary所在的location全部down掉,只剩下replica的location
  2. 任意顺序启动Node.
  3. 任意一种HA的可能情况。

代码部分,可参考我已经提交的版本

https://github.com/3h-william/elasticsearch-dc-version0.0.1


作者:3h_william
联系: https://github.com/3h-william
出处:http://3h-william.github.io


Share Comments