神机喵算


  • 首页

  • 归档

  • 标签

流式处理架构的“瓶颈”:数据访问(上)

发表于 2016-09-20

写在之前:这是用微软输入法打出来的一篇文章。

背景

在Linkedin,大体架构是:ApacheSamza作为流式处理框架,Apache Kafka作为持久化的订阅/发布消息中间件,Databus监控数据库的变化。

数据访问(Data access)的两种模式

为什么数据访问是规模化的挑战?我们处理的数据访问模式主要分两种:

Read/write data:
这里给出一个在Linkedin中使用read-write数据访问模式的场景。Linkedin许多应用需要推送信息给会员,不管通过email还是通知。为了保证更好的用户体验,尽量不多给会员发email。Linkedin开发一个基于Samza的ATC(Air Traffic Control )应用来控制email和通知送达终端用户。ATC追踪每个会员收到的最后一封邮件的时间,以及所有新的邮件请求的时间。ATC维护着每个会员的状态信息(read/write)。

Read-only data:
同样,给出一个只读数据访问的场景。Linkedin开发一个应用监听会员点击某条广告的事件的时间。这个应用会生成一个AdQuality事件,并高亮点击某些特定广告的会员特征。AdQuality事件会用来训练广告推荐机器学习模型。应用处理AdQuality事件会去查询点击广告的会员的画像。

数据访问的关键特征

除了以上描述的两种访问模式,以下两种数据访问的关键特征也将会极大地影响事件流式处理的架构。

数据访问是否分区?
上面的场景,Kafka的topic是以MemberId分区的。输入事件已经按会员信息(MemberId)分区,那每个事件处理节点仅仅需要访问一个不变的会员数据集。后续我们会看到,如何对已经分区过的数据访问进行缓存优化。
另外一种场景,假设处理每个事件都需要查询Company库获取会员的更多信息,那每个事件处理节点就得查询可能的每个公司。这是访问未分区数据的例子。

数据集的大小
后面会看到,访问一个5M数据集的解决方案跟访问一个5 TB数据集的方案完全不同。比如,你可以把5M数据集完整的存储到每个节点上,然而很显然你不可能对5 TB的数据集做同样的操作。

数据访问的解决方案

下面是展示的是两种常用的数据访问解决方案。

远程存储:这是开发应用的传统的方式。当一个应用处理一个事件,它会远程调用一个隔离的SQL或者No-SQL数据库。这种方法中,写操作总是采用远程调用,但是读数据可以通过本地缓存进行一定程度上的优化。LinkedIn有大量的应用采用这种方法。
另外一种模式,在远程数据库(比如,Oracle)前面前置一个远程缓存(比如,Couchbase)。远程缓存主要用来数据读操作,应用通过Databus之类的工具追踪数据库变化,并替代远程缓存。

本地存储(嵌入式):这种方法是要求事件处理结果存储的位置和事件处理的地方在同一台机器上。极端的情况是所有数据存储访问都是本地,这样效率最高。
Samza天生就是支持嵌入式本地数据库,它支持把RocksDB嵌入你的事件处理器。它是通过Kafka log compacted topic做备份。

也有其它框架,比如Microsoft ServiceFabric,它本身内建支持本地应用存储。ServiceFabric支持持久化数据到本地磁盘,并把备份持久化到其它处理器实例。ServiceFabric持久化会自动备份到Azure storage。

事件到达点 VS 事件处理点

本地存储和远程存储的讨论是相对于事件处理的位置。在选择使用的框架时,事件到达的位置可能与事件处理的位置不在同一位置。

像GoogleDataflow这类框架支持从未分区的输入源(GooglePub-Sub)读取数据。这种模型中,当事件到达时,处理器会先找出当前事件应该归哪个处理器处理,并转发到对应的实际处理器处理。

而Samza、Spark Streaming和Flink之类的流处理框架《实时流处理框架选型:就应该这样“拉出来遛遛”》天然就支持事件的分区(Kafka,Kinesis等),因此不会再做一步转发处理。

如果你的应用得瓶颈是网络带宽或者计算能力,那处理事件的节点和事件的到达在同一个节点不会出现转发,将会极大的提高性能。

抛开在事件处理之前进行事件转发的情况,本文讨论的在事件处理过程中考虑数据访问这些依然对所有的事件处理框架适用。

在路上

海量流式处理没有一个完美的方案,大家可以根据公司的场景进行平衡取舍。

本文主要讲了两种数据访问的模式,以及数据访问关键特征对架构的影响,并给出了相应的解决方案。最后讨论了事件到达和事件处理位置不同带来的解决方案不同。下期更精彩,敬请关注。

PS:最近更新较少,说声抱歉了。这几周工作较忙,主要涉及到的知识点有:CDH集群、Hue、Zeppelin、Caravel for Hive、elasticsearch on hadoop/Hive、Gobblin、Neo4j等,可以后台留言交流。

参考:
[1] https://engineering.linkedin.com/blog/2016/08/stream-processing-hard-problems-part-ii--data-access


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

机器学习模型选择如此简单

发表于 2016-09-20

写在之前:有些概念跟平时你见过的机器学习文章描述的可能不太一样,但是它会给你一种“心里一颤“的感觉。

机器学习的讨论经常会涉及到的问题是:什么机器学习模型才是最好的?是逻辑回归模型,随机森林模型,贝叶斯方法模型,支持向量机模型?抑或是神经网络模型?每个人似乎都有自己心中最爱!但这些讨论试图把机器学习的挑战缩减为单个问题,而这对机器学习的初学者带来了特别严重的误解。

选择一个好的机器学习模型固然重要,但这远远不够。在缺乏领域知识,基本假设,数据选型和实际的应用的情况下,还是值得商榷的。关于机器学习模型评价这部分将留在下一篇文章阐述。

feature engineering (FE),algorithm selection (AS),and parameter tuning (PT);

模型选择

能训练一个“合适”的模型和预测是相当依赖特征工程、参数调优和模型选择。模型选择是机器学习过程比较难的部分,复杂、迭代,经常不断的去“试错”和重复。

模型选择实战

相信大家对Scikit-Learn“如何选择Estimator”里的流程图非常熟悉了,不熟悉的点开链接读读。这个流程图是给初学者一个选择机器学习算法的最佳实践的参考手册。

image

首先,看下我们的数据集(三个数据集参考上篇《可视化图表让机器学习“biu”的一样简单:特征分析》)的样本数是否够50。

1
2
3
print len(occupancy) # 8,143
print len(credit) # 30,000
print len(concrete) # 1,030

很显然这个条件是满足的。接着看下是否我们是否预测类别。对于房屋入住和信用卡数据集来说是判断类别;而混凝土数据集,缓凝土的抗压强度是连续数据,所以预测的是数量。因此,为前两个数据集选择分类器(classifier);为后者选择回归模型(regressor)。

因为我们的两个判断类别的数据集都小于100K,接着按图选择sklearn.svm.LinearSVC(其会将数据集映射到高维特征空间);如果失败,就再选择sklearn.neighbors.KNeighborsClassifier(其会分配样本到它的K领域)。你应该还记得房屋入住数据集单位不统一,所以这里引入scale进行归一化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from sklearn.preprocessing import scale
from sklearn.svm import LinearSVC
from sklearn.neighbors import KNeighborsClassifier
def classify(attributes, targets, model):
# Split data into 'test' and 'train' for cross validation
splits = cv.train_test_split(attributes, targets, test_size=0.2)
X_train, X_test, y_train, y_test = splits
model.fit(X_train, y_train)
y_true = y_test
y_pred = model.predict(X_test)
print(confusion_matrix(y_true, y_pred))
# Divide data frame into features and labels
features = occupancy[['temp', 'humid', 'light', 'co2', 'hratio']]
labels = occupancy['occupied']
# Scale the features
stdfeatures = scale(features)
classify(stdfeatures, labels, LinearSVC())
classify(stdfeatures, labels, KNeighborsClassifier())

对于信用卡数据集使用相同的classify,根据上篇的特征分析经验,我们这里需要先进行数据缺失处理。

1
2
3
4
5
6
7
8
9
10
11
12
features = credit[[
'limit', 'sex', 'edu', 'married', 'age', 'apr_delay', 'may_delay',
'jun_delay', 'jul_delay', 'aug_delay', 'sep_delay', 'apr_bill', 'may_bill',
'jun_bill', 'jul_bill', 'aug_bill', 'sep_bill', 'apr_pay', 'may_pay',
'jun_pay', 'jul_pay', 'aug_pay', 'sep_pay'
]]
labels = credit['default']
stdfeatures = scale(features)
classify(stdfeatures, labels, LinearSVC())
classify(stdfeatures, labels, KNeighborsClassifier())

对于混凝土数据集,我们得决定是否所有的特征都重要,或者只有一部分重要。如果选择所有的特征都很重要,那根据流程图手册路线应该选择sklearn.linear_model.RidgeRegression或者sklearn.svm.SVR(有点类似LinearSVC classifier);如果觉得只有部分特征重要,那就选择sklearn.linear_model.Lasso(其会在预测时舍弃部分特征)或者sklearn.linear_model.ElasticNet(其介入 Lasso方法和Ridge方法之间,L1和 L2惩罚的线性组合)。

下面来试试看咯:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from sklearn.linear_model import Ridge, Lasso, ElasticNet
def regress(attributes, targets, model):
splits = cv.train_test_split(attributes, targets, test_size=0.2)
X_train, X_test, y_train, y_test = splits
model.fit(X_train, y_train)
y_true = y_test
y_pred = model.predict(X_test)
print('Mean squared error = {:0.3f}'.format(mse(y_true, y_pred)))
print('R2 score = {:0.3f}'.format(r2_score(y_true, y_pred)))
features = concrete[[
'cement', 'slag', 'ash', 'water', 'splast', 'coarse', 'fine', 'age'
]]
labels = concrete['strength']
regress(features, labels, Ridge())
regress(features, labels, Lasso())
regress(features, labels, ElasticNet())

正如上面代码展示的那样,Scikit-Learn API使得我们可以快速的发布我们需要的模型,这是Scikit-Learn一个强有力的魔力。

可视化模型

Scikit-Learn流程图非常有用是因为其提供了使用路径地图,但是它不能提供各种模型的函数。因而另外两幅图成为Scikit-Learn的权威:分类器比较和聚类比较。

多个小图形很容易比较出不同的数据集适合的聚类算法:

image

类似的,分类器比较图很好的帮助我们对不同的数据集选择哪种合适的分类器:

image

一般来说,这些图形仅仅是证明了各种模型对不同数据集的优化;但我相信大家都希望有一种可视化工具可以对同一个数据集使用不同的模型的情况进行比较。

模型簇

首先给出“模型”这个词的定义,它包括三方面:

  • 模型簇:比如,linear model,nearest neighbors,SVM,Bayes等模型;
  • 模型形式:比如,sklearn.linear_model.Ridge(),sklearn.linear_model.Lasso(),sklearn.linear_model.ElasticNet等;
  • 拟合模型:比如,Ridge().fit(X_train, y_train)。

模型簇是由特征空间决定的;模型形式是通过试验和统计检验来选择的;拟合模型是由参数调优和机器计算生成的。

我们讨论的这些,模型形式的试验会在后续文章中讲到,这部分是我们期望能够得到回报的想象空间。模型形式是指出我们的特征是如何和模型簇相关。

我喜欢的模型展示工具之一是Dr. Saed Sayad的可交互的“数据挖掘地图”。它比Scikit-Learn的流程图手册综合性更高,并且结合里模型簇和模型形式的概念。除了预测方法外,Sayad地图也包含了统计方法部分。

这里给出一个普适的流程图,它旨在结合Sayad地图和 Scikit-Learn流程图。颜色和等级代表模型形式和模型簇:

image

总结

通过在同一个数据集上比较和对比不同模型的性能,我们能从模型簇中直观的选取模型形式。

下期将会讲解拟合模型和调参的可视化工具。

PS:这周深入的“研究”了下Flume,日志收集利器,但是对Hadoop版本的支持太低,填了不少坑……

希望我写的对部分人有用,如果是这样,请让我知道,谢谢。


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

【Spark 2.0系列】:Catalog和自定义Optimizer

发表于 2016-08-21

Catalog和自定义Optimizer

Spark 2.0系列第一篇见Spark 2.0系列】:Spark Session API和Dataset API,本文将讲解Spark 2.0 的Catalog 和Custom Optimizer。

首先,先了解下RDD 和Dataset 在开发中使用对比。

RDD 和Dataset 使用对比

Dataset API 是RDD 和DataFrame API 的统一,但大部分Dataset API 与RDD API使用方法看起来是相似的(其实实现方法是不同的)。所以RDD代码很容易转换成Dataset API。下面直接上代码:

WordCount
  • RDD
1
2
3
4
5
val rdd = sparkContext.textFile("src/main/resources/data.txt")
val wordsRDD = rdd.flatMap(value => value.split("\\s+"))
val wordsPair = wordsRDD.map(word => (word,1))
val wordCount = wordsPair.reduceByKey(_+_)
  • Dataset
1
2
3
4
5
6
val ds = sparkSession.read.text("src/main/resources/data.txt")
import sparkSession.implicits._
val wordsDs = ds.flatMap(value => value.split("\\s+"))
val wordsPairDs = wordsDs.groupByKey(value => value)
val wordCountDs = wordsPairDs.count()
其它
RDD Dataset
Caching rdd.cache() ds.cache()
Filter val filteredRDD = wordsRDD.filter(value => value ==”hello”) val filteredDS = wordsDs.filter(value => value ==”hello”)
Map Partition val mapPartitionsRDD = rdd.mapPartitions(iterator => List(iterator.count(value => true)).iterator) val mapPartitionsDs = ds.mapPartitions(iterator => List(iterator.count(value => true)).iterator)
reduceByKey val reduceCountByRDD = wordsPair.reduceByKey(+) val reduceCountByDs = wordsPairDs.mapGroups((key,values) =>(key,values.length))
Dataset 和RDD 相互转换
  • RDD
1
val dsToRDD = ds.rdd
  • Dataset

RDD 转换成Dataframe稍麻烦,需要指定schema。

1
2
3
4
val rddStringToRowRDD = rdd.map(value => Row(value))
val dfschema = StructType(Array(StructField("value",StringType)))
val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)
val rDDToDataSet = rddToDF.as[String]
Catalog API

DataSet 和Dataframe API 支持结构化数据分析,而结构化数据重要的是管理metadata。这里的metadata包括temporary metadata(临时表);registered udfs;permanent metadata(Hive metadata或HCatalog)。

早期Spark版本并未提供标准的API访问metadata,开发者需要使用类似show tables的查询来查询metadata;而Spark 2.0 在Spark SQL中提供标准API 调用catalog来访问metadata。

访问Catalog

建立SparkSession,然后调用Catalog:

1
val catalog = sparkSession.catalog
查询数据库
1
catalog.listDatabases().select("name").show()

listDatabases可查询所有数据库。在Hive中,Catalog可以访问Hive metadata中的数据库。listDatabases返回一个dataset,所以你可以使用适用于dataset的所有操作去处理metadata。

用createTempView 注册Dataframe

早期版本Spark用registerTempTable注册dataframe,而Spark 2.0 用createTempView替代。

1
df.createTempView("sales")

一旦注册视图,即可使用listTables访问所有表。

查询表
1
catalog.listTables().select("name").show()
检查表缓存

通过Catalog可检查表是否缓存。访问频繁的表缓存起来是非常有用的。

1
catalog.isCached("sales")

默认表是不缓存的,所以你会得到false。

1
2
df.cache()
catalog.isCached("sales")

现在将会打印true。

删除视图
1
catalog.dropTempView("sales")
查询注册函数
1
2
catalog.listFunctions().
select("name","description","className","isTemporary").show(100)

Catalog不仅能查询表,也可以访问UDF。上面代码会显示Spark Session中所有的注册函数(包括内建函数)。

自定义 Optimizer
Catalyst optimizer

Spark SQL使用Catalyst优化所有的查询,优化之后的查询比直接操作RDD速度要快。Catalyst是基于rule的,每个rule都有一个特定optimization,比如,ConstantFolding rule用来移除常数表达式,具体可直接看Spark SQL源代码。

在早期版本Spark中,如果想自定义optimization,需要开发者修改Spark源代码。操作起来麻烦,而且要求开发者能读懂源码。在Spark 2.0中,已提供API自定义optimization。

访问Optimized plan

在开始编写自定义optimization之前,先来看看如何访问optimized plan:

1
2
3
val df = sparkSession.read.option("header","true").csv("src/main/resources/data.csv")
val multipliedDF = df.selectExpr("amountPaid * 1")
println(multipliedDF.queryExecution.optimizedPlan.numberedTreeString)

上面的代码是加载一个csv文件,并对某一行所有值乘以1。queryExecution 可访问查询相关的所有执行信息。 queryExecution 的optimizedPlan对象可以访问dataframe的optimized plan。

Spark中的执行计划以tree表示,所以用numberedTreeString打印optimized plan。打印结果如下:

00 Project [(cast(amountPaid#3 as double) 1.0) AS (amountPaid 1)#5]
01 +- Relation[transactionId#0,customerId#1,itemId#2,amountPaid#3] csv

所有执行计划是由底向上读取:

  • 01 Relation - 从csv 文件建立一个dataframe
  • 00 Project - 投影操作
编写自定义optimizer rule

从上面的执行计划可以清晰的看到:对一列的每个值乘以1 这里并没有优化。我们知道,乘以1 这个操作应该返回的是值本身,所以可以利用这个特点来增加只能点的optimizer。代码如下:

1
2
3
4
5
6
7
8
object MultiplyOptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case Multiply(left,right) if right.isInstanceOf[Literal] &&
right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>
println("optimization of one applied")
left
}
}

这里MultiplyOptimizationRule扩展自Rule类,采用Scala的模式匹配编写。检测右操作数是否是 1,如果是1 则直接返回左节点。

把MultiplyOptimizationRule加入进optimizer:

1
sparkSession.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)

你可以使用extraOptimizations将定义好的Rule加入 catalyst。

下面实际使用看看效果:

1
2
3
4
val multipliedDFWithOptimization = df.selectExpr("amountPaid * 1")
println("after optimization")
println(multipliedDFWithOptimization.queryExecution.
optimizedPlan.numberedTreeString)

我们看到打印结果:

00 Project [cast(amountPaid#3 as double) AS (amountPaid * 1)#7]
01 +- Relation[transactionId#0,customerId#1,itemId#2,amountPaid#3] csv

说明自定义Optimizer已生效。


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

Spark借助Redis提升45倍处理效率!

发表于 2016-08-21

摘要:时常采用内存数据结构会使得程序更加高效,比如,Spark借助Redis可以提速45倍。

Spark代表着下一代大数据处理技术,并且,借着开源算法和计算节点集群分布式处理,Spark和Hadoop在执行的方式和速度已经远远的超过传统单节点的技术架构。但Spark利用内存进行数据处理,这让Spark的处理速度超过基于磁盘的Hadoop 100x 倍。

但Spark和内存数据库Redis结合后可显著的提高Spark运行任务的性能,这源于Redis优秀的数据结构和执行过程,从而减小数据处理的复杂性和开销。Spark通过一个Redis连接器可以访问Redis的数据和API,加速Spark处理数据。

Spark和Redis结合使用到底有多大的性能提升呢?结合这两者来处理时序数据时可以提高46倍以上——而不是提高百分之四十五。

为什么这些数据处理速度的提升是很重要的呢?现在,越来越多的公司期望在交易完成的同时完成对应的数据分析。公司的决策也需要自动化,而这些需要数据分析能够实时的进行。Spark是一个用的较多的数据处理框架,但它不能做到百分之百实时,要想做到实时处理Spark还有很大一步工作需要做。
此处输入图片的描述
图1

Spark RDD

Spark采用弹性分布式数据集(RDD),可将数据存在易变的内存中或持久化到磁盘上。 RDD具有不可变化性,分布式存储在Spark集群的各节点,RDD经过tansform操作后创建出一个新的RDD。RDD是Spark中数据集的一种重要抽象,具有良好的容错性、高效的迭代处理。

Redis

Redis天生为高性能设计,通过良好的数据存储结构能达到亚毫秒级的延迟。Redis的数据存储结构不仅仅提高内存的利用和减小应用的复杂性,也降低了网络负载、带宽消耗和处理时间。Redis数据结构包括字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets), bitmaps, hyperloglogs 和 地理空间(geospatial)索引半径查询。

下面来展示Redis的数据结构如何来简化应用的处理时间和复杂度。这里用有序集合来举例,一个以评分(score)大小排序的元素集合。
此处输入图片的描述
图2

Redis能存储多种数据类型,并自动的以评分(score)排序。常见的例子有,按价格排序的商品,以阅读数排序的文章名,股票价格时序数据,带时间戳的传感器读数。
有序集合依赖Redis优秀的内建操作可以实现范围查询、求交集,可以非常快地(O(log(N)))完成添加,删除和更新元素的操作。Redis内建函数不仅减少代码开发,在内存中执行也减小了网络延时和带宽消耗,可达到亚毫秒级的吞吐延迟。特别地,对时序数据集合来讲,有序集合数据结构比使用内存键值对或使用磁盘的数据库,能给数据分析带来数量级上的性能提升。

Spark-Redis connector

为了提高Spark数据分析的能力,Redis团队开发了一个Spark-Redis connector,它使得Spark可以直接使用Redis作为数据源,顺理成章的Spark也能使用Redis的各数据结构,进而显著的提升Spark分析数据的速度。
此处输入图片的描述
图3

为了展示Spark结合Redis所产生的效果,Redis团队拿时序数据集合做基准测试,测试了Spark在不同情况下执行时间范围查询:Spark使用堆外内存;Spark使用Tachyon作为堆外缓存;Spark使用HDFS存储;Spark结合Redis使用。

Redis团队改进了Cloudera的Spark分析时序数据的包,采用Redis有序集合数据结构加速时序数据分析,并且实现Spark访问Redis各类数据结构的接口。此Spark-Redis时序开发包主要做了两件事:

  1. 它让Redis节点与Spark集群的节点自动匹配,确保每个Spark节点都使用本地Redis节点,这样可以明显的优化延迟时间;
  2. 集成Spark DataFrame和Spark读取数据源,使得Spark SQL查询可自动转化,并能借助Redis能有效的恢复数据。

换句话说,使用Spark-Redis时序开发包意味着用户无需担心Spark和Redis两者如何使用。用户使用Spark SQL进行数据分析可以获得极大的查询性能提升。

基准测试

基准测试的时序数据集是跨度32年的1024个股票交易市场按天随机生成的数据。每个股票交易所都有有序数据集,以日期和元素属性(开盘价、最高价、最低价、收盘价等)排序,在Redis中以有序数据结构存储,采用Spark进行数据分析,描述如图4
此处输入图片的描述
图4

在上述列子中,就有序集合AAPL来看,有序数据集合以天为评分(score,以蓝色表示),每天相关的值为一行(Member,以灰色表示)。在Redis中,只要执行一个ZRANGEBYSCORE操作就可以获取一个指定时间范围内的所有股票数据,并且Redis执行此查询要比其他Key/Value数据库快100倍。
从图x可以看到,横向比较各种情况的基准测试,Spark结合Redis执行时间片的查询速度比Spark使用HDFS快135倍、比Spark使用堆内内存或Spark使用Tachyon作为堆外内存要快45倍。
此处输入图片的描述
图5

Spark-Redis其它应用

按照“Getting Started with Spark and Redis”指南,你可以一步步安装Spark集群和使用Spark-Redis包。它提供一个简单的wordcount的例子展示如何使用Spark结合Redis。待你熟练使用后可以自己进一步挖掘、优化其他的Redis数据结构。
Redis的有序集合数据结构很适合时序数据集合,而Redis其他数据结构(比如,列表(lists), 集合(sets)和 地理空间(geospatial)索引半径查询)也能进一步丰富Spark的数据分析。当使用Spark抽取地理空间信息来获取新产品的人群偏好和邻近中心的位置,可结合Redis的地理空间(geospatial)索引半径查询来优化。

Spark支持一系列的数据分析,包括SQL、机器学习、图计算和流式数据。Spark本身的内存数据处理能力有一定的限制,而借着Redis可以让Spark更快的做数据分析。其实Spark的DataFrame和Datasets已经在做类似的优化,先把数据进行结构化放在内存里进行计算,并且Datasets可以省掉序列化和反序列化的消耗。结合Spark和Redis,借助Redis的共享分布式内存数据存储机制,可以处理数百万个记录乃至上亿的记录
时序数据的分析仅仅是一个开始,更多的性能优化可以参见:Spark-Redis。

参考;


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image
~

基于Spark DataFrame的图数据库GraphFrame:用Spark SQL查询Graph

发表于 2016-08-21

GraphFrame发布

GraphFrame基于Spark SQL的DataFrame,继承了DataFrame扩展性和高性能。并且可以提供支持Scala、Java和Python等语言的统一API。

什么是GraphFrame

GraphFrame是类似于Spark的GraphX库,支持图处理。但GraphFrame建立在Spark DataFrame之上,具有以下重要的优势:
支持Scala ,Java 和Python API:GraphFrame提供统一的三种编程语言APIs,而GraphX的所有算法支持Python和Java
方便、简单的图查询:GraphFrame允许用户使用Spark SQL和DataFrame的API查询
支持导出和导入图:GraphFrame支持DataFrame数据源,使得可以读取和写入多种格式的图,比如,Parquet、JSON和CSV格式。

社交网络的列子

社交网络中的人是以关系来互相连接的,我们能把这个网络看成一幅图,其中人看成顶点,人与人之间的关系看作是边,如图1所示:
此处输入图片的描述
图1
在社交网络上,每个人可能由年龄和名字,每个人之间的关系也有不同类型。如表1和表2
表1
此处输入图片的描述

表2
此处输入图片的描述

图查询示列

由于GraphFrame的顶点和边存储为DataFrame,可以用DataFrame或SQL来很简单的查询图。
比如,查询有多少年龄大于35的人?
g.vertices.filtr(“age > 35”)
比如,有多少人至少被2个人关注?
g.inDegrees.filter(“inDegree >=2”)

GraphFrames支持所有GraphX的算法,包括PageRank、Shortest Paths、Connected components、Strongly Connected components、Triangle count 和Label Propagation Algorithm(LPA)

GraphFrame和GraphX之间可以无损的来回转换。
val gx: Graph[Row, Row] = g.toGraphX()
val g2: GraphFrame = GraphFrame.fromGraphX(gx)
更相信的GraphFrame API文档见这里。

参考:


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image
~

Twitter数据平台的架构演化:分析数据的数据发现和消费

发表于 2016-08-21

导读:本文详细讲述Twitter数据平台的架构演化:分析数据的数据发现和消费。

介绍

Twitter数据平台维护数据系统来支持和管理各种业务的数据生产和消费,包括,公用报表指标(比如,月活跃或者天活跃),个性化推荐,A/B测试,广告营销等。 Twitter数据平台运维着一些全球最大的Hadoop集群,其中有几个集群超过1万个节点,存储着数百PB级数据集,每天有超过10万个日常job作业处理数十PB级数据量。Scalding用来在HDFS上进行数据清洗(ETL:Extract,Transform和Load),数据科学家和数据分析师使用Presto进行交互式查询。MySQL或者Vertica用来作普通的数据集聚合,然后Tableau仪表板展示。Manhattan是Twitter的分布式数据库,其为实时服务服务。

Twitter数据平台团队从刚开始的单个数据分析组,其仅仅拥有核心的数据集,到成百上千的员工(团队)产生和消费这些数据集。这意味着:数据源发现,数据源的完成链(例如,这些数据源是如何产生和消费的)的获取,不考虑数据源的格式、位置和工具的数据集消费和它们整个生命周期内的一致性管理,将成为一个比较现实的问题。

为了满足这些需求,数据平台团队开发出数据访问层(Data Access Layer (DAL)):

  • 数据发现:如何发现最重要的数据集?谁拥有这些数据集?数据集的语义和其它相关的元数据是什么?
  • 数据审计:数据集的创建这或者消费者是哪位?数据集是如何创建这些数据的?数据集的依赖和服务等级协议(SLAs)是什么?数据集的报警规则是什么?数据集和它们的依赖是否一致?数据集的生命周期是如何管理的?
  • 数据抽象:数据的逻辑描述是什么?数据的物理描述是什么?数据存储在哪里?数据副本在哪里?数据格式是什么?
  • 数据消费:各种客户端(比如,Scalding,Presto,Hive等等)是如何交互地使用数据平台的各数据集?

本文中将讨论DAL更高层次的设计和使用,DAL是如何符合整个大数据平台生态,以及分析一些实践和经验教训。

DAL架构设计

为了让数据抽象,DAL有一个逻辑数据集和物理数据集的概念。逻辑数据集代表着数据集要独立于存储类型、存储位置、存储格式和存储副本之外。一个逻辑数据集可以物化到多个存储位置,甚至可以存储到不同的存储系统上,比如,HDFS或者Vertica。物理数据集是和物理存储位置(比如,HDFS namenode,像Vertica或MySQL这样的数据库等)关联的,所有的分片(物理数据块)在物理存储上。根据它们的类型,分片可以是分区或者快照。消费的数据集的元数据(Metadata)存储在物理数据集层。
这种抽象的好处:
a):跨多种物理实现,分组聚合相同的逻辑数据集,更易数据集发现;
b):提供消费数据集所需要的所有信息,包括数据集存储格式,数据集存储位置,以及调用的客户端(比如,Scalding或者Presto)。DAL数据集附加元数据,使得数据发现和数据消费更容易(如下所示)。因为所有的数据访问都通过DAL层,我们使用DAL层获取所有数据集的生产和消费的完整链。(其实跟阿里内部的数据地图差不多的意义)。

下面的架构图显示DAL层是如何配合数据平台的架构:
此处输入图片的描述

在技术栈的底层,核心基础设施包括Hadoop集群和数据库(比如,Vertica,MySQL和Manhattan)。核心数据服务层包括数据访问层(DAL),checkpoit作业状态和依赖的应用状态管理服务,以及job作业延迟报警服务。建立在核心数据服务层之上的是数据生命周期管理,包括数据复制服务和数据删除服务,数据复制服务会管理跨Hadoop集群的数据复制;数据删除服务会根据数据过期策略来删除数据。数据处理工具包括前面提到的Scalding和Presto,也包括自建的ETL工具来实现不同后端(比如,HDFS,Vertica或者MySQL)间的数据转换。

数据展示的UI(外界称为EagleEye)通过核心数据服务层聚合元数据(Metadata),也作为Twitter数据入口的控制。EagleEye用来发现数据集和应用,以及展示它们之间依赖的关系图。

如何发现和消费数据集?

像前面涉及到的,DAL数据集带有额外的元数据,可以轻松做到数据发现和消费。Twitter数据平台团队使用下面的数据资源管理来发现和消费数据集。

发现一个数据集

数据平台提供的数据资源管理中 “Discover Data Sources”模块能发现使用过的数据集,或者搜索感兴趣的数据集。数据资源管理通过DAL层搜索这个数据集。
此处输入图片的描述

数据集的预览信息

如果数据资源管理找到了我们想查询的数据集,它将展示给使用者预览信息。如下图,数据集在HDFS上被找到,数据资源管理中可以看到数据拥有者的描述,以及通过一定的启发式计算数据集的整体健康状态。我们也能预览的元数据字段有数据集的拥有者,数据集的访问频率,代表数据schema的thrift类,HDFS上的物理位置。
此处输入图片的描述

我们也可以检验数据集的schema,包括用户对特殊字段添加的评论。类似的,schema也可以让其它系统(Vertica或者MySQL)发现。
此处输入图片的描述

接下来给个例子,下面给出的代码截图是使用Scalding的例子。注意到,对读者来说,数据存储格式和位置都经过抽象。当通过Scalding运行下面的代码,时间范围提供给DAL,DAL提供数据分片的位置和格式。DAL的Scalding客户端接收刚才的信息,以Hadoop的合适的split数目来构造合适的Cascading Tap。

此处输入图片的描述

数据集的完整链和依赖

数据集资源管理也可以查看生产和消费数据集的作业和作业的完整链。从图中可以看出,有一个job作业产生数据集(图中红框),同时有好几个job作业在消费这个数据集。红框中的数据的生成依赖HDFS上的好几个数据集。并且,如果其中的某个job作业生成数据集延迟了,将会发出告警。
此处输入图片的描述

实践 & 经验教训

在这样Twitter体量的公司,想简化数据集跨所有数据格式和存储系统进行消费是很难的。也有一些像Hive Metastore这样开源的工具可以解决数据抽象,但其只有一部分功能。其它功能,比如数据集的审计和依赖链,管理数据集过期和复制和数据更易消费,也是很重要。

在实现DAL时做出了设计上的选择:把DAL设计成一个抽象和消费层,而不是仅仅聚焦在数据发现和审计。这么做的目的是为了让DAL成为数据集真实的来源,这将帮助我们透明的转换数据格式(比如,从lzo压缩的Thrift转换到Parquet格式),帮助我们使用相同的元数据从各种工具中产生和消费数据(比如,Scalding和Presto),帮助我们进行job作业角色的迁移(因为job作业的所有者和团队角色的不断演化,发生的相当频繁),让数据集的过期管理和复制管理在同一个地方完成。

在DAL刚开始实现阶段,Twitter团队把DAL作为一种library,并且DAL可以直接喝后端数据库会话。这是相当脆弱的,有这么几个原因:安全很弱,因为证书不得不分发到各个客户端;每个客户端都直接连接到数据库是相当困难的;由于客户端的重新发布对所有用户来说,更新是非常缓慢的。数据平台团队移除了这个模块,构建了服务层。

数据平台团队开发DAL涉及到成千上万的job作业需要重新部署依赖(例如,从HDFS到DAL),这个过程中却有正在线上运行的产品。需要严密和严谨的工作而不中断这些job作业。如果仅仅在意数据依赖链和审计,那这个实现将是相当的简单和安全,因为作者可以通过异步或者离线处理。迁移是困难的,耗时的,做好的做法是增量式迁移。但作者知道元数据服务是每个数据平台都需要的,所以,强烈推荐首先要做的是构建一个数据平台基础。

参考


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image
~

Airbnb开源ReAir工具,提供PB级数据仓库的迁移和备份

发表于 2016-08-21

摘要:本篇讲述Airbnb的开源ReAir工具,提供PB级数据仓库的迁移和备份。

Airbnb大数据平台架构成为Airbnb公司提升产品决策的关键部分。其Hive数据仓库从2013年中旬的350 TB暴增到11 PB (2015年末统计的数据)。随着公司的成长,数据仓库的可靠性需求日益剧增。我们寻求迁移数据仓库,但现有的迁移工具要么在大数据仓库时有问题,要么就是有很明显的操作负荷,所以Airbnb开发了ReAir解决这种状况。这篇文章将详细介绍ReAir是如何工作的以及它是怎样轻松的实现PB级数据仓库的备份。

背景

最开始Airbnb所有数据存放在单个HDFS/HIve数据仓库。单个命名空间简单、易管理,然而产品复杂后之后即席查询就影响其可靠性。因此,我们把数据仓库分成了两个:一个数据仓库是支持关键产品的任务;另一个是专为即席查询用的。分离这么大的数据仓库将面临两个问题:我们如何容易的迁移庞大的数据仓库?;分离完之后,我们又该怎么保持数据的同步?为了解决这些问题,Airbnb开发ReAir项目并开源给社区。

ReAir对基于Hive元数据的数据仓库进行备份非常有效,可以实现PB级别的集群扩展。使用ReAir极其简单,你只要连接Hive的metastore Thrift服务,通过MapReduce进行数据复制。ReAir可以兼容Hive和Hadoop的各种版本,并支持单机模式操作——只有Hadoop和Hive是必须要有,MySQL DB是用来进行增量复制。由于ReAir同时处理数据和元数据,所以,只要复制任务一完成就可以进行表和分区的查询。

在没有ReAir的时候,迁移Hive数据仓库最典型的用法是启动一个DistCp,并且通过数据库操作手动管理元数据。这种方法既费力又容易出错,甚至会引起脏数据导致数据不一致,有时复制不断变化的数据仓库目录时也会出现问题。另外其它的迁移方法要求指定Hive版本,当我们需要从旧版本的Hive数据仓库迁移就变得异常困难。

ReAir工具包含两种迁移工具:批量迁移和增量式迁移。批量式迁移工具允许你一次复制指定的一系列表,它适合对一个集群的迁移。相对应的,增量式迁移工具可以跟踪数据仓库的变化,并复制生成的对象或者修改的对象。增量式迁移工具适合保持集群间数据的同步,它可以实现秒级数据同步。两种方式更详细的对比请见下面部分。

批量迁移

批量式迁移一般用来备份整个数据仓库。复制的速度和吞吐量取决于reducer的数量和吞吐量,在Airbnb公司,使用500个reducer复制2.2 PB数据仅仅用了24小时。

启动批量迁移的过程非常简单:用户允许shell命令启动一系列MR 任务。其执行协议是从源数据仓库复制Hive表和分区等实体到目的数据仓库。批量式复制过程是占有带宽的,它会探测源数据仓库和目的数据仓库,并只复制两者之间的不同文件。同时元数据也仅仅更新变化过的。这些策略可以保证ReAir工作的高效率。

批量式迁移也有一些挑战。最明显的一个是,产品数据仓库中实体(Hive表和分区)的大小不同,但是迁移的延迟并不能依赖于最大的一个实体。例如,一般的Hive表不到100个分区,然后最大的表有超过10万个分区。为了保证正常的延迟,需要并行运行迁移工作。

为了解决负载均衡的问题,批量式迁移运行一系列MR任务。批量迁移中两个最昂贵的操作是文件复制和元数据更新,所以这些步骤都是在shuffle阶段分布式进行。每个任务都会打印运行日志数据存储在HDFS,通过日志数据能清晰的看到任务的完成情况。

此处输入图片的描述

第一个MR任务从HDFS读取实体标识符并shuffle后均匀的发到reducer。reducer先验证各实体,再对要复制的HDFS目录和实体进行映射;第二个MR任务扫描第一个MR产生的文件目录,并在对应的目录下创建文件。对文件名称进行hash shuffle后发到reducer,一旦shuffle之后reducer执行复制操作;第三个MR任务处理Hive元数据的提交逻辑。

三阶段MR任务计划的扩展性和负载均衡都很好:复制1百万实体的2.2 PB数据消耗大改24小时;同步20 TB 数据量的更新仅花了一个小时。阶段1和阶段3的瓶颈在于Hive元数据MySQL数据库;而阶段2的瓶颈在于网络带宽。

对于我们的迁移,需要开发定制化的文件复制MR来处理HDFS上的数据。然而,对比DistCp这样通用化的工具,在测试ReAir过程中也发现了一些问题:
在复制百万个文件或者整个数据仓库时,MR任务初始化比较慢;
错误率较高,并且需要定制错误处理;
却少易用的日志分析功能。

为了解决这些问题,我们开发了两个MR任务来处理通用HDFS数据复制。第一个任务采用一个启发式的、文件夹遍历的多线程进行一系列分隔。一旦有足够的文件夹,mapper遍历这些文件夹生成文件列表来复制。文件名经过shuffle发送到reducer来决定是否有必要复制。第二个MR任务读取文件列表来复制,并通过一个shuffle进行分布式复制工作。

增量式迁移

在两个集群的情况下,我们需要在两个集群之间共享数据。例如,日志数据需要每天在生产集群聚会,但同时即席查询的用户也需要这些数据。批量式复制任务对于这种需求显得太重,这时需要两个数据仓库间可以按小时进行更新。即席查询集群的用户需要尽快同步生产集群的数据,因此有必要找到一个尽可能快的方法来更新新的内容。虽然有一些开源项目能解决这个问题,但由于Hive版本依赖等问题并不是很理想,后来我们开发增量式复制工具来保证即席查询集群和生产集群的数据同步。

增量式复制工具设计到实体(Hive表和分区)的记录变化,一旦发生变化尽快复制变化量。为了记录源集群上的变化,我们使用了Hive的钩子机制(hook函数)把查询成功的实体写入MySQL数据库。采用这种方法,我们可以跟踪生产集群的所有变化。HDFS上的变化或者元数据的更新都能触发复制。

一旦有了数据库里的变化的记录,我们需要一种方法来复制这些变化到即席查询集群。这个执行机制是通过一个Java服务实现,读取变化的日志中实体,并转化成一系列的行为集合。比如,在源集群成功创建一个表,会在目标集群上翻译成“复制表”的行为。Java进程将调用元数据,并启动MR任务执行。

由于源集群上的变化是序列化到日志里,它将在目的集群中以相同的顺序进行执行。但是,实际情况是,单独的复制一个表或者分区花费几秒钟或者几分钟实在太慢,后面改成多线程并行复制。为了处理并发,所有的行为基于并发限制形成DAG。通常限制都是对于同一个表,比如,在复制分区之前要建立好表。通过并发执行,复制的延迟降低到最小。

用增量式复制可以实现快速、可靠的复制。两个数据仓库之间的同步复制可以实现容灾恢复——如果一个集群宕机,另外一个集群还可以正常提供服务。对比批量式复制,增量式复制对数据仓库体量巨大但改变的数据量比较小的情况更有效率。当前在Airbnb,每天的数据量增长不到2 TB,所以增量式复制比较有意义。
使用批量式迁移和增量式迁移,可以快速的迁移两个集群。我们希望这些工具对社区也一样有用。

参考:


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

Airbnb的大数据平台架构

发表于 2016-08-21

Airbnb成立于2008年8月,拥有世界一流的客户服务和日益增长的用户社区。随着Airbnb的业务日益复杂,其大数据平台数据量也迎来了爆炸式增长。
本文为Airbnb公司工程师James Mayfield分析的Airbnb大数据平台构架,提供了详尽的思想和实施。

Part 1:大数据架构背后的哲理

Airbnb公司提倡数据信息化,凡事以数据说话。收集指标,通过实验验证假设、构建机器学习模型和挖掘商业机会使得Airbnb公司高速、灵活的成长。

经过多版本迭代之后,大数据架构栈基本稳定、可靠和可扩展的。本文分享了Airbnb公司大数据架构经验给社区。后续会给出一系列的文章来讲述分布式架构和使用的相应的组件。James Mayfield说,“我们每天使用着开源社区提供的优秀的项目,这些项目让大家更好的工作。我们在使用这些有用的项目得到好处之后也得反馈社区。”

下面基于在Airbnb公司大数据平台架构构建过程的经验,给出一些有效的观点。

  • 多关注开源社区:在开源社区有很多大数据架构方面优秀的资源,需要去采用这些系统。同样,当我们自己开发了有用的项目也最好回馈给社区,这样会良性循环。
  • 多采用标准组件和方法:有时候自己造轮子并不如使用已有的更好资源。当凭直觉去开发出一种“与众不同”的方法时,你得考虑维护和修复这些程序的隐性成本。
  • 确保大数据平台的可扩展性:当前业务数据已不仅仅是随着业务线性增长了,而是爆发性增长。我们得确保产品能满足这种业务的增长。
  • 多倾听同事的反馈来解决问题:倾听公司数据的使用者反馈意见是架构路线图中非常重要的一步。
  • 预留多余资源:集群资源的超负荷使用让我们培养了一种探索无限可能的文化。对于架构团队来说,经常沉浸在早期资源充足的兴奋中,但Airbnb大数据团队总是假设数据仓库的新业务规模比现有机器资源大。

Part 2:大数据架构预览

这里是大数据平台架构一览图。
此处输入图片的描述
Airbnb数据源主要来自两方面:数据埋点发送事件日志到Kafka;MySQL数据库dumps存储在AWS的RDS,通过数据传输组件Sqoop传输到Hive“金”集群(其实就是Hive集群,只是Airbnb内部有两个Hive集群,分别为“金”集群和“银”集群,具体分开两个集群的原因会在文章末尾给出。)。

包含用户行为以及纬度快照的数据发送到Hive“金”集群存储,并进行数据清洗。这步会做些业务逻辑计算,聚合数据表,并进行数据校验。

在以上架构图中,Hive集群单独区分“金”集群和“银”集群大面上的原因是为了把数据存储和计算进行分离。这样可以保证灾难性恢复。这个架构中,“金”集群运行着更重要的作业和服务,对资源占用和即席查询可以达到无感知。“银”集群只是作为一个产品环境。

“金”集群存储的是原始数据,然后复制“金”集群上的所有数据到“银”集群。但是在“银”集群上生成的数据不会再复制到“金”集群。你可以认为 “银”集群是所有数据的一个超集。由于Airbnb大部分数据分析和报表都出自“银”集群,所以得保证“银”集群能够无延迟的复制数据。更严格的讲,对于“金”集群上已存在的数据进行更新也得迅速的同步到“银”集群。集群间的数据同步优化在开源社区并没有很好的解决方案,Airbnb自己实现了一个工具,后续文章会详细的讲。

在HDFS存储和Hive表的管理方面做了不少优化。数据仓库的质量依赖于数据的不变性(Hive表的分区)。更进一步,Airbnb不提倡建立不同的数据系统,也不想单独为数据源和终端用户报表维护单独的架构。以以往的经验看,中间数据系统会造成数据的不一致性,增加ETL的负担,让回溯数据源到数据指标的演化链变得异常艰难。Airbnb采用Presto来查询Hive表,代替Oracle、 Teradata、 Vertica、 Redshift等。在未来,希望可以直接用Presto连接Tableau。

另外一个值得注意的几个事情,在架构图中的Airpal,一个基于Presto,web查询系统,已经开源。Airpal是Airbnb公司用户基于数据仓库的即席SQL查询借口,有超过1/3的Airbnb同事在使用此工具查询。任务调度系统Airflow ,可以跨平台运行Hive,Presto,Spark,MySQL等Job,并提供调度和监控功能。Spark集群时工程师和数据分析师偏爱的工具,可以提供机器学习和流处理。S3作为一个独立的存储,大数据团队从HDFS上收回部分数据,这样可以减少存储的成本。并更新Hive的表指向S3文件,容易访问数据和元数据管理。

Part 2:Hadoop集群演化

Airbnb公司在今年迁移集群到“金和银”集群。为了后续的可扩展,两年前迁移Amazon EMR到 EC2实例上运行HDFS,存储有300 TB数据。现在,Airbnb公司有两个独立的HDFS集群,存储的数据量达11PB。S3上也存储了几PB数据。

下面是遇到的主要问题和解决方案:
A) 基于Mesos运行Hadoop
早期Airbnb工程师发现Mesos计算框架可以跨服务发布。在AWS c3.8xlarge机器上搭建集群,在EBS上存储3TB的数据。在Mesos上运行所有Hadoop、 Hive、Presto、 Chronos和Marathon。

基于Mesos的Hadoop集群遇到的问题:

  • Job运行和产生的日志不可见
  • Hadoop集群健康状态不可见
  • Mesos只支持MR1
  • task tracker连接导致性能问题
  • 系统的高负载,并很难定位
  • 不兼容Hadoop安全认证Kerberos

解决方法:不自己造轮子,直接采用其它大公司的解决方案。

B) 远程读数据和写数据
所有的HDFS数据都存储在持久性数据块级存储卷(EBS),当查询时都是通过网络访问Amazon EC2。Hadoop设计在节点本地读写速度会更快,而现在的部署跟这相悖。

Hadoop集群数据分成三部分存储在AWS一个分区三个节点上,每个节点都在不同的机架上。所以三个不同的副本就存储在不同的机架上,导致一直在远程的读数据和写入数据。这个问题导致在数据移动或者远程复制的过程出现丢失或者崩溃。

解决方法:使用本地存储的实例,并运行在单个节点上。

C) 在同构机器上混布任务
纵观所有的任务,发现整体的架构中有两种完全不同的需求配置。Hive/Hadoop/HDFS是存储密集型,基本不耗内存和CPU。而Presto和Spark是耗内存和CPU型,并不怎么需要存储。在AWS c3.8xlarge机器上持久性数据块级存储卷(EBS)里存储3 TB是非常昂贵的。

解决方法:迁移到Mesos计算框架后,可以选择不同类型的机器运行不同的集群。比如,选择AWS c3.8xlarge实例运行Spark。AWS后来发布了“D系列”实例。从AWS c3.8xlarge实例每节点远程的3 TB存储迁移数据到AWS d2.8xlarge 4 TB本地存储,这给Airbnb公司未来三年节约了上亿美元。

D) HDFS Federation
早期Airbnb公司使用Pinky和Brain两个集群联合,数据存储共享,但mappers和reducers是在每个集群上逻辑独立的。这导致用户访问数据需要在Pinky和Brain两个集群都查询一遍。并且这种集群联合不能广泛被支持,运行也不稳定。

解决方法:迁移数据到各HDFS节点,达到机器水平的隔离性,这样更容易容灾。

E) 繁重的系统监控
个性化系统架构的严重问题之一是需要自己开发独立的监控和报警系统。Hadoop、Hive和HDFS都是复杂的系统,经常出现各种bug。试图跟踪所有失败的状态,并能设置合适的阈值是一项非常具有挑战性的工作。

解决方法:通过和大数据公司Cloudera签订协议获得专家在架构和运维这些大系统的支持。减少公司维护的负担。Cloudera提供的Manager工具减少了监控和报警的工作。

最后陈述

在评估老系统的问题和低效率后进行了系统的修复。无感知的迁移PB级数据和成百上千的Jobs是一个长期的过程。作者提出后面会单独写相关的文章,并开源对于的工具给开源社区。

大数据平台的演化给公司减少大量成本,并且优化集群的性能,下面是一些统计数据:

  • 磁盘读写数据的速度从70 – 150 MB / sec到400 + MB / sec。
  • Hive任务提高了两倍的CPU时间
  • 读吞吐量提高了三倍
  • 写吞吐量提高了两倍
  • 成本减少百分之七十

本文为Airbnb公司工程师James Mayfield分析的Airbnb大数据平台构架。

参考:


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image
~

【Spark 2.0系列】:Spark Session API和Dataset API

发表于 2016-08-15
Dataset:Spark新的抽象层

Spark最原始的抽象基础是RDD(分布式弹性数据集),但是从Spark 2.0 开始,Dataset将成为Spark新的抽象层。所有的Spark开发者将使用Dataset API和Dataframe(Dataset子集)API编写代码,同时RDD API也还是可以用的,不过已降为low-level的API。

Dataframe API 在Spark 1.3时被引入,Dataset是Dataframe的超集。Dataset API和Dataframe API的使用带给Spark更好的性能和灵活性。Spark Streaming也将使用Dataset代替RDD。

Spark Session:Spark 2.0入口

在Spark早期版本,spark context是Spark的入口,RDD API通过context API创建。相应地,streaming由StreamingContext创建;SQL由sqlContext创建;hive由HiveContext创建。而到了Spark 2.0,DataSet和Dataframe API由Spark Session创建。

SparkSession包括SQLContext,HiveContext和StreamingContext的功能。Spark session实际起计算的还是spark context。

下面直接看代码吧。

创建SparkSession

使用工厂模式创建SparkSession。下面是创建SparkSession的代码:

1
2
3
4
val sparkSession = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()

上面的代码类似于创建SparkContext和SQLContext。如果你需要创建hive context,你可以使用下面的代码创建SparkSession,并支持Hive。

1
2
3
4
5
val sparkSession = SparkSession.builder.
master("local")
.appName("spark session example")
.enableHiveSupport()
.getOrCreate()

enableHiveSupport开启Hive支持后就可以像HiveContext一样使用。

创建Spark Session后,可以来读取数据了。

使用Spark Session读取数据

使用Spark Session读取CSV数据:

1
2
val df = sparkSession.read.option("header","true").
csv("src/main/resources/sales.csv")

上面的代码与SQLContext类似,你可以复用原有SQLContext的代码。

WordCount

下面来个完整的WordCount例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#create SparkSession
val sparkSession = SparkSession.builder.
master("local")
.appName("example")
.getOrCreate()
#read data and convert to Dataset
import sparkSession.implicits._
val data = sparkSession.read.text("src/main/resources/data.txt").as[String]
#split and group by word
val words = data.flatMap(value => value.split("\\s+"))
val groupedWords = words.groupByKey(_.toLowerCase)
#count
val counts = groupedWords.count()
#print results
counts.show()

代码较简单,就不做解释了。

话外音

SQLContext和HiveContext在Spark 2.0中会继续使用,因为Spark是向后兼容的。但很明显,Spark官方文档建议以后使用SparkSession作为入口。


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

基于社区发现算法和图分析Neo4j解读《权力的游戏》

发表于 2016-08-12

导读:几个月前,数学家 Andrew Beveridge和Jie Shan在数学杂志上发表《权力的网络》,主要分析畅销小说《冰与火之歌》第三部《冰雨的风暴》中人物关系,其已经拍成电视剧《权力的游戏》系列。他们在论文中介绍了如何通过文本分析和实体提取构建人物关系的网络。紧接着,使用社交网络分析算法对人物关系网络分析找出最重要的角色;应用社区发现算法来找到人物聚类。

其中的分析和可视化是用Gephi做的,Gephi是非常流行的图分析工具。但作者觉得使用Neo4j来实现更有趣。

导入原始数据到Neo4j

原始数据可从网络上下载,格式如下:

Source,Target,Weight
Aemon,Grenn,5
Aemon,Samwell,31
Aerys,Jaime,18
...

上面是人物关系的之邻接表以及关系权重。作者使用简单的数据模型:

1
(:Character {name})-[:INTERACTS]->(:Character {name})

带有标签Character的节点代表小说中的角色,用单向关系类型INTERACTS代表小说中的角色有过接触。节点属性会存储角色的名字name,两角色间接触的次数作为关系的属性:权重(weight)。

首先创建节点c,并做唯一限制性约束,c.name唯一,保证schema的完整性:

1
CREATE CONSTRAINT ON (c:Character) ASSERT c.name IS UNIQUE;

一旦约束创建即相应的创建索引,这将有助于通过角色的名字查询的性能。作者使用Neo4j的Cypher(Cypher是一种声明式图查询语言,能表达高效查询和更新图数据库)LOAD CSV语句导入数据:

1
2
3
4
5
LOAD CSV WITH HEADERS FROM "https://www.macalester.edu/~abeverid/data/stormofswords.csv" AS row
MERGE (src:Character {name: row.Source})
MERGE (tgt:Character {name: row.Target})
MERGE (src)-[r:INTERACTS]->(tgt)
SET r.weight = toInt(row.Weight)

这样得到一个简单的数据模型:

1
CALL apoc.meta.graph()

此处输入图片的描述

图1 :《权力的游戏》模型的图。Character角色节点由INTERACTS关系联结

我们能可视化整个图形,但是这并不能给我们很多信息,比如哪些是最重要的人物,以及他们相互接触的信息:

1
2
MATCH p=(:Character)-[:INTERACTS]-(:Character)
RETURN p

此处输入图片的描述

图2

人物网络分析

作者使用Neo4j的图查询语言Cypher来做《权力的游戏》图分析,应用到了网络分析的一些工具,具体见《网络,人群和市场:关于高度连接的世界》。

人物数量

万事以简单开始。先看看上图上由有多少人物:

1
MATCH (c:Character) RETURN count(c)

count(c)
107
概要统计

统计每个角色接触的其它角色的数目:

1
2
3
MATCH (c:Character)-[:INTERACTS]->()
WITH c, count(*) AS num
RETURN min(num) AS min, max(num) AS max, avg(num) AS avg_characters, stdev(num) AS stdev

min max avg_characters stdev
1 24 4.957746478873241 6.227672391875085
图(网络)的直径

网络的直径或者测底线或者最长最短路径:

1
2
3
4
5
6
// Find maximum diameter of network
// maximum shortest path between two nodes
MATCH (a:Character), (b:Character) WHERE id(a) > id(b)
MATCH p=shortestPath((a)-[:INTERACTS*]-(b))
RETURN length(p) AS len, extract(x IN nodes(p) | x.name) AS path
ORDER BY len DESC LIMIT 4

len path
6 [Illyrio, Belwas, Daenerys, Robert, Tywin, Oberyn, Amory]
6 [Illyrio, Belwas, Daenerys, Robert, Sansa, Bran, Jojen]
6 [Illyrio, Belwas, Daenerys, Robert, Stannis, Davos, Shireen]
6 [Illyrio, Belwas, Daenerys, Robert, Sansa, Bran, Luwin]

我们能看到网络中有许多长度为6的路径。

最短路径

作者使用Cypher 的shortestPath函数找到图中任意两个角色之间的最短路径。让我们找出凯特琳·史塔克(Catelyn Stark )和卓戈·卡奥(Kahl Drogo)之间的最短路径:

1
2
3
4
// Shortest path from Catelyn Stark to Khal Drogo
MATCH (catelyn:Character {name: "Catelyn"}), (drogo:Character {name: "Drogo"})
MATCH p=shortestPath((catelyn)-[INTERACTS*]-(drogo))
RETURN p

此处输入图片的描述

图3

所有最短路径

联结凯特琳·史塔克(Catelyn Stark )和卓戈·卡奥(Kahl Drogo)之间的最短路径可能还有其它路径,我们可以使用Cypher的allShortestPaths函数来查找:

1
2
3
4
// All shortest paths from Catelyn Stark to Khal Drogo
MATCH (catelyn:Character {name: "Catelyn"}), (drogo:Character {name: "Drogo"})
MATCH p=allShortestPaths((catelyn)-[INTERACTS*]-(drogo))
RETURN p

此处输入图片的描述

图4

关键节点

在网络中,如果一个节点位于其它两个节点所有的最短路径上,即称为关键节点。下面我们找出网络中所有的关键节点:

1
2
3
4
5
// Find all pivotal nodes in network
MATCH (a:Character), (b:Character)
MATCH p=allShortestPaths((a)-[:INTERACTS*]-(b)) WITH collect(p) AS paths, a, b
MATCH (c:Character) WHERE all(x IN paths WHERE c IN nodes(x)) AND NOT c IN [a,b]
RETURN a.name, b.name, c.name AS PivotalNode SKIP 490 LIMIT 10
|a.name |b.name |PivotalNode|

—|—|—|
|Aegon |Thoros |Daenerys |
|Aegon |Thoros |Robert |
|Drogo |Ramsay |Robb |
|Styr |Daario |Daenerys |
|Styr |Daario |Jon |
|Styr |Daario |Robert |
|Qhorin |Podrick|Jon |
|Qhorin |Podrick|Sansa |
|Orell |Theon |Jon |
|Illyrio|Bronn |Belwas |

从结果表格中我们可以看出有趣的结果:罗柏·史塔克(Robb)是卓戈·卡奥(Drogo)和拉姆塞·波顿(Ramsay)的关键节点。这意味着,所有联结卓戈·卡奥(Drogo)和拉姆塞·波顿(Ramsay)的最短路径都要经过罗柏·史塔克(Robb)。我们可以通过可视化卓戈·卡奥(Drogo)和拉姆塞·波顿(Ramsay)之间的所有最短路径来验证:

1
2
3
MATCH (a:Character {name: "Drogo"}), (b:Character {name: "Ramsay"})
MATCH p=allShortestPaths((a)-[:INTERACTS*]-(b))
RETURN p

此处输入图片的描述

图5

节点中心度

节点中心度给出网络中节点的重要性的相对度量。有许多不同的方式来度量中心度,每种方式都代表不同类型的“重要性”。

度中心性(Degree Centrality)

度中心性是最简单度量,即为某个节点在网络中的联结数。在《权力的游戏》的图中,某个角色的度中心性是指该角色接触的其他角色数。作者使用Cypher计算度中心性:

1
2
MATCH (c:Character)-[:INTERACTS]-()
RETURN c.name AS character, count(*) AS degree ORDER BY degree DESC

|character|degree|

—|—|
|Tyrion |36 |
|Jon |26 |
|Sansa |26 |
|Robb |25 |
|Jaime |24 |
|Tywin |22 |
|Cersei |20 |
|Arya |19 |
|Joffrey |18 |
|Robert |18 |
从上面可以发现,在《权力的游戏》网络中提利昂·兰尼斯特(Tyrion)和最多的角色有接触。鉴于他的心计,我们觉得这是有道理的。

加权度中心性(Weighted Degree Centrality)

作者存储一对角色接触的次数作为INTERACTS关系的weight属性。对该角色的INTERACTS关系的所有weight相加得到加权度中心性。作者使用Cypher计算所有角色的这个度量:

1
2
MATCH (c:Character)-[r:INTERACTS]-()
RETURN c.name AS character, sum(r.weight) AS weightedDegree ORDER BY weightedDegree DESC

|character|weightedDegree|

—|—|
|Tyrion |551 |
|Jon |442 |
|Sansa |383 |
|Jaime |372 |
|Bran |344 |
|Robb |342 |
|Samwell |282 |
|Arya |269 |
|Joffrey |255 |
|Daenerys |232 |

介数中心性(Betweenness Centrality)

介数中心性:在网络中,一个节点的介数中心性是指其它两个节点的所有最短路径都经过这个节点,则这些所有最短路径数即为此节点的介数中心性。介数中心性是一种重要的度量,因为它可以鉴别出网络中的“信息中间人”或者网络聚类后的联结点。
此处输入图片的描述

图6中红色节点是具有高的介数中心性,网络聚类的联结点。

为了计算介数中心性,作者使用Neo4j 3.x或者apoc库。安装apoc后能用Cypher调用其170+的程序:

1
2
3
4
5
MATCH (c:Character)
WITH collect(c) AS characters
CALL apoc.algo.betweenness(['INTERACTS'], characters, 'BOTH') YIELD node, score
SET node.betweenness = score
RETURN node.name AS name, score ORDER BY score DESC

|name    |score             |

—|—|
|Jon |1279.7533534055322|
|Robert |1165.6025171231624|
|Tyrion |1101.3849724234349|
|Daenerys|874.8372110508583 |
|Robb |706.5572832464792 |
|Sansa |705.1985623519137 |
|Stannis |571.5247305125714 |
|Jaime |556.1852522889822 |
|Arya |443.01358430043337|
|Tywin |364.7212195528086 |

紧度中心性(Closeness centrality)

紧度中心性是指到网络中所有其他角色的平均距离的倒数。在图中,具有高紧度中心性的节点在聚类社区之间被高度联结,但在社区之外不一定是高度联结的。
此处输入图片的描述

图7 :网络中具有高紧度中心性的节点被其它节点高度联结

1
2
3
4
MATCH (c:Character)
WITH collect(c) AS characters
CALL apoc.algo.closeness(['INTERACTS'], characters, 'BOTH') YIELD node, score
RETURN node.name AS name, score ORDER BY score DESC

|name   |score                |

—|—|
|Tyrion |0.004830917874396135 |
|Sansa |0.004807692307692308 |
|Robert |0.0047169811320754715|
|Robb |0.004608294930875576 |
|Arya |0.0045871559633027525|
|Jaime |0.004524886877828055 |
|Stannis|0.004524886877828055 |
|Jon |0.004524886877828055 |
|Tywin |0.004424778761061947 |
|Eddard |0.004347826086956522 |

使用python-igraph

Neo4j与其它工具(比如,R和Python数据科学工具)完美结合。我们继续使用apoc运行 PageRank和社区发现(community detection)算法。这里接着使用python-igraph计算分析。Python-igraph移植自R的igraph图形分析库。 使用pip install python-igraph安装它。

从Neo4j构建一个igraph实例

为了在《权力的游戏》的数据的图分析中使用igraph,首先需要从Neo4j拉取数据,用Python建立igraph实例。作者使用 Neo4j 的Python驱动库py2neo。我们能直接传入Py2neo查询结果对象到igraph的TupleList构造器,创建igraph实例:

1
2
3
4
5
6
7
8
9
10
from py2neo import Graph
from igraph import Graph as IGraph
graph = Graph()
query = '''
MATCH (c1:Character)-[r:INTERACTS]->(c2:Character)
RETURN c1.name, c2.name, r.weight AS weight
'''
ig = IGraph.TupleList(graph.run(query), weights=True)

现在有了igraph对象,可以运行igraph实现的各种图算法来。

PageRank

作者使用igraph运行的第一个算法是PageRank。PageRank算法源自Google的网页排名。它是一种特征向量中心性(eigenvector centrality)算法。

在igraph实例中运行PageRank算法,然后把结果写回Neo4j,在角色节点创建一个pagerank属性存储igraph计算的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pg = ig.pagerank()
pgvs = []
for p in zip(ig.vs, pg):
print(p)
pgvs.append({"name": p[0]["name"], "pg": p[1]})
pgvs
write_clusters_query = '''
UNWIND {nodes} AS n
MATCH (c:Character) WHERE c.name = n.name
SET c.pagerank = n.pg
'''
graph.run(write_clusters_query, nodes=pgvs)

现在可以在Neo4j的图中查询最高PageRank值的节点:

1
2
MATCH (n:Character)
RETURN n.name AS name, n.pagerank AS pagerank ORDER BY pagerank DESC LIMIT 10
|name    |pagerank            |

—|—|
|Tyrion |0.042884981999963316|
|Jon |0.03582869669163558 |
|Robb |0.03017114665594764 |
|Sansa |0.030009716660108578|
|Daenerys|0.02881425425830273 |
|Jaime |0.028727587587471206|
|Tywin |0.02570016262642541 |
|Robert |0.022292016521362864|
|Cersei |0.022287327589773507|
|Arya |0.022050209663844467|

社区发现(Community detection)

此处输入图片的描述

图8

社区发现算法用来找出图中的社区聚类。作者使用igraph实现的随机游走算法( walktrap)来找到在社区中频繁有接触的角色社区,在社区之外角色不怎么接触。

在igraph中运行随机游走的社区发现算法,然后把社区发现的结果导入Neo4j,其中每个角色所属的社区用一个整数来表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
clusters = IGraph.community_walktrap(ig, weights="weight").as_clustering()
nodes = [{"name": node["name"]} for node in ig.vs]
for node in nodes:
idx = ig.vs.find(name=node["name"]).index
node["community"] = clusters.membership[idx]
write_clusters_query = '''
UNWIND {nodes} AS n
MATCH (c:Character) WHERE c.name = n.name
SET c.community = toInt(n.community)
'''
graph.run(write_clusters_query, nodes=nodes)

我们能在Neo4j中查询有多少个社区以及每个社区的成员数:

1
2
3
MATCH (c:Character)
WITH c.community AS cluster, collect(c.name) AS members
RETURN cluster, members ORDER BY cluster ASC
cluster members
0 [Aemon, Alliser, Craster, Eddison, Gilly, Janos, Jon, Mance, Rattleshirt, Samwell, Val, Ygritte, Grenn, Karl, Bowen, Dalla, Orell, Qhorin, Styr]
1 [Aerys, Amory, Balon, Brienne, Bronn, Cersei, Gregor, Jaime, Joffrey, Jon Arryn, Kevan, Loras, Lysa, Meryn, Myrcella, Oberyn, Podrick, Renly, Robert, Robert Arryn, Sansa, Shae, Tommen, Tyrion, Tywin, Varys, Walton, Petyr, Elia, Ilyn, Pycelle, Qyburn, Margaery, Olenna, Marillion, Ellaria, Mace, Chataya, Doran]
2 [Arya, Beric, Eddard, Gendry, Sandor, Anguy, Thoros]
3 [Brynden, Catelyn, Edmure, Hoster, Lothar, Rickard, Robb, Roose, Walder, Jeyne, Roslin, Ramsay]
4 [Bran, Hodor, Jojen, Luwin, Meera, Rickon, Nan, Theon]
5 [Belwas, Daario, Daenerys, Irri, Jorah, Missandei, Rhaegar, Viserys, Barristan, Illyrio, Drogo, Aegon, Kraznys, Rakharo, Worm]
6 [Davos, Melisandre, Shireen, Stannis, Cressen, Salladhor]
7 [Lancel]
角色“大合影”

《权力的游戏》的权力图。节点的大小正比于介数中心性,颜色表示社区(由随机游走算法获得),边的厚度正比于两节点接触的次数。
现在已经计算好这些图的分析数据,让我们对其进行可视化,让数据看起来更有意义。

Neo4j自带浏览器可以对Cypher查询的结果进行很好的可视化,但如果我们想把可视化好的图嵌入到其它应用中,可以使用Javascript可视化库Vis.js。从Neo4j拉取数据,用Vis.js的neovis.js构建可视化图。Neovis.js提供简单的API配置,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var config = {
container_id: "viz",
server_url: "localhost",
labels: {
"Character": "name"
},
label_size: {
"Character": "betweenness"
},
relationships: {
"INTERACTS": null
},
relationship_thickness: {
"INTERACTS": "weight"
},
cluster_labels: {
"Character": "community"
}
};
var viz = new NeoVis(config);
viz.render();

其中:

  • 节点带有标签Character,属性name;
  • 节点的大小正比于betweenness属性;
  • 可视化中包括INTERACTS关系;
  • 关系的厚度正比于weight属性;
  • 节点的颜色是根据网络中社区community属性决定;
  • 从本地服务器localhost拉取Neo4j的数据;
  • 在一个id为viz的DOM元素中展示可视化。

译者介绍:侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。
英文原文:Analyzing the Graph of Thrones

1234
侠天

侠天

侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

37 日志
微博 InfoQ
© 2018 侠天
由 Hexo 强力驱动
主题 - NexT.Mist