神机喵算


  • 首页

  • 归档

  • 标签

《Deep Learning with Python》第一章 1.1 人工智能、机器学习和深度学习

发表于 2017-12-28

第一章 什么是深度学习?

本章涉及的知识点:

  • 基本概念的高层次(High-level)定义
  • 机器学习的发展历程
  • 深度学习兴起背后的关键因素以及未来的展望

过去几十年,人工智能(artificial intelligence,AI)一直是媒体强烈炒作的主题。机器学习、深度学习和人工智能也经常出现在无数非技术刊物上。人们构想将来智能聊天机器人、自动驾驶汽车和虚拟助理的工作生活的画面——在昏暗的灯光下,人类的工作很少,而大部分经济活动都有机器人或者AI智能实体。对于当前或者未来机器学习的从业者来讲,需要从吹捧过度的新闻中意识到改变世界的新产品。你们的将来将承担重要的任务:读完本书后,你将会成为开发AI智能实体的开发者。接着解决这几个问题:目前深度学习能实现什么?深度学习的意义如何?人类下一步该如何做?你相信关于人工智能的宣传吗?

本章介绍人工智能、机器学习和深度学习相关知识。

1.1 人工智能、机器学习和深度学习

首先,当人们讨论AI时要清楚谈论的是什么?什么是人工智能、机器学习和深度学习(见图1.1)?它们之间又有什么关系呢?

image

图1.1 人工智能、机器学习和深度学习

1.1.1 人工智能

人工智能诞生于1950年代,当时少数计算机科学的先行者开始探讨计算机是否能够像人类一样“思考”——这个问题到现在还在探索。其简明的定义如下:像人类一样自动化完成智力任务。比如,AI通常认为包括机器学习和深度学习的领域,但是也包含许多非学习的方法。例如,早期的棋类程序只涉及程序设定的硬编码规则,没用到机器学习。在相当长一段时期内,许多专家相信:程序处理足够大的基于知识的显式规则集合能达到人类水平的智能。这种方法称之为符号型AI(symbolic AI),它是1950年代到1980年代主流的AI,并在1980年代专家系统方面快速达到巅峰。

虽然符号型AI能解决定义明确的、逻辑性的问题,比如棋类问题,但是却不能解决更复杂、模糊类的问题,比如,图像分类、语音识别和语言翻译。随之而来的是机器学习这种新方法取代了符号型AI。

1.1.2 机器学习

在英国维多利亚时代,埃达·洛夫莱斯伯爵夫人(Ada Lovelace)是查尔斯·巴贝奇(Charles Babbage)的好友和合作者。巴贝奇是分析机(Analytical Engine)的发明者。该分析机是公认的第一个机械式通用电脑。虽然预言性的远见超越了当时的时代,但是,在1830年代到1840年代设计的分析机并不是真正意义上的通用电脑。因为通用计算机的概念那时尚未出现,它仅仅是一种自动的数学计算分析,因此命名为分析机。在1843年,埃达注记分析机,“分析机谈不上能创造什么东西。但是能做我们命令它做的任何工作……它的职责是帮助我们去实现已知的事情。”

在1950年,AI先行者阿兰·图灵(Alan Turing)在他的著作《计算机器和智能》中引用这个注记作为“洛夫莱斯伯爵夫人的异议”,并提出图灵测试(Turing test)的概念。图灵思考通用计算机是否有学习和创造的能力,并得出下面的结论:

机器学习起因于以下问题:对于计算机来说,除“我们命令它做的任务”之外,它能自我学习完成特定任务吗?计算机能做的工作会让我们感到意外吗?除了程序员人为制定规则的数据处理外,计算机从数据中能自我学习出规则吗?

上述问题打开了新编程范式的大门。在经典编程中,即符号型AI范式,人们输入规则(即为程序)和数据,根据这些规则处理数据,输出答案(见图1.2);在机器学习中,人们输入数据和该数据对应的答案,输出的是程序规则。这些学习来的规则能应用到新的数据上,并产生原创性的答案。

image

图1.2 机器学习:新的编程范式

机器学习系统是训练得到的,而不是用显式的编制。给机器学习一个任务相关的许多例子,它能发现这些例子的统计性的结构,并形成规则自动完成任务。例如,如果想给假期照片自动化打标签,你需要提出一个机器学习系统,输入许多打好标签的照片,该系统学习统计性规则并为指定的照片打标签。

虽然机器学习在1990年代才开始繁荣,但是由于硬件性能和数据大小的提升,机器学习很快变得非常流行,并成为AI最成功的子领域。机器学习与数学统计紧密相关,但是它在许多方面又有别于统计学。不像统计学,机器学习试图处理海量、复杂的数据集(比如,数百万张图片,包含数以万计的像素),然而经典统计分析(比如贝叶斯分析)将对此无能为力。因此,机器学习,特别是深度学习,展现相对较少的数学理论,更多的是工程化导向。

1.1.3 数据的特征学习

为了定义深度学习,以及理解深度学习和其它机器学习方法的不同,首先,需要知道机器学习算法在做什么。这里仅仅讲述机器学习在给定期望的数据例子下发现规则,执行数据处理的任务。所以为了实现机器学习,需要知道下面三件事:

  • 输入数据点(Input data points):例如,如果是语音识别的任务,这些数据点应该是语音文件。如果是图标标注的任务,数据点应该是图片;
  • 期望的输出样例(Examples of the expected output):语音识别的任务中,期望的输出样例是语音的手写文本;图像标注的任务中,期望的输出样例是“狗”,“猫”等等的标签;
  • 算法的评估标准:算法的评估标准是为了判断当前算法的输出值与期望的输出值之间的距离。评估标准可以反馈调节算法的工作,这个调节的步骤经常成为学习(learning)。

机器学习模型将输入数据集转化成有意义的输出,这个过程是从已有的输入和输出对的例子中学习到的。因此,机器学习和深度学习的中心问题是可读性的转化数据(transform data):换句话说,学习输入数据的有用的表征(representations)。数据表征使得数据更接近期望的输出。

那什么是数据的表征?其核心是数据的表现形式——表示(represent)或者编码(encode)数据。例如,彩色图片可以编码为RGB格式(红-绿-蓝)或者HSV格式(色相,饱和度,色调)。机器学习模型就是为了找到输入数据的合适表征——数据的转换是为了任务的更容易处理,比如,分类任务。

下面举一个例子。假设有x轴,y轴和一些数据点坐标(x,y),见图1.3:

image

图1.3 样本数据点

正如你所见,有一些白色的数据点,一些黑色的数据点。本例假设想实现一个算法:输入数据点的坐标(x,y),输出该数据点是黑色还是白色的。在本例中,

  • 输入是数据点的坐标;
  • 期望输出是数据点的颜色;
  • 算法评估标准:判断正确的数据点的百分比

此处的首要任务是数据集的新表征,该数据表征需要能清晰分离白色的点和黑色的点。本例使用的数据转换是坐标变换,见图1.4:

image

图1.4 坐标变换

在新的坐标系统中,数据点的坐标可以看成是数据集的一个新表征。并且效果非常棒!使用了数据集的新表征,黑/白分类问题就可以用很简单的规则解决了:“黑色数据点的x > 0,或者”白色数据点的x < 0“。”这个新表征方法基本解决了分类问题。

在本例中,人工定义了坐标变换。但是,如果尝试系统性地搜索所有不同的可能的坐标变换,并将正确分类数据点的百分比作为反馈,那就是在做机器学习。机器学习中的“学习”,是描述自动搜索更优的数据表征的过程。

所有机器学习算法都包含自动化寻找这么一个转换:该转换能根据给定的任务将数据集转化成有用的特征表示。这个操作可能是坐标变换,如前所见,或者线性投影(线性投影可能损失有用的信息),翻译,非线性操作(比如,当x > 0时,选中所有数据点),等等。机器学习算法通常不能创造性地搜索到这些转换;它们仅仅通过预定义的操作集合进行搜索,此过程也称为超参数空间(hypothesis space)。

那究什么是竟机器学习呢?从技术上来讲,在预定义的参数空间内,自动搜索输入数据集有效的表征,并以反馈信号做引导。这个简单的理念可以解决广大的人工职能任务,从语音识别到自动驾驶。

现在理解完机器学习的学习,下面开始探索是什么让深度学习比较特别。

1.1.4 何为深度学习的“深度”

深度学习是机器学习的一个特定子领域,其数据集的学习表征强调学习一系列连续的表征层,这些逐层的表征层不断地增加了有效的数据表征。深度学习的“深度”并不涉及更深度的学习方法;而是表示一系列连续的特征层的理念。数据模型的层数称为模型的深度(depth)。深度学习的其它叫法有,分层表征学习和多级表征学习。当前的深度学习经常涉及十层甚至上百层的连续特征表示层,它们都是从训练数据集自动学习的。其它机器学习的方法一般倾向于从训练数据学习一层或者两层的表征,因此,这种机器学习也称为浅层学习(shallow learning)。

在深度学习中,这些表征层通常是通过多层神经网络(neural network)模型学习得到的。神经网络来源于生物科学,深度学习的核心概念启发自对人类大脑的理解,但是深度学习模型并不是大脑的模型。没有证据表明大脑实现的学习机制被用在了当前的深度学习模型。你可能偶然发现某些顶级科学文章宣称深度学习像大脑一样工作或者模拟大脑,但是实际情况不是这样的。这可能对该领域的新人在理解深度学习上带来疑惑和困扰。你不需要搞得像大脑一样神秘,只管忘掉看到的关于深度学习和生物相关的假设。深度学习是一个从训练数据学习表征的数学框架。

深度学习算法到底如何进行表征学习呢?下面看一个几层深度网络的例子,其转换数字图片来识别图片中的数字,见图1.5:

image

图1.5 图片识别数字的分类神经网络

如图1.6所示,神经网络将数字图片转换为学习特征,对原始图片进行图像增强,从而识别出最终的结果。你可以认为神经网络经过多层连续的过滤器进行信息提取,最后得到结果。

image

图1.6 数字分类模型的深度学习表征

所以,什么是深度学习呢?从技术性上讲,深度学习是多级数据集表征学习。就这么简单的机制,经过足够多层的扩展,能够魔术般的解决问题。

1.1.5 三张图理解深度学习工作原理

此时,你已经知道机器学习是通过观察样本数据集的输入和目标,进而学习将输入(比如,图片)和目标(比如,“cat”标签)映射。你也知道深度神经网络是从样本数据集中学习数据转换(也即学习表征),通过多层数据转换层实现输入和目标的映射。下面看下这个学习过程具体是如何工作的?

layer处理输入数据的规范要求存储在layer的权重(weights)中,这些权重其实是一堆数值。从技术术语上讲,layer通过权重参数化实现了数据转换(权重有时也称layer的参数),见图1.7。本文中,学习(learning)意味着寻找神经网络中所有layer的权重值的集合,比如正确地将样本数据集的输入和相关的目标映射。这里需要注意的是,一个神经网络可能包含成千上万个参数。修改某个参数可能会影响其它所有的参数,那找到所有参数的正确值看似是个相当艰巨的任务。

image

图1.7 权重参数化的神经网络

为了控制神经网络的输出,需要度量模型输出和期望值之间的距离。这部分是神经网络模型的损失函数(loss function)的工作,有时也称观察函数(objective function)。损失函数输入神经网络模型的预测值和实际目标(期望输出),计算两者之间的距离值,评估神经网络在训练模型上的效果,见图1.8。

image

图1.8 度量神经网络模型输出的损失函数

深度学习的主要诀窍是,使用损失函数值作为反馈信号微调权重值,降低训练数据在某个方向上的损失值,见图1.9。前面的调节过程是神经网络的优化器(optimizer)的工作。优化器是由后向传播算法(Backpropagation)实现的,其是深度学习的核心算法,下一章惠详细讲解向传播算法是如何工作的。

image

图1.9 损失函数值调节权重值

一般神经网络权重值先随机初始化,这时神经网络完成的看起来仅仅是些随机数据转换,它输出的结果自然地远离理想值,损失函数值也相应很高。但是随着神经网络数据处理的推进,权重值也在正确的方向上逐步微调,损失值也在降低。前面描述的过程称为迭代训练(training loop)。重复迭代足够的训练次数后得到的权重值会最小化损失函数值。具有最小化损失值的神经网络模型输出与期望的目标值几乎相同。

1.16 深度学习的应用场景

虽然深度学习是机器学习中早期的子领域,但是其也就在2010年代才稍微兴盛起来。过去几十年,深度学习没什么进展,只在感知学习问题上有点成绩。

深度学习当前已经在机器学习难以出成绩的领域有了以下突破:

  • 图像分类
  • 语音识别
  • 手写字识别
  • 机器翻译
  • 文本语音转换
  • 智能数字设备,比如 Google Now和Amazon Alexa
  • 自动驾驶
  • 广告投放,比如Google,百度和微软的Bing
  • 网站搜索
  • 自然语言的问答
  • AlphaGo

人们仍在探索深度学习更广泛的领域,如果成功的话,深度学习可能在科学、软件开发等方面帮助人类开辟新世纪。

未完待续。。。

Enjoy!

翻译本书系列的初衷是,觉得其中把深度学习讲解的通俗易懂。不光有实例,也包含作者多年实践对深度学习概念、原理的深度理解。最后说不重要的一点,François Chollet是Keras作者。
声明本资料仅供个人学习交流、研究,禁止用于其他目的。如果喜欢,请购买英文原版。


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

面部表情(表情包)识别

发表于 2017-12-28
动机

人类面部表情丰富,但可以总结归纳为7类基本表情: happy, sad, surprise, fear, anger, disgust, and neutral。面部表情是通过面部肌肉活动表达出来,有些比较微妙且复杂,包含了大量内心活动信息。通过面部表情识别,我们能简单而低成本地度量出观众/用户对内容和服务的态度。例如,零售商使用这些度量评估客户的满意度。健康医疗提供商能在治疗的过程根据病人的表情状态来提高服务。娱乐厂商能够监控观众的喜欢来持续的生产优质的内容。

训练过的人类很容易读懂其他人的情绪。事实上,只有14个月大的婴儿就可以区别出happy和sad的区别。但是计算机能够比人类在识别情绪上做的更好吗?为了找到答案,我们设计一个深度神经网络使得机器可以读懂人类情绪。换句话说,给机器以“眼”识别面部表情。

语料数据

训练模型的数据集使用的是Kaggle面部识别挑战赛的数据集(2013年)。它由35887张面部图片组成,48x48像素灰度图片,标注为7类基本表情: happy, sad, surprise, fear, anger, disgust, and neutral。
image
Figure 1. An overview of FER2013

当作者分析语料数据集时,发现“disgust”分类相对于其它分类不均衡(只有113张样本)。作者将两类相似的情感(disgust和anger)合并起来。为了防止数据倾斜,作者构建一个数据生成器 fer2013datagen.py,该生成器很容易分割训练集和保留数据集。本例使用28709张面部图片作为训练集,余下的图片作为测试集和验证集(每个数据集3589张)。这样我们获得了6类均衡的数据集,见图2,包含happy, sad, surprise, fear, anger, and neutral。

image
Figure 2. Training and validation data distribution.

算法模型

image
Figure 3. Mr. Bean, the model for the model.

深度学习在计算机视觉上是非常流行的技术。本文选择卷积神经网络(CNN)层作为构建基础创建模型架构。CNN是有名的模仿人脑工作的模型。本文使用憨豆先生的图片作为示例来解释如何将图像赋值给卷积神经网络模型。

典型的卷积神经网络包涵输入层,卷积层,稠密层(比如,全联接层)和输出层(见图4)。这些层按序组合,在 Keras中,使用Sequential()函数创建模型,再把其它层加入进来。
image
Figure 4. Facial Emotion Recognition CNN Architecture (modification from Eindhoven University of Technology-PARsE).

输入层
  • 输入层需要预处理,输入固定维度的数据。所以图片需先预处理再传入输入层。作者使用 OpenCV(计算机视觉库)做图像面部识别。OpenCV的haar-cascade_frontalface_default.xml文件包含预训练的过滤器,使用Adaboost算法能快速找到面部并裁剪。
  • 使用cv2.cvtColor函数将裁剪面部图片转化为灰度图,并使用cv2.resize改变图片大小为48x48像素。处理完的面部图片,相比于原始的(3,48,48)三色RGB格式“瘦身”不少。同时也确保传入输入层的图片是(1,48,48)的numpy数组。
卷积层
  • numpy数组传入Convolution2D层,指定过滤层的数量作为超参数。过滤层(比如,核函数)是随机生成权重。每个过滤层,(3,3)的感受野,采用权值共享与原图像卷积生成feature map。
  • 卷积层生成的feature map代表像素值的强度。例如,图5,通过过滤层1和原始图像卷积生成一个feature map,其它过滤层紧接着进行卷积操作生成一系列feature map。
    image
    Figure 5. Convolution and 1st max-pooling used in the network

  • 池化(Pooling)是一种降低维度的技术,常用于一个或者多个卷积层之后。池化操作是构建CNN的重要步骤,因为增加的多个卷积层会极大的影响计算时间。本文使用流行的池化方法MaxPooling2D,其使用(2,2)窗口作用于feature map求的最大像素值。池化后图像降低4个维度。

稠密层
  • 稠密层(比如,全联接层)是模仿人脑传输神经元的方式。它输入大量输入特征和变换特征,通过联接层与训练权重相连。
    image
    Figure 6. Neural network during training: Forward propagation (left) to Backward propagation (right).

  • 模型训练时权重前向传播,而误差是反向传播。反向传播起始与预测值和实际值的差值,计算所需的权重调整大小反向传回到每层。采用超参数调优手段(比如,学习率和网络密度)控制训练速度和架构的复杂度。随着灌入更多的训练数据,神经网络能够使得误差最小化。

  • 一般,神经网络层/节点数越多,越能捕捉到足够的信号。但是,也会造成算法模型训练过拟合。应用dropout可以防止训练模型过拟合。Dropout随机选择部分节点(通常,占总节点数的百分比不超过50%),并将其权重置为0。该方法能有效的控制模型对噪声对敏感度,同时也保留架构的复杂度。
输出层
  • 本文的输出层使用softmax激励函数代替sigmoid函数,将输出每类表情的概率。
  • 因此,本文的算法模型能显示出人脸表情组成的详细组成概率。随后会发现没必要将人类表情表示为单个表情。本文采用的是混合表情来精确表示特定情感

注意,没有特定的公式能建立一个神经网络保证对各种场景都有效。不同的问题需要不同的模型架构,产生期待的验证准确度。这也是为什么说神经网络是个“黒盒算法”。但是也不要太气馁,模型训练的时间会让你找到最佳模型,获得最大价值。

小结

刚开始创建了一个简单的CNN深度学习模型,包括一个输入层,三个卷积层和一个输出层。事实证明,简单的算法模型效果比较差。准确度0.1500意味着仅仅是随机猜测的结果(1/6)。简单的网络结构导致不能有效的判别面部表情,那只能说明要“深挖”。。。
image
下面稍微修改下三部分的组合,增加模型的复杂度:

  • 卷积层的数量和配置
  • 稠密层的数量和配置
  • 稠密层的dropout占比

使用AWS的GPU计算(g2.2xlarge)训练和评估组合的算法模型。这次极大的减少了训练时间和模型调优的效率。最后的网络模型是九层,每三层卷积层接着一个max-pooling层,见图7。
image
Figure 7. Final model CNN architecture.

模型验证

image

结果

最后的CNN模型交叉验证准确度是58%,其具有积极意义。因为人类面部表情经常由多个基本表情组合,仅仅用单一表情是很难描述。本例中,当训练模型预测不准确时,正确的标签一般是第二相似的表情,见图8(浅蓝色标签)。
image
Figure 8. Prediction of 24 example faces randomly selected from test set.

分析

image
Figure 9. Confusion matrix for true and prediction emotion counts.

仔细看下每个表情的预测结果。图9是测试集的模型预测的混淆矩阵。矩阵给出表情预测的数量和多分类模型的效果展示:

  • 该模型很好的鉴别出正表情(happy和surprised),预测准确度高。大约7000张训练集中,happy表情的准确度达到76.7%。surprised表情预测准确度为69.3%。
  • 平均意义上讲,本例的神经网络模型对负表情的预测效果较差。sad表情只有39.7%的准确度,并且该网络模型频繁的误判别angry、fear和neutral表情。另外,当预测sad表情和neutral表情时经常搞混,可能是因为这两个表情很少出现。
  • 误分类预测小于3的频率
    image
    Figure 10. Correct predictions on 2nd and 3rd highest probable emotion.
计算机视觉

随着池化层数量的增加,下游神经网络管道的feature map越来越抽象。图11和图12可视化第二次和第三次max-pooling池化层后的feature map。

分析和可视化卷积神经网络内层输出的代码见,https://github.com/JostineHo/mememoji/blob/master/data_visualization.ipynb。
image
Figure 11. 第二个max-pooling池化层后的CNN (64-filter) feature maps。
iamge
Figure 12. 第三个max-pooling池化层后的CNN (128-filter) feature maps。

关于作者

Jostine Ho是数据科学家和深度学习研究者。她感兴趣于计算机视觉和自动化解决现实世界中具体问题。她毕业于德克萨斯大学奥斯汀分校,取得石油系统工程专业硕士学位。

参考
  1. “Dataset: Facial Emotion Recognition (FER2013)” ICML 2013 Workshop in Challenges in Representation Learning, June 21 in Atlanta, GA.
  2. “Andrej Karpathy’s Convolutional Neural Networks (CNNs / ConvNets)” Convolutional Neural Networks for Visual Recognition (CS231n), Stanford University.
  3. Srivastava et al., 2014. “Dropout: A Simple Way to Prevent Neural Networks from Overfitting”, Journal of Machine Learning Research, 15:1929-1958.
  4. Duncan, D., Shine, G., English, C., 2016. “Report: Facial Emotion Recognition in Real-time” Convolutional Neural Networks for Visual Recognition (CS231n), Stanford University.

Enjoy!

侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

Python:BeautifulSoup爬虫手把手实例教程

发表于 2017-12-28

Web爬虫能自动抽取数据,并以人们容易理解的方式展现。本文章将举个金融市场的例子,但是Web爬虫能做的远不止于此。

如果你是一个狂热的投资者,当你需要的股票价格跨多个网站,那获取每天的股票价格是相当痛苦。本文通过创建一个Web爬虫自动从互联网上检索股指数据。

开始

我们将使用Python和BeautifulSoup编写爬虫:

  • 对于Mac用户,Python已预装好。打开终端,输入python —version,你回看到Python版本是2.7.x;
  • 对于Windows用户,请按官方文档安装Python

接下来使用pip安装BeautifulSoup库,在terminal中执行:

1
2
easy_install pip
pip install BeautifulSoup4

Note:如果执行以上命令失败,请尝试加上sudo再次执行。

基础知识

在看代码之前,先理解一下HTML网页基本知识和爬取规则。

HTML标签

如果你已经熟悉HTML标签,可以跳过此部分。

1
2
3
4
5
6
7
8
9
<!DOCTYPE html>
<html>
<head>
</head>
<body>
<h1> First Scraping </h1>
<p> Hello World </p>
<body>
</html>

上面是HTML网页基本的语法,网页中的每个代表一个功能块:

  1. <!DOCTYPE html>:HTML文档必须以<!DOCTYPE html>标签为起点;
  2. HTML文档包含在<html> 和</html>标签之间;
  3. HTML文档的元数据和脚本声明放在<head> 和 </head>标签之间;
  4. HTML文档的正文部分放在 <body> 和 </body>标签内;
  5. <h1> 到 <h6>标签定义标题;
  6. <p> 标签定义段落;

其它常用的标签有,<a> :超链接,<table> :表格;<tr> 表格的行,<td>表格的列。

有时,HTML标签也带有 id 或者 class 属性。 id 属性是指定HTML标签的唯一id,其在HTML文档内取值唯一。 class 属性是表示同一类HTML标签。我们可以使用两个属性进行数据定位。

W3Schools上可以学到更多关于HTML tag,id和class的知识。

爬取规则
  • 爬取网站的第一步是,阅读网站的爬取协议或者声明。一般情况,爬取的数据不能用于商业目的。
  • 不能太频繁的请求网站数据,不然会被列入黑名单。最好能让你的程序模拟人的行为。一般设为每秒访问一个页面为最佳。
  • 网站的布局会随时改变,确保改写你的代码能重新爬取该网页。
检查(Inspect)网页

下面拿Bloomberg Quote网站举例:

当前炒股非常火,假设某个炒股者关注股票市场,想得到股指(S&P 500)名字和价格。首先在浏览器打开该网页,使用浏览器的检查器(inspector)来检查网页。

imge

鼠标悬停在价格的位置,你会看到价格周围蓝色的方框。当点击它时,相关的网页在浏览器控制台上显示被选中。

image

上面的结果显示,该股指的价格包含在HTML标签的一级, <div class="basic-quote"> → <div class="price-container up"> → <div class="price">。

相似的,如果将鼠标悬停在“S&P 500 Index”并点击名字,会看到它包含在<div class="basic-quote"> 和 <h1 class="name">。

imge

现在我们知道可以通过class 标签辅助定位所需爬取的数据。

代码实践

知道所要爬取的数据位置,那就动起手来开始爬虫。

首先,导入所需的编程库:

1
2
3
# import libraries
import urllib2
from bs4 import BeautifulSoup

接着,声明网页URL变量:

1
2
# specify the url
quote_page = ‘http://www.bloomberg.com/quote/SPX:IND'

然后,使用Python urllib2获取HTML网页:

1
2
# query the website and return the html to the variable ‘page’
page = urllib2.urlopen(quote_page)

最后,把网页解析成BeautifulSoup格式:

1
2
# parse the html using beautiful soap and store in variable `soup`
soup = BeautifulSoup(page, ‘html.parser’)

上面soup变量获得网页的HTML内容。我们将从其中抽取数据。

还记得所需爬取数据的唯一层吗?BeautifulSoup能帮我们获取这些层,并使用find()抽取其内容。因为HTML的class标签是唯一的,所以可简单的查询 <div class="name">。

1
2
# Take out the <div> of name and get its value
name_box = soup.find(‘h1’, attrs={‘class’: ‘name’})

通过 text函数获取数据:

1
2
name = name_box.text.strip() # strip() is used to remove starting and trailing
print name

同样地,我们获取股指价格:

1
2
3
4
# get the index price
price_box = soup.find(‘div’, attrs={‘class’:’price’})
price = price_box.text
print price

当运行程序,将会打印S&P 500股指。

image

导出Excel CSV文件

获取数据之后进行存储,Excel逗号分隔的格式看起来不错。你能用Excel打开查看数据。

首先,导入Python的csv模块和datetime模块:

1
2
import csv
from datetime import datetime

将爬取的数据存储到一个csv文件:

1
2
3
4
# open a csv file with append, so old data will not be erased
with open(‘index.csv’, ‘a’) as csv_file:
writer = csv.writer(csv_file)
writer.writerow([name, price, datetime.now()])

执行程序后,你可以打开一个index.csv文件,看到如下内容:

image

如果每天执行该程序,你无需打开网站即可获得S&P 500股指价格。

进阶使用

这时你可以轻松爬取一个股指,下面举例一次抽取多个股指数据。

首先,更新quote_page变量为一个URL数组:

1
quote_page = [‘http://www.bloomberg.com/quote/SPX:IND', ‘http://www.bloomberg.com/quote/CCMP:IND']

然后,将爬取代码加一个for循环,其处理每个URL并将数据存储到data:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# for loop
data = []
for pg in quote_page:
# query the website and return the html to the variable ‘page’
page = urllib2.urlopen(pg)
# parse the html using beautiful soap and store in variable `soup`
soup = BeautifulSoup(page, ‘html.parser’)
# Take out the <div> of name and get its value
name_box = soup.find(‘h1’, attrs={‘class’: ‘name’})
name = name_box.text.strip() # strip() is used to remove starting and trailing
# get the index price
price_box = soup.find(‘div’, attrs={‘class’:’price’})
price = price_box.text
# save the data in tuple
data.append((name, price))

同时,更改存储csv文件部分代码:

1
2
3
4
5
6
# open a csv file with append, so old data will not be erased
with open(‘index.csv’, ‘a’) as csv_file:
writer = csv.writer(csv_file)
# The for loop
for name, price in data:
writer.writerow([name, price, datetime.now()])

现在实现一次爬取两个股指价格。

Enjoy!


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

Flink开启流处理技术新潮流:解决流处理event time和消息乱序

发表于 2017-12-28

写在之前:此文翻译自:how-apache-flink-enables-new-streaming-applications,做了少许改动,感谢原作者。

速度是成功的一重要要素,流处理技术的速度使得其越来越受青睐。在现实世界中,数据产品总是以持续不断的处理面目示人,比如,web服务日志,移动应用用户行为,或者传感器数据。

到目前为止,大部分数据处理架构技术栈都建立在有限的、静态的数据假设之上。现代流处理技术在不断地努力,通过模拟和处理现实世界的event,而最理想的模拟情况是把数据看作“streams”。Flink不但实现“streams”流,而且具有开创性的技术点。本篇先来讲述Flink解决消息乱序和event time窗口。

消息乱序和event time窗口

在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:

  • Event time :Event time是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web服务日志、监控agent的日志、移动端日志等;
  • Processing time :Processing time是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。

许多流处理场景中,事件发生的时间和事件到达待处理的消息队列时间有各种延迟:

  1. 各种网络延迟;
  2. 数据流消费者导致的队列阻塞和反压影响;
  3. 数据流毛刺,即,数据波动;
  4. 事件生产者(移动设备、传感器等)离线;

上述诸多原因会导致队列中的消息频繁乱序。事件发生的时间和事件到达待处理的消息队列时间的不同随着时间在不断变化,这常被称为时间偏移(event time skew),表示成:“processing time – event time”。

对大部分应用来讲,基于事件的创建时间分析数据比基于事件的处理时间分析数据要更有意义。Flink允许用户定义基于事件时间(event time)的窗口,而不是处理时间。

Flink使用事件时间 clock 来跟踪事件时间,其是以watermarks来实现的。watermarks是Flink 源流基于事件时间点生成的特殊事件。 T 时间点的watermarks意味着,小于 T 的时间戳的事件不会再到达。Flink的所有操作都基于watermarks来跟踪事件时间。

下图描述Flink是如何计算事件时间窗口。当watermarks到达时窗口计算会被触发,并更新事件时间clock:

很明显,左上角watermarks W(4)快要到达,出现计算窗口T1-T4;右上角因为消息有乱序(事件时间为3的事件排在事件时间为7的后面),同时出现两个计算窗口T1-T4和T4-T8;左下角watermarks W(4)触发计算窗口演化,小于事件时间4的事件不再到达;右下角参考前面解读。

基于事件时间的Pipeline会产生更精确的结果,因为一旦有相应事件时间的事件到达会尽快计算;而相对于周期性的批量处理来讲,基于事件时间的数据流pipeline会更早的计算出结果,并且更精确(批量处理不能很好的处理跨batch的消息乱序)。

结合事件时间和实时pipeline

事件时间pipeline会因为必要的事件时间过程而导致一定的延迟。有时延迟太大导致无法获得实时结果,这时就得增加延迟短的结果。

Flink是一个流处理框架,能毫秒级处理事件,它能在同一个应用中综合低延迟的实时pipeline和事件时间pipeline,列子如下:

  1. 基于单个事件的低延迟报警。如果某类事件被识别,需要发出报警信息;
  2. 基于处理时间窗口的实时dashboard,能够聚合秒级的事件数;
  3. 基于事件时间的精确统计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* Main entry point.
*/
public static void main(String[] args) throws Exception {
// create environment and configure it
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.registerType(Statistic.class);
env.registerType(SensorReading.class);
env.setParallelism(4);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// create a stream of sensor readings, assign timestamps, and create watermarks
DataStream<SensorReading> readings = env
.addSource(new SampleDataGenerator())
.assignTimestamps(new ReadingsTimestampAssigner());
// path (1) - low latency event-at a time filter
readings
.filter(reading -> reading.reading() > 100.0)
.map( reading -> "-- ALERT -- Reading above threshold: " + reading )
.print();
// path (2) - processing time windows: Compute max readings per sensor group
// because the default stream time is set to Event Time, we override the trigger with a
// processing time trigger
readings
.keyBy( reading -> reading.sensorGroup() )
.window(TumblingTimeWindows.of(Time.seconds(5)))
.trigger(ProcessingTimeTrigger.create())
.fold(new Statistic(), (curr, next) ->
new Statistic(next.sensorGroup(), next.timestamp(), Math.max(curr.value(), next.reading())))
.map(stat -> "PROC TIME - max for " + stat)
.print();
// path (3) - event time windows: Compute average reading over sensors per minute
// we use a WindowFunction here, to illustrate how to get access to the window object
// that contains bounds, etc.
// Pre-aggregation is possible by adding a pre-aggregator ReduceFunction
readings
// group by, window and aggregate
.keyBy(reading -> reading.sensorId() )
.timeWindow(Time.minutes(1), Time.seconds(10))
.apply(new WindowFunction<SensorReading, Statistic, String, TimeWindow>() {
@Override
public void apply(String id, TimeWindow window, Iterable<SensorReading> values, Collector<Statistic> out) {
int count = 0;
double agg = 0.0;
for (SensorReading r : values) {
agg += r.reading();
count++;
}
out.collect(new Statistic(id, window.getStart(), agg / count));
}
})
.map(stat -> "EVENT TIME - avg for " + stat)
.print();
env.execute("Event time example");
}

Flink提供的窗口触发条件包括处理时间clock,事件时间clock,以及数据流内容。

Flink是如何度量时间?

下面看下Flink是如何处理时间的,在这点上与其它老的流处理系统有啥不同。

一般意义上讲,时间是用clock度量的。最简单的clock称为wall clock,它是集群中服务器执行流处理作业的间隔clock。wall clock是用来跟踪处理时间的。

为了跟踪事件时间,我们需要集群机器间相同的clock。Flink是通过watermarks机制实现的。一个watermarks是指在真实事件流时间点发生的事件(比如,上午10点),那么到现在为止上午10点前的事件不会再到达。事件时间clock(event time clock)跟踪时间比wall clock要粗粒度,但是更准确。

还有第三种clock,叫做system clock。它是用来保证流处理系统的“exactly-once“语义的。Flink跟踪作业的处理是通过barriers(栏栅),并进行snapshot。barriers与watermarks类似,不同之处在于,barriers是由Flink的master机器的wall clock 生成, 而watermarks是由真实世界的时间生成。同样,Spark Streaming的micro-batche schedule是基于Spark receiver的wall clock 。

下图完美展现刚才讲的各种时间:

Worker 1和Worker 2机器上并行执行对数据源和窗口的操作作业。事件上的数字代表时间戳,方块的颜色代表不同的key(灰色流向窗口1,紫色流向窗口2)。数据源从队列中读取事件(有分区,通过key分区),把他们分发到正确的窗口。窗口定义为基于事件时间的时间窗口(Flink包含时间窗口和count窗口)。我能看到Worker 1、Worker 2和Master机器的wall clock不同(缺乏时间同步,具体看ntp),分别为10,8,7。数据源发出watermarks,当前的watermarks时间戳为4。这意味着,event time clock是4,这时进行并行计算。Master(JobManager)对数据源做barriers,并对计算做snapshot。系统时间此时为7,checkpoint为第七个。

下面对流处理框架中的三种clock进行总结:

  • event time clock:度量事件流的时间,粗粒度;
  • system clock:度量计算的过程。实际上是协调者机器的wall clock;
  • wall clock :度量处理时间。

下面也给出老的流处理系统的弊端:

  1. 计算不准确:因为真实世界的事件发生顺序与处理的顺序经常不一致;
  2. 计算结果强依赖当前时间;
  3. 系统参数配置会影响程序的语义:比如,增加checkpoin的间隔。

老的流处理系统的这些缺点让它们没法获得准确的结果(至少是可控的准确度)。

而Flink完全分离这三种clock:

  1. 基于event time clock的watermarks跟踪事件流时间,允许用户定义基于事件时间的窗口;
  2. system clock与event time clock完全解藕,跟踪计算过程和全局snapshot,不对外暴露api,仅仅用来分布式系统的协调;
  3. 处理时间是用的机器的wall clock,暴露给用户支持处理时间窗口。

相关文章:

Enjoy!


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

Kafka使用总结:Producer

发表于 2017-12-28

Kafka作为消息中间件是各公司平台架构绕不开的话题。

不管你是把Kafka作为队列,还是消息通道,都需要在应用中通过producer写数据到Kafka,再用consumer从Kafka中消费。应用往Kafka写数据的原因有很多:用户行为分析、日志存储、异步通信等。多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量?消息的延迟?

这么苛刻的要求Kafka能满足吗?

image

Kafka Producer

首先,创建ProducerRecord必须包含Topic和Value,key和partition可选。然后,序列化key和value对象为ByteArray,并发送到网络。

接下来,消息发送到partitioner。如果创建ProducerRecord时指定了partition,此时partitioner啥也不用做,简单的返回指定的partition即可。如果未指定partition,partitioner会基于ProducerRecord的key生成partition。producer选择好partition后,增加record到对应topic和partition的batch record。最后,专有线程负责发送batch record到合适的Kafka broker。

当broker收到消息时,它会返回一个应答(response)。如果消息成功写入Kafka,broker将返回RecordMetadata对象(包含topic,partition和offset);相反,broker将返回error。这时producer收到error会尝试重试发送消息几次,直到producer返回error。

Producer实战
构造Kafka Producer

创建Properties对象,配置producer参数。根据Properties创建producer对象。Kafka producer必选参数有3个:

  • bootstrap.servers :Kafka broker的列表,包含host和port。此处不必包含Kafka集群所有的broker,因为producer会通过其它broker查询到所需信息。但至少包含2个broker;
  • key.serializer:序列化key参数,值为类名,org.apache.kafka.common.serialization.Serializer接口的实现;
  • value.serializer:序列化value参数,值为类名,使用方式同key.serializer。

最简代码实现如下:

1
2
3
4
5
6
7
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.String-
Serializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serializa-
tion.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);

创建Properties对象,key和value为String类型,选用Kafka自带的StringSerializer。通过属性配置可以控制Producer的行为。

实例化producer后,接着发送消息。这里主要有3种发送消息的方法:

  • 立即发送:只管发送消息到server端,不care消息是否成功发送。大部分情况下,这种发送方式会成功,因为Kafka自身具有高可用性,producer会自动重试;但有时也会丢失消息;
  • 同步发送:通过send()方法发送消息,并返回Future对象。get()方法会等待Future对象,看send()方法是否成功;
  • 异步发送:通过带有回调函数的send()方法发送消息,当producer收到Kafka broker的response会触发回调函数

以上所有情况,一定要时刻考虑发送消息可能会失败,想清楚如何去处理异常。

通常我们是一个producer起一个线程开始发送消息。为了优化producer的性能,一般会有下面几种方式:单个producer起多个线程发送消息;使用多个producer。

下面开始详细展示上面所提到的三种发送消息的方法,以及各种类型错误的处理方式。

发送消息到Kafka

最简单的方法如下:

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products","France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}

创建ProducerRecord对象,Producer使用send()方法发送ProducerRecord。send()方法会返回带有RecordMetadata的Future对象,这里只简单的忽略返回值,所以我们并不会知道消息是否发送成功;

但即使如此简单,Producer发送消息到Kafka也仍然得处理些异常:当序列化消息失败会抛出SerializationException;buffer溢出会抛出BufferExhaustedException;当发送线程终止会抛出InterruptException。

同步发消息
1
2
3
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record).get();

这里,我们使用Future.get()方法等待Kafka的状态返回。Producer可以实现自己的Future来处理Kafka broker返回的异常。如果Producer发送消息成功,它会返回RecordMetadata对象(可用来检索消息的offset)。

Kafka Producer一般有两类错误。可重试错误会通过重试发送消息解决。比如,连接重连可解决连接错误;partition重新选举leader可解决“no leader”错误。Kafka Producer能配置重试次数,超过重试次数还不能解决的会抛出错误。另外一类就是不能通过重试处理的错误,比如,消息大小太大,这种情况下Kafka Producer会立即报错。

异步发送消息

如果应用和Kafka集群间的网络质量太差,那么同步发送消息的方式发送每条消息后需要等待较长时间才收到应答。这对高并发海量消息发送简直就是灾难,因为等待应答的时间远超过消息发送时间。另外,有些app压根就不要求返回值。况且,即使发送消息失败了,只要写下对应的错误日志即可。

为了异步发送消息,同时可以处理错误。Producer支持带有回调函数的发送消息方法。

1
2
3
4
5
6
7
8
9
10
11
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
} }
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

使用回调函数的前提是实现org.apache.kafka.clients.producer.Callback 接口。如果Kafka返回错误,onCompletion捕获到非null异常。示例代码仅仅打印出了异常信息,实际应用开发需根据实际情况添加业务逻辑处理。

序列化

在前面的列子中,可看出Producer配置必须指定序列化方法(serializer,默认是String serializer)。

这里将讲解如何构建定制化的序列化器,然后介绍Avro序列化器。

定制序列化器

当你需要发送到Kafka的对象非String和Integer,那你要么自己实现对应的序列化器,要么使用像Avro、Thrift或者Protobuf之类的业界通用的序列化库。这里强烈推荐使用这些工业化的通用的序列化库。

为了让大家理解序列化器的工作原理,这里还是先讲讲如何构建定制化的序列化器。下面先建一个简单的Customer类:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
} }

接着创建Customer类的序列化器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
/**
We are serializing Customer as:
4 byte int representing customerId
4 byte int representing length of customerName in UTF-8 bytes (0 if name is
Null)
N bytes representing customerName in UTF-8
**/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializeName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}
Avro的序列化

Avro详细说明见官方文档,这里只列出部分要用到的特性。

Avro Schema是用Json描述,如下:

1
2
3
4
5
6
7
8
{"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": "null"}
] }

Avro依赖模式(Schema)来实现数据结构定义,所以读写Avro文件都得依赖其Schema。Kafka中Schema Registry提供元数据的存储和解析。那Producer的序列化和Consumer的反序列化都会去Schema Registry读取对应的Schema。

image

Avro的使用有两种:一种是使用Avro Schema生成的类(官方提供生成工具,比如,avro-tools-1.7.0.jar);一种是直接Avro Schema。Kafka Producer使用Avro序列化器的方式与其它序列化器相同。下面先说使用Avro Schema生成的类的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);
// We keep producing new events until someone ctrl-c
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record =
new ProducerRecord<>(topic, customer.getId(), customer);
producer.send(record);
}

其中,schema.registry.url是schema存储的位置,KafkaAvroSerializer 是Avro的序列化器,Customer 是生成的类。

如果你想直接使用Avro Schema,方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);
String schemaString = "{\"namespace\": \"customerManagement.avro\",
\"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\", \"type\": [\"null\",\"string
\"], \"default\":\"null\" }" +
"]}";
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
String name = "exampleCustomer" + nCustomers;
String email = "example " + nCustomers + "@example.com";
GenericRecord customer = new GenericData.Record(schema);
customer.put("id", nCustomer);
customer.put("name", name);
customer.put("email", email);
ProducerRecord<String, GenericRecord> data =
new ProducerRecord<String, GenericRecord>("customerContacts", name, customer);
producer.send(data);
} }

未完待续。。。

Enjoy!


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

Linux性能利器Htop:完胜top、strace

发表于 2017-12-28

写在之前:此文翻译自:https://peteris.rocks/blog/htop,做了少许改动,感谢原作者。

长久以来,我只知Linux有个神器htop,却不知道htop的各项指标的内涵。

比如,2核的服务器 CPU利用率为 50%,那为啥load average 却显示 1.0?那接下来开始捋捋。。。

俗话说得好,好记性不如个烂笔头。

Htop on CentOS

来个htop全身照:

image

  • Uptime

uptime:系统运行的时长。

当然,你可以用uptime:

1
2
$ uptime
12:17:58 up 111 days, 31 min, 1 user, load average: 0.00, 0.01, 0.05

uptime从 /proc/uptime文件获取信息:

1
9592411.58 9566042.33

前者数字(9592411.58)代表系统运行的秒值,后者(9566042.33)代表服务器空闲秒数。一般多核系统后者秒值会比系统的uptime值大,因为它是取和。作者是怎么知道这个原因的呢?通过跟踪 uptime 程序运行打开的文件,这里是用的 strace 工具:

1
strace uptime

从strace输出中grep查找系统调用的open 函数。由于strace标准输出内容较多,可以使用2>&1重定向:

1
2
3
4
5
$ strace uptime 2>&1 | grep open
...
open("/proc/uptime", O_RDONLY) = 3
open("/var/run/utmp", O_RDONLY|O_CLOEXEC) = 4
open("/proc/loadavg", O_RDONLY) = 4

其中,包括前面提到的 /proc/uptime文件。这说明我们可以使用 strace -e open uptime来代替strace uptime 2>&1 | grep open。Linux的uptime命令提供易读、宜用的方法。

  • Load average

除了uptime,有三个数字表示 load average:

1
2
$ uptime
12:59:09 up 32 min, 1 user, load average: 0.00, 0.01, 0.03

load average值是从 /proc/loadavg文件获得,同时你也可以用 strace 验证。

1
2
$ cat /proc/loadavg
0.00 0.01 0.03 1/120 1500

前三个数字分别表示最近1分钟、5分钟、15分钟的CPU和IO的利用率。第四行显示当前正在运行的进程数和进程总数,最后一行显示最近使用的进程ID。

下面讲下进程ID。当你启动一个新进程,它将会分到一个ID数字。进程ID通常是递增的,除非进程退出后进程ID重新复用。特殊进程ID 1 是属于/sbin/init ,系统启动时即分配。

让我们再来看下 /proc/loadavg文件的内容,然后后台启动 sleep 命令,这时会输出进程ID。

1
2
3
4
$ cat /proc/loadavg
0.00 0.01 0.03 1/123 1566
$ sleep 10 &
[1] 1567

所以,1/123 代表一个进程正在运行,总共有123个进程运行过。

当运行cat /dev/urandom > /dev/null (重复生成随机数)时,你会发现有 2 个进程在运行:

1
2
3
4
$ cat /dev/urandom > /dev/null &
[1] 1639
$ cat /proc/loadavg
1.00 0.69 0.35 2/124 1679

这里的两个进程是:随机数生成、cat /proc/loadavg,同时load average值也在增加。

System load average是runnable或uninterruptable状态的进程数的平均数。所以上面的 load average为 1(平均1个运行的进程),是因为作者演示的服务器是单核CPU,一次跑一个进程那CPU的利用率是100%。如果服务器是双核,那CPU利用率就是50%。双核CPU的利用率如果是100%,那load average 将会是2.0。CPU的核数可以从htop的左上角看到,或者运行 nproc。

  • Processes

在htop的右上角显示进程数和多少个进程正在运行。但是htop使用 Tasks 代表进程(注:Tasks 是进程的一个别名)。

在htop中,使用键盘上的Shift+H组合键也可轻松看到线程数Tasks: 23, 10 thr,使用Shift+K组合键可以看到内核线程数,Tasks: 23, 40 kthr。

  • Process ID / PID

每个进程启动都会分配一个唯一的进程ID,称作进程ID或者PID。如果你在bash中使用 (&)在后台执行,你将会看到输出的PID。

1
2
$ sleep 1000 &
[1] 12503

如果你手滑没看到,那也可以在bash中使用 $!内建变量查看到最近一次后台运行的PID:

1
2
$ echo $!
12503

进程ID是非常有用的,具体为什么可以看维基百科。

procfs是伪文件系统,procfs可以让用户的程序通过读取文件的方法从Linux内核中获取信息。它经常被挂载在 /proc/下,伪装的看起来像个正规文件目录,你也可以使用 ls 和 cd命令。

某进程相关的所有信息都放在 /proc/<pid>/下:

1
2
3
4
5
6
7
$ ls /proc/12503
attr coredump_filter fdinfo maps ns personality smaps task
auxv cpuset gid_map mem numa_maps projid_map stack uid_map
cgroup cwd io mountinfo oom_adj root stat wchan
clear_refs environ limits mounts oom_score schedstat statm
cmdline exe loginuid mountstats oom_score_adj sessionid status
comm fd map_files net pagemap setgroups syscall

比如, /proc/\/cmdline 会让你知道这个进程是如何启动的:

1
2
$ cat /proc/12503/cmdline
sleep1000$

正确的查看姿势是(因为命令是用\0 分隔):

1
2
3
$ od -c /proc/12503/cmdline
0000000 s l e e p \0 1 0 0 0 \0
0000013

或者:

1
2
3
4
5
6
$ tr '\0' '\n' < /proc/12503/cmdline
sleep
1000
$ strings /proc/12503/cmdline
sleep
1000

进程的进程目录还包含有链接(link),比如: cwd指向工作目录,exe指向可执行的二进制文件:

1
2
3
$ ls -l /proc/12503/{cwd,exe}
lrwxrwxrwx 1 ubuntu ubuntu 0 Jul 6 10:10 /proc/12503/cwd -> /home/username
lrwxrwxrwx 1 ubuntu ubuntu 0 Jul 6 10:10 /proc/12503/exe -> /bin/sleep

以上就是htop,top, ps这些诊断工具是为啥可以获取到一个进程的详细信息的,/proc/\/\。

  • Process tree

在htop中使用F5 即可看到进程树,当然也可以用ps f:

1
2
3
4
$ ps f
PID TTY STAT TIME COMMAND
12472 pts/0 Ss 0:00 -bash
12684 pts/0 R+ 0:00 \_ ps f

或者 pstree

1
2
3
4
5
6
7
8
9
10
$ pstree -a
init
├─atd
├─cron
├─sshd -D
│ └─sshd
│ └─sshd
│ └─bash
│ └─pstree -a
...

从这里你就可以知道为啥 bash 或者 sshd 是其它进程的父进程。

/sbin/init 作为系统启动进程,进程ID为1,接着是SSH的守护进程 sshd(当你用ssh连接到服务器),接着是 bash shell。

  • Process user

每个进程属于一个user,user以数值ID代表:

1
2
3
4
$ sleep 1000 &
[1] 2045
$ grep Uid /proc/2045/status
Uid: 1000 1000 1000 1000

可以用id命令发现更多关于此user的信息:

1
2
$ id 1000
uid=1000(ubuntu) gid=1000(ubuntu) groups=1000(ubuntu),4(adm)

通过如下证明id是从/etc/passwd 和 /etc/group 文件获取信息的:

1
2
3
$ strace -e open id 1000
open("/etc/passwd", O_RDONLY|O_CLOEXEC) = 3
open("/etc/group", O_RDONLY|O_CLOEXEC) = 3

查看/etc/passwd 和 /etc/group 文件的内容:

1
2
3
4
5
6
7
8
$ cat /etc/passwd
root:x:0:0:root:/root:/bin/bash
daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin
xxx:x:1000:1000:Ubuntu:/home/ubuntu:/bin/bash
$ cat /etc/group
root:x:0:
adm:x:4:syslog,ubuntu
xxx:x:1000:

passwd文件内没有密码,那密码存储在哪里呢?实际存在/etc/shadow

1
2
3
4
$ sudo cat /etc/shadow
root:$6$mS9o0QBw$P1ojPSTexV2PQ.Z./rqzYex.k7TJE2nVeIVL0dql/:17126:0:99999:7:::
daemon:*:17109:0:99999:7:::
ubuntu:$6$GIfdqlb/$ms9ZoxfrUq455K6UbmHyOfz7DVf7TWaveyHcp.:17126:0:99999:7:::

如果你想以root用户来运行程序,得用sudo:

1
2
3
4
5
6
7
8
$ id
uid=1000(ubuntu) gid=1000(ubuntu) groups=1000(ubuntu),4(adm)
$ sudo id
uid=0(root) gid=0(root) groups=0(root)
$ sudo -u ubuntu id
uid=1000(ubuntu) gid=1000(ubuntu) groups=1000(ubuntu),4(adm)
$ sudo -u daemon id
uid=1(daemon) gid=1(daemon) groups=1(daemon)

如果你想登录到另外一个用户并启动各种命令,使用sudo bash 或者 sudo -u user bash。

当你不想输入密码登录服务器,则可以增加user到 /etc/sudoers 文件。

1
2
$ echo "$USER ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
-bash: /etc/sudoers: Permission denied

你会发现只有root用户可以操作。

1
2
$ sudo echo "$USER ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
-bash: /etc/sudoers: Permission denied

咋回事呢?还是不行。。。

当你以root权限执行echo命令追加一行到/etc/sudoers ,仍然使用的原user。

通常有两种解决方法:

  1. echo "$USER ALL=(ALL) NOPASSWD: ALL" | sudo tee -a /etc/sudoers
  2. sudo bash -c "echo '$USER ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers"

第一种,tee -a追加标准输入到文件,这时执行以root权限;

第二种,我们以root用户执行bash,用 (-c) 以root执行整个命令。注意双引号/单引号,因为 $USER变量转义的问题。

当你想更改密码时,可用 passwd,也可用 /etc/shadow 文件,这个文件必须用root权限:

1
2
$ ls -l /etc/shadow
-rw-r----- 1 root shadow 1122 Nov 27 18:52 /etc/shadow

passwd 如何才能被常规user执行往具有保护权限的文件写入?

当你启动一个进程时,那这个进程属于你的用户,即使这个可执行文件的拥有者是其它user。

你能改变文件的权限:

1
2
$ ls -l /usr/bin/passwd
-rwsr-xr-x 1 root root 54256 Mar 29 2016 /usr/bin/passwd

注意 s 字符,它是sudo chmod u+s /usr/bin/passwd实现的,意味着能以拥有者root的身份运行可执行文件。

你使用 find /bin -user root -perm -u+s会发现一个setuid 可执行文件。同理,对用户组可以用 (g+s)。

  • Process state

接下来看下htop中进程状态列,其用字母 S 表示。

下面是进程列的可能取值:

R 运行状态(running)或者运行队列中的就绪状态(runnable)
S 中断睡眠(等待事件完成)
D 非中断睡眠(常为IO)
Z 僵尸进程,无效进程但是未被父进程回收
T 被控制信号停止
t 跟踪时被调试者停止
X 死亡状态

注意,当你运行ps时,它将也会显示子状态,比如Ss,R+,Ss+,等等.

1
2
3
4
5
6
7
$ ps x
PID TTY STAT TIME COMMAND
1688 ? Ss 0:00 /lib/systemd/systemd --user
1689 ? S 0:00 (sd-pam)
1724 ? S 0:01 sshd: vagrant@pts/0
1725 pts/0 Ss 0:00 -bash
2628 pts/0 R+ 0:00 ps x

R - 运行状态或者运行队列中的就绪状态

在这种状态下,进程正在运行或者在运行队列中等待运行。

那运行的都是啥呢?

当你编译所写的源代码,生成的机器码是CPU指令集,并保存为可执行文件。当你启动程序时,该程序被加载进内存,然后CPU执行这些指令集。

从根本上来说,CPU是在执行指令,换句话说,处理数字。

S - 中断睡眠

这意味着该进程的指令不能在CPU上立即执行。相反地,该进程等待某个事件或者条件发生。当事件发生,系统内核设置进程状态为运行状态。

本例是GNU的coreutils软件包中的sleep工具。它将睡眠指定秒数。

1
2
3
4
5
6
7
$ sleep 1000 &
[1] 10089
$ ps f
PID TTY STAT TIME COMMAND
3514 pts/1 Ss 0:00 -bash
10089 pts/1 S 0:00 \_ sleep 1000
10094 pts/1 R+ 0:00 \_ ps f

所以这是一个中断睡眠。那如何中断该进程?通过发送控制信号。

你能在 htop 中点击 F9 ,然后在左侧菜单中选择一个信号发送。

发送的信号中最有名的是kill。因为kill是一个系统调用,其能发送信号给进程。程序/bin/kill能从用户空间做系统调用,默认的信号是使用TERM,该信号要求进程中止或者杀死。

信号其实只是一个数字,但是数字太难记住,所以我们常说对应的名字。信号名字用大写表示,并用SIG前缀。

常用的信号有INT, KILL, STOP, CONT, HUP。

让我们发送INT(也称作,SIGINT或者2或者 Terminal interrupt )中断睡眠。

1
2
$ kill -INT 10089
[1]+ Interrupt sleep 1000

当你在键盘上敲击CTRL+C 也会产生上面同样的效果。 bash 将发送前台进程 SIGINT 信号。

顺便提一下,在 bash中,虽然大部分操作系统都有 /bin/kill ,但是 kill 是一个内建命令。这是为什么呢?如果你创建的进程达到限制条件,它允许进程被kill。

实现相同功能的命令:

  • kill -INT 10089
  • kill -2 10089
  • /bin/kill -2 10089

另外一个有用的信号是 SIGKILL (也被称作 9)。你可以使用该信号kill掉不响应的进程,省的你发狂的按 CTRL+C 键盘。

当编写程序时,你能设置信号handler函数,该函数将在进程收到信号时被调用。换句话说,你能捕获信号,然后做点什么事。例如,清理或者优雅的关闭进程。所以发送 SIGINT 信号(用户想中断一个进程)和SIGTERM (用户想中止一个进程)并不意味着进程被中止。

当你运行Python脚本,你会发现一个意外:

1
2
3
4
5
$ python -c 'import sys; sys.stdin.read()'
^C
Traceback (most recent call last):
File "<string>", line 1, in <module>
KeyboardInterrupt

你可以告诉内核强制中止一个进程,使用发送 KILL信号:

1
2
3
4
$ sleep 1000 &
[1] 2658
$ kill -9 2658
[1]+ Killed sleep 1000

D - 非中断睡眠

不像中止睡眠进程那么简单,你不能用信号唤醒该进程。这就是为什么许多人喊怕看到这个状态。你不能kill该进程,因为kill意味着通过发送SIGKILL 信号给该进程。

如果进程必须等待不中断或者事件被期望快速发生,那这个状态被使用,比如读写磁盘。但是这仅仅发生一秒分之一。

引用自StackOverflow

不中断进程经常等到I/O出现页缺失(page fault)。进程/任务不能在这种状态下中断,因为它不能处理任何信号;如果中断了,另外一个页缺失将会发生,会返回到原始位置。

换句话说,如果你在使用NFS(网络文件系统)时出现中断,那得好久才恢复。

或者,以我的经验看,意味着进程正在交换许多小内存。

让我们试着一个进程进入不中断睡眠。

8.8.8.8 是Google提供的公共DNS服务。他们没有一个开放的NFS,但是也不能阻止试验。

1
2
3
4
$ sudo mount 8.8.8.8:/tmp /tmp &
[1] 12646
$ sudo ps x | grep mount.nfs
12648 pts/1 D 0:00 /sbin/mount.nfs 8.8.8.8:/tmp /tmp -o rw

如何找出刚才发生了什么? strace!

strace上面ps的输出命令:

1
2
3
$ sudo strace /sbin/mount.nfs 8.8.8.8:/tmp /tmp -o rw
...
mount("8.8.8.8:/tmp", "/tmp", "nfs", 0, ...

所以 mount 系统调用正在阻塞进程。

如果想看看发生了什么,你能运行带intr 选项的 mount 命令来中断: sudo mount 8.8.8.8:/tmp /tmp -o intr。

Z - 僵尸进程,无效进程但是未被父进程回收

当一个进程以 exit退出时,它还有子进程,此时子进程变成了僵尸进程。

  • 如果僵尸进程存在一小会,那相当正常;
  • 僵尸进程存在很长时间可能导致程序bug;
  • 僵尸进程不消耗内存,仅仅是一个进程ID;
  • 僵尸进程不能被kill ;
  • 发生SIGCHLD 信号能让父进程回收僵尸进程;
  • kill 僵尸进程的父进程能摆脱父进程和其僵尸进程

下面写个C程序的例子展示下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main() {
printf("Running\n");
int pid = fork();
if (pid == 0) {
printf("I am the child process\n");
printf("The child process is exiting now\n");
exit(0);
} else {
printf("I am the parent process\n");
printf("The parent process is sleeping now\n");
sleep(20);
printf("The parent process is finished\n");
}
return 0;
}

安装GNU C编译器(GCC):

1
sudo apt install -y gcc

编译代码并运行:

1
2
gcc zombie.c -o zombie
./zombie

查看进程树:

1
2
3
4
5
6
7
$ ps f
PID TTY STAT TIME COMMAND
3514 pts/1 Ss 0:00 -bash
7911 pts/1 S+ 0:00 \_ ./zombie
7912 pts/1 Z+ 0:00 \_ [zombie] <defunct>
1317 pts/0 Ss 0:00 -bash
7913 pts/0 R+ 0:00 \_ ps f

我们得到了僵尸进程。当父进程退出,僵尸进程也退出。

1
2
3
4
5
$ ps f
PID TTY STAT TIME COMMAND
3514 pts/1 Ss+ 0:00 -bash
1317 pts/0 Ss 0:00 -bash
7914 pts/0 R+ 0:00 \_ ps f

如果用while (true) ; 代替 sleep(20) ,僵尸进程将正常退出。

使用exit时,该进程所有的内存和资源被释放,其它的进程可以继续使用。

为什么要保留僵尸进程存在呢?

父进程使用 wait系统调用找出其子进程退出码(信号 handler)。如果一个进程睡眠,它需要等待唤醒。

为什么不简单的强制进程唤醒和kill掉?当你厌倦小孩时,你不会把他扔垃圾桶。这里的原因相同。坏事情总会发生的。

T - 被控制信号停止

打开两个终端窗口,使用 ps u能查看到用户的进程:

1
2
3
4
5
$ ps u
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
ubuntu 1317 0.0 0.9 21420 4992 pts/0 Ss+ Jun07 0:00 -bash
ubuntu 3514 1.5 1.0 21420 5196 pts/1 Ss 07:28 0:00 -bash
ubuntu 3528 0.0 0.6 36084 3316 pts/1 R+ 07:28 0:00 ps u

忽略 -bash 和ps u进程。

现在在其中一个终端窗口运行cat /dev/urandom > /dev/null 。其进程状态为 R+ ,意味着正在运行。

1
2
3
$ ps u
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
ubuntu 3540 103 0.1 6168 688 pts/1 R+ 07:29 0:04 cat /dev/urandom

按 CTRL+Z 停止该进程:

1
2
3
4
5
$ # CTRL+Z
[1]+ Stopped cat /dev/urandom > /dev/null
$ ps aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
ubuntu 3540 86.8 0.1 6168 688 pts/1 T 07:29 0:15 cat /dev/urandom

该进程的状态现在为 T。

在第一个终端运行 fg 可以重新恢复该进程。

另外一种停止进程的方法是用 kill 发送 STOP 信号给进程。然后使用 CONT 信号可让进程恢复执行。

t - 跟踪时被调试者停止

首先,安装GNU调试器(gdb):

1
sudo apt install -y gdb

监听网络端口1234的入网连接:

1
2
$ nc -l 1234 &
[1] 3905

状态显示睡眠状态,那意味着该进程在等待网络传入数据。

1
2
3
$ ps u
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
ubuntu 3905 0.0 0.1 9184 896 pts/0 S 07:41 0:00 nc -l 1234

运行调试器,并与进程ID为3905的进程关联:

1
sudo gdb -p 3905

你会发现这个进程的状态变为t,这意味着调试器正在跟踪该进程。

1
2
3
$ ps u
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
ubuntu 3905 0.0 0.1 9184 896 pts/0 t 07:41 0:00 nc -l 1234
  • Process time

Linux是一个多任务的操作系统。这意味着,即使机器只有一个PCU,也能在同一个时间点运行多个进程。你可以通过SSH连接服务器查看 htop 输出,同时你的web服务也在通过网络传输博客内容给读者。

那系统是如何做到在单个CPU上一个时间点只执行一个指令呢?答案是时间共享。

一个进程运行“一点时间”,然后挂起;这时另外一个等待的进程运行“一点时间”。进程运行的这“一点时间”称为时间片(time slice)。

时间片通常是几毫秒。所以只要服务器系统的负载不高,你是注意不到的。

这也就可以解释为什么平均负载(load average)是运行进程的平均数了。如果你的服务器只有一个核,平均负载是1.0,那CPU的利用率达到100%。如果平均负载高于1.0,那意味着等待运行的进程数大于CPU能承载运行的进程数。所以会发现服务器宕机或者延迟。如果负载小雨1.0,那意味着CPU有时会处于空闲状态,不做任何工作。

这也给你一个提示:为什么一个运行了10秒的进程的运行时间有时会高于或者低于准确的10秒。

  • Process niceness and priority

当运行的task数比可用的CPU核数要多时,你必须找个方法决定接下来哪个task运行哪个task保持等待。这其实是 task scheduler的职责。

Linux内核的scheduler负责选择运行进程队列中哪个进程接下来运行,依赖于内核使用的scheduler算法。

一般你不能影响scheduler,但是让它知道哪个进程更重要。

Nice值(NI) 是表示用户空间进程优先级的数值,其代表静态优先级。Nice值的范围是-20~+19,拥有Nice值越大的进程的实际优先级越小(即Nice值为+19的进程优先级最小,为-20的进程优先级最大),默认的Nice值是0。Nice值增加1,降低进程10%的CPU时间。

priority(优先级,PRI)是Linux内核级的优先级,其代表动态优先级。该优先级范围从0到139,0到99表示实时进程,100到139表示用户空间进程优先级。

你可以改变Nice值让内核考虑该进程优先级,但是不能改变priority。

Nice值和priority之间的关系如下:

1
PR = 20 + NI

所以 PR = 20 + (-20 to +19) 的值是0到39,映射为100到139。

在启动进程前设置该进程的Nice值:

1
nice -n niceness program

当程序已经正在运行,可用 renice改变其Nice值:

1
renice -n niceness -p PID

下面是CPU使用颜色代表的意义:

蓝色:低优先级线程(nice > 0)

绿色:常规优先级线程

红色:内核线程

  • 内存使用 - VIRT/RES/SHR/MEM

进程给人的假象是只有一个进程使用内存,其实这是通过虚拟内存实现的。

进程没有直接访问物理内存,而是拥有虚拟地址空间,Linux内核将虚拟内存地址转换成物理内存或者映射到磁盘。这就是为什么看起来进程能够使用的内存比电脑真实的内存要多。

这里说明的是,想准确计算一个进程占用多少内存并不是那么直观。你也想计算共享内存或者磁盘映射内存吗?htop 显示的一些信息能帮助你估计内存使用量。

内存使用颜色代表的意义:

绿色:Used memory

蓝色:Buffers

橘黄色:Cache

VIRT/VSZ - 虚拟内存

task使用的虚拟内存总量。它包含代码、数据和共享内存(加上调出内存到磁盘的分页和已映射但未使用的分页)。

VIRT 是虚拟内存使用量。它包括所有的内存,含内存映射文件。

如果应用请求1GB内存,但是内存只有1MB,那 VIRT显示1GB。如果 mmap映射的是一个1GB 文件, VIRT也显示1GB。

大部分情况下, VIRT并不是一个太有用的数字。

RES/RSS - 常驻内存大小

task使用的非交换的物理内存。

RES是常驻内存使用量。

RES相比于 VIRT,能更好的表征进程的内存使用量:

不包含交换出的内存;

不包含共享内存

如果一个进程使用1GB内存,并调用fork()函数,fork的结果是两个进程的 RES 都是1GB,但是实际上只使用了1GB内存。因为Linux采用的是copy-on-write机制。

SHR - 共享内存大

task使用的共享内存总量。

简单的反应进程间共享的内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main() {
printf("Started\n");
sleep(10);
size_t memory = 10 * 1024 * 1024; // 10 MB
char* buffer = malloc(memory);
printf("Allocated 10M\n");
sleep(10);
for (size_t i = 0; i < memory/2; i++)
buffer[i] = 42;
printf("Used 5M\n");
sleep(10);
int pid = fork();
printf("Forked\n");
sleep(10);
if (pid != 0) {
for (size_t i = memory/2; i < memory/2 + memory/5; i++)
buffer[i] = 42;
printf("Child used extra 2M\n");
}
sleep(10);
return 0;
}
1
2
3
fallocate -l 10G
gcc -std=c99 mem.c -o mem
./mem
1
2
3
4
5
6
7
8
Process Message VIRT RES SHR
main Started 4200 680 604
main Allocated 10M 14444 680 604
main Used 5M 14444 6168 1116
main Forked 14444 6168 1116
child Forked 14444 5216 0
main Child used extra 2M 8252 1116
child Child used extra 2M 5216 0

MEM% - 内存使用量占比

task当前使用的内存占比。

该值为 RES 除以RAM总量。

如果 RES 是400M,你有8GB的RAM,MEM% 是 400/8192*100 = 4.88%。

“庖丁解牛”式问诊Linux启动全过程

本文使用Digital Ocean droplet Ubuntu服务器启动过程为“蓝本”,详细解说Linux启动涉及的所有进程。

Ubuntu系统引导启动的进程都有哪些?你都需要它们吗?

下面是在全新Digital Ocean droplet 的Ubuntu(16.04.1 LTS x64)服务器上启动系统。

image

  • /sbin/init

/sbin/init程序,也称init,调度除boot进程外所有的进程,配置用户环境。

init启动后,将成为所有系统自动启动进程的父进程或者祖父进程。

它是systemd吗?

1
2
$ dpkg -S /sbin/init
systemd-sysv: /sbin/init

答案是,yes。

如果kill掉/sbin/init会发生什么呢?什么也不会发生,哈哈。

  • https://wiki.ubuntu.com/SystemdForUpstartUsers
  • https://www.centos.org/docs/5/html/5.1/Installation_Guide/s2-boot-init-shutdown-init.html
  • /lib/systemd/systemd-journald

systemd-journald进程是一个系统服务,其收集、存储log数据。它基于接收的log信息创建和维护结构化、索引journal。

换句话讲。

journald主要的变化之一,是采用优化的log存储替代简单文本log文件。使得系统管理员访问相应的log信息更有效。journald引入数据库式log的集中存储能力。

你可以使用 journalctl 命令查询log文件。

1
2
3
4
5
6
7
8
9
journalctl _COMM=sshd logs by sshd
journalctl _COMM=sshd -o json-pretty logs by sshd in JSON
journalctl --since "2015-01-10" --until "2015-01-11 03:00"
journalctl --since 09:00 --until "1 hour ago"
journalctl --since yesterday
journalctl -b logs since boot
journalctl -f to follow logs
journalctl --disk-usage
journalctl --vacuum-size=1G

相当酷有木有!

看起来不能移除或者disable该服务,但是你可以关闭logging。

  • https://www.freedesktop.org/software/systemd/man/systemd-journald.service.html
  • https://www.digitalocean.com/community/tutorials/how-to-use-journalctl-to-view-and-manipulate-systemd-logs
  • https://www.loggly.com/blog/why-journald/
  • https://ask.fedoraproject.org/en/question/63985/how-to-correctly-disable-journald/
  • /sbin/lvmetad -f

lvmetad守护进程缓存LVM元数据(metadata),所以LVM命令不用扫描磁盘就能读取元数据。

元数据缓存的优势,在于扫描磁盘是非常耗时的,并且可能中断系统和磁盘的正常工作。

那什么才是LVM(Logical Volume Management,逻辑卷管理)呢?

你可以认为逻辑卷管理LVM是动态分区(dynamic partitions),意味着你能在正在运行的系统上用命令行创建/重设大小/删除(create/resize/delete)LVM分区(用LVM的话讲是逻辑卷):无须重启操作系统来让内核感知新建或者重设大小的分区。

听起来像,如果你正在使用LVM服务,那得保留该服务。

1
2
$ lvscan
$ sudo apt remove lvm2 -y --purge
  • http://manpages.ubuntu.com/manpages/xenial/man8/lvmetad.8.html
  • http://askubuntu.com/questions/3596/what-is-lvm-and-what-is-it-used-for
  • /lib/systemd/udevd

systemd-udevd监听Linux内核uevent事件(uevent是内核空间和用户空间之间通信的机制,主要用于热插拔事件(hotplug))。对于每个事件,systemd-udevd都会根据udev规则执行匹配的指定设备。

udev是Linux内核的设备管理器。其作为devfsd和hotplug的升级,udev主要管理/dev目录下的设备节点。

所以该服务会管理 /dev。

作者不太确定是否一定要运行在虚拟机上。

  • https://www.freedesktop.org/software/systemd/man/systemd-udevd.service.html
  • https://wiki.archlinux.org/index.php/udev
  • /lib/systemd/timesyncd

systemd-timesyncd是使用远程网络时间协议来同步本地系统时钟的系统服务。

所以该服务是来替代ntpd的。

1
2
3
4
5
6
7
8
$ timedatectl status
Local time: Fri 2016-08-26 11:38:21 UTC
Universal time: Fri 2016-08-26 11:38:21 UTC
RTC time: Fri 2016-08-26 11:38:20
Time zone: Etc/UTC (UTC, +0000)
Network time on: yes
NTP synchronized: yes
RTC in local TZ: no

查看一下服务器上打开的端口:

1
2
3
4
5
$ sudo netstat -nlput
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 2178/sshd
tcp6 0 0 :::22 :::* LISTEN 2178/sshd

以前在Ubuntu 14.04上打开的端口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ sudo apt-get install ntp -y
$ sudo netstat -nlput
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 1380/sshd
tcp6 0 0 :::22 :::* LISTEN 1380/sshd
udp 0 0 10.19.0.6:123 0.0.0.0:* 2377/ntpd
udp 0 0 139.59.256.256:123 0.0.0.0:* 2377/ntpd
udp 0 0 127.0.0.1:123 0.0.0.0:* 2377/ntpd
udp 0 0 0.0.0.0:123 0.0.0.0:* 2377/ntpd
udp6 0 0 fe80::601:6aff:fxxx:123 :::* 2377/ntpd
udp6 0 0 ::1:123 :::* 2377/ntpd
udp6 0 0 :::123 :::* 2377/ntpd

https://www.freedesktop.org/software/systemd/man/systemd-timesyncd.service.html

https://wiki.archlinux.org/index.php/systemd-timesyncd

  • /usr/sbin/atd -f

atd将作业加入队列稍后执行。atd通过at将业务加入队列。

at和批量从标准输入输出或者指定文件读命令并稍后执行。

cron命令调度作业周期性重复运行,at只在指定时间运行一次。

1
2
3
4
5
6
$ echo "touch /tmp/yolo.txt" | at now + 1 minute
job 1 at Fri Aug 26 10:44:00 2016
$ atq
1 Fri Aug 26 10:44:00 2016 a root
$ sleep 60 && ls /tmp/yolo.txt
/tmp/yolo.txt

不需要使用的话可以卸载:

1
sudo apt remove at -y --purge
  • http://manpages.ubuntu.com/manpages/xenial/man8/atd.8.html
  • http://manpages.ubuntu.com/manpages/xenial/man1/at.1.html
  • http://askubuntu.com/questions/162439/why-does-ubuntu-server-run-both-cron-and-atd
  • /usr/lib/snapd/snapd

Snappy Ubuntu Core是带有事务性更新的Ubuntu版本,其和当前的Ubuntu具有相同的library的最小服务器镜像,但是以更简单的机制来提供应用。

很显然,它是一个简化版的deb包,分发的所有依赖都在单个snap中。

作者从来不用snappy在服务器上发布或者分发应用,所以可以卸载:

1
sudo apt remove snapd -y --purge
  • https://developer.ubuntu.com/en/snappy/
  • https://insights.ubuntu.com/2016/06/14/universal-snap-packages-launch-on-multiple-linux-distros/
  • /usr/bin/dbus-daemon

在计算机中,D-Bus或者DBus是进程间通信( inter-process communication,IPC)和远程过程调用(remote procedure call,RPC)机制,它允许在同一台机器上并发运行的多个计算机程序(进程)通信。

作者觉得当你需要桌面环境时要启动该服务,当你只是在服务器上运行web应用则可以卸载:

1
sudo apt remove dbus -y --purge

然而,当你想看下时间是否通过NTP同步,发现了问题:

1
2
$ timedatectl status
Failed to create bus connection: No such file or directory
  • https://en.wikipedia.org/wiki/D-Bus
  • /lib/systemd/systemd-logind

systemd-logind是管理用户登录的系统服务。

  • https://www.freedesktop.org/software/systemd/man/systemd-logind.service.html
  • /usr/sbin/cron -f

cron守护进程执行调度计划命令。

-f 保持运行在前台,不以守护进程运行。

你可以使用cron周期性调度任务运行。

crontab -e 编辑cron的配置文件,在Ubuntu上可以用 /etc/cron.hourly,/etc/cron.daily等配置。

你可以使用下面的方法查看cron的log文件:

  • grep cron /var/log/syslog
  • journalctl _COMM=cron
  • journalctl _COMM=cron --since="date" --until="date"

如果你不想使用cron时,可以停止并disable该服务:

1
2
sudo systemctl stop cron
sudo systemctl disable cron

当使用 apt remove cron 删除cron服务时,会发现其会安装postfix:

1
2
3
4
5
$ sudo apt remove cron
The following packages will be REMOVED:
cron
The following NEW packages will be installed:
anacron bcron bcron-run fgetty libbg1 libbg1-doc postfix runit ssl-cert ucspi-unix

看起来cron服务需要邮件客户端(MTA)发送邮件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ apt show cron
Package: cron
Version: 3.0pl1-128ubuntu2
...
Suggests: anacron (>= 2.0-1), logrotate, checksecurity, exim4 | postfix | mail-transport-agent
$ apt depends cron
cron
...
Suggests: anacron (>= 2.0-1)
Suggests: logrotate
Suggests: checksecurity
|Suggests: exim4
|Suggests: postfix
Suggests: <mail-transport-agent>
...
exim4-daemon-heavy
postfix
  • https://help.ubuntu.com/community/CronHowto
  • https://www.digitalocean.com/community/tutorials/how-to-use-cron-to-automate-tasks-on-a-vps
  • http://unix.stackexchange.com/questions/212355/where-is-my-logfile-of-crontab
  • /usr/sbin/rsyslogd -n

Rsyslogd是提供消息日志的系统组件。

换句话说,rsyslogd将日志写入 /var/log/ 目录下,比如 /var/log/auth.log 是SSH登陆的权限日志。

rsyslogd的配置文件是/etc/rsyslog.d。

你也可以配置rsyslogd发送log文件到远程服务器,实现日志log中心化。

你也可以在后台脚本中使用 logger 命令将消息日志写入 /var/log/syslog 。

1
2
3
4
5
#!/bin/bash
logger Starting doing something
# NFS, get IPs, etc.
logger Done doing something

但是,前面已经有 systemd-journald 服务在运行了,那还需 rsyslogd 吗?

Rsyslog 和 Journal服务是存在于系统中的两个log日志应用,它们功能不同。大部分情况下,需要同时结合两者的功能。比如,创建结构化的消息并存储到文件数据库。通信接口需要Rsyslog提供输入和输出模块,通信socket由Journal提供。

所以呢?看样子还是暂时保留吧。

  • http://manpages.ubuntu.com/manpages/xenial/man8/rsyslogd.8.html
  • http://manpages.ubuntu.com/manpages/xenial/man1/logger.1.html
  • https://wiki.archlinux.org/index.php/rsyslog
  • https://www.digitalocean.com/community/tutorials/how-to-centralize-logs-with-rsyslog-logstash-and-elasticsearch-on-ubuntu-14-04
  • https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/7/html/System_Administrators_Guide/s1-interaction_of_rsyslog_and_journal.html
  • /usr/sbin/acpid

acpid,是高级配置与电源接口(Advanced Configuration and Power Interface,ACPI)事件守护进程。

acpid设计用来通知ACPI事件的用户空间程序,其在系统启动时已启动,并默认以后台进程运行。

计算机中的高级配置与电源接口,提供处理电源相关事件的开源标准。操作系统可以处理计算机硬件的发现和配置,可以进行电源管理。比如,将未使用的组件置为睡眠,进行状态监控。

但是本例中使用的虚拟机,不需要挂起/继续。

这里删除该服务,看下会发生什么。

1
sudo apt remove acpid -y --purge

作者可以成功执行 reboot重启 droplet,但是执行 halt 后必须通过web接口关闭虚拟机。

  • http://manpages.ubuntu.com/manpages/xenial/man8/acpid.8.html
  • https://en.wikipedia.org/wiki/Advanced_Configuration_and_Power_Interface
  • /usr/bin/lxcfs /var/lib/lxcfs/

Lxcfs主要是以lxc容器为用户提供fuse文件系统。在Ubuntu 15.04上,默认提供两个功能:一是,一些/proc文件的视图;二是,过滤访问主机的cgroup文件系统。

总之,在Ubuntu 15.04上你能用通用的方式( lxc-create)创建容器。创建的容器使用uptime、top等能得出“正确”结果。

不用LXC容器时可以移除该服务:

1
sudo apt remove lxcfs -y --purge
  • https://insights.ubuntu.com/2015/03/02/introducing-lxcfs/
  • https://www.stgraber.org/2016/03/31/lxcfs-2-0-has-been-released/
  • /usr/lib/accountservice/accounts-daemon

账户管理AccountsService包提供一系列D-Bus接口查询和管理用户账户信息。其是基于usermod(8),useradd(8) 和userdel(8) 命令实现的。

作者想知道移除该服务会出现什么问题。当移除DBus时, timedatectl失效。

1
sudo apt remove accountsservice -y --purge
  • http://www.linuxfromscratch.org/blfs/view/systemd/gnome/accountsservice.html
  • /sbin/mdadm

Linux组件mdadm是管理RAID设备的管理和监控软件。

其名字是源于md(multiple device,多设备)节点管理(administers),它替代之前的mdctl。原始的名字是 “Mirror Disk”,随着功能的增加名字随之改变。

RAID是将多块硬盘看作是一块硬盘的方法。RAID的目的有两个:1)扩展磁盘驱动容量:RAID 0。如果你有2个500GB的HDD,则总的容量即为1TB;2)防止驱动失败时数据丢失。比如RAID 1,RAID 5, RAID 6和RAID 10。

可以用如下命令移除:

1
sudo apt remove mdadm -y --purge
  • https://en.wikipedia.org/wiki/Mdadm
  • https://help.ubuntu.com/community/Installation/SoftwareRAID
  • http://manpages.ubuntu.com/manpages/xenial/man8/mdadm.8.html
  • /usr/lib/policykit-1/polkitd –no-debug

polkitd:PolicyKit守护进程。

polkit:授权管理。

有点类似是细粒度的sudo权限控制。你能允许非权限用户做某些root用户的操作。比如,桌面计算机中的Linux重启计算机。

这里是运行的服务器,可以移除该服务:

1
sudo apt remove policykit-1 -y --purge
  • http://manpages.ubuntu.com/manpages/xenial/man8/polkitd.8.html
  • http://manpages.ubuntu.com/manpages/xenial/man8/polkit.8.html
  • http://www.admin-magazine.com/Articles/Assigning-Privileges-with-sudo-and-PolicyKit
  • https://wiki.archlinux.org/index.php/Polkit#Configuration
  • /usr/sbin/sshd -D

sshd(OpenSSH Daemon),ssh的守护进程。

指定-D选项时,sshd不断开,也不成为守护进程。这样会更容易监控sshd。

  • http://manpages.ubuntu.com/manpages/xenial/man8/sshd.8.html
  • /sbin/iscsid

iscsid是系统守护进程,处理iSCSI配置和管理连接。从帮助页看到:

iscsid实现iSCSI协议的控制路径,和一些设备管理。比如,守护进程能配置成在服务器启动时自动重启,该功能基于持久化iSCSI数据库。

可以移除该服务:

1
sudo apt remove open-iscsi -y --purge
  • /sbin/agetty –noclear tty1 linux

agetty是alternative Linux getty的缩写。

getty,是”get tty”的简写,通常从 /etc/inittab 启动,允许用户从终端 (TTYs)登录。它会提示输入用户名,运行’login’ 程序授权用户登录。

早期getty存在传统Unix系统,其管理一系列连接到主机上的终端(电传打字机,Teletype machine)连接。其中tty是Teletype的缩写,后来代指各种文字终端。

如果在物理服务器上,你可以使用getty登录。在Digital Ocean,你可以点击droplet详情上的Console,通过浏览器和终端交互(可能是VNC连接)。

在过去,系统启动后你会看到一大泼tty启动(一般配置在/etc/inittab);不过现在都用systemd代替。

下面移除 agetty启动的配置文件:

1
2
sudo rm /etc/systemd/system/getty.target.wants/getty@tty1.service
sudo rm /lib/systemd/system/getty@.service

然后重启服务器,仍然可以通过SSH连接服务器,但是不能通过Digital Ocean的web控制台登录服务器啦。

image

  • http://manpages.ubuntu.com/manpages/xenial/man8/getty.8.html
  • https://en.wikipedia.org/wiki/Getty_(Unix))
  • http://0pointer.de/blog/projects/serial-console.html
  • http://unix.stackexchange.com/questions/56531/how-to-get-fewer-ttys-with-systemd
  • sshd: root@pts/0 & -bash & htop

sshd: root@pts/0 意味着root用户在#0伪终端 (pts)创建啦SSH会话。

bash 是指使用的shell。

但是为啥bash 开头有个破折号呢?Reddit上有解释:

bash 开头的破折号是因为bash 以login shell模式启动时(取得bash 时需要完整的登录流程)。启动login shell模式有两种方式:使用“-”参数启动;使用”—login”选项启动。它们都会加载配置文件。

htop 是交互式进程查看工具。

接着移除下面的服务:

1
2
3
4
5
6
7
8
sudo apt remove lvm2 -y --purge
sudo apt remove at -y --purge
sudo apt remove snapd -y --purge
sudo apt remove lxcfs -y --purge
sudo apt remove mdadm -y --purge
sudo apt remove open-iscsi -y --purge
sudo apt remove accountsservice -y --purge
sudo apt remove policykit-1 -y --purge

用htop查看会出现如下图:

image

再次挑战“极限”情况:

1
2
3
4
5
6
sudo apt remove dbus -y --purge
sudo apt remove rsyslog -y --purge
sudo apt remove acpid -y --purge
sudo systemctl stop cron && sudo systemctl disable cron
sudo rm /etc/systemd/system/getty.target.wants/getty@tty1.service
sudo rm /lib/systemd/system/getty@.service

再用htop查看:

image

未完待续。。。

Enjoy!


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

分布式服务框架选型:面对Dubbo,阿里巴巴为什么选择了HSF?

发表于 2017-05-19

阿里巴巴集团内部使用的分布式服务框架 HSF(High Speed Framework,也有人戏称“好舒服”)已经被很多技术爱好者所熟知,目前已经支撑着近 2000 多个应用的运行。

其对应早期的开源项目 Dubbo(因为某些原因,Dubbo 项目在 2012 年年底,阿里巴巴就停止了对此开源项目的更新),则更是在互联网领域有着非常高的知名度和广泛的使用。

本文通过对阿里巴巴 HSF 服务框架的介绍,让大家能对这类分布式服务框架架构设计、运行原理,以及如何实现作为一个 SOA 架构需要满足的各个特性有一个清晰的认识。

HSF 服务框架主要组件

1、服务提供者

在服务框架中真正提供服务功能实现的应用实例,为了保障服务提供的高可用性,一般均是集群部署。

每一个 HSF 的应用均是以 War 包的形式存在,运行在阿里巴巴优化定制后的 Tomcat 容器中,在 Tomcat 容器层已经集成了 HSF 服务框架对服务提供者或服务调用者进行配置服务器发现、服务注册、订阅、失效转移等相关功能,所以不管是在服务提供者还是调用者开发时,只需要进行服务相关的配置操作,应用中无需引入任何 HSF 相关的 Jar 依赖包。

考虑到应用故障的隔离、更方便的服务管控,目前淘宝内部大部分应用的部署方式还是一个虚拟机(对应一个操作系统)运行一个 Tomcat 容器,每个 Tomcat 运行一个服务应用,随着近几年以 Docker 容器技术的发展和流行,现在阿里巴巴内部也正在进行应用容器化部署的工作,让服务器的资源利用更加科学和高效。

2、服务调用者

作为服务的消费者,大多数也是以 WAR 应用包的方式运行在 Tomcat 容器中,在阿里巴巴集团内部也有一部分是基于 C/C++、PHP、Node.js 等语言开发的服务调用者。

3、地址服务器

在 HSF 服务框架中,地址服务器肩负着给服务提供者和服务调用者提供部署环境中所有配置服务器和 Diamond 服务器的服务器列表信息,是由 Nginx( 是一个高性能的 HTTP 和反向代理服务器)提供该服务能力。

在部署 HSF 服务环境时,会将整个环境中的配置服务器集群(服务器 IP 列表)和 Diamond 服务器集群信息设置在地址服务器上,在实际生产部署中,也会部署多台地址服务器提供负载均衡和高可用性的服务,服务提供者和调用者通过统一域名的方式访问这些地址服务器(比如“xxx.tbsite.net”),通过 DNS 轮询实现地址服务器访问的高可用性。

4、配置服务器

配置服务器在 HSF 框架中主要负责记录环境内所有服务发布(服务提供者的 IP 地址和服务端口信息)和服务订阅(服务调用者的 IP 地址和服务端口信息)信息,并将服务相关信息推送到服务节点上。为了追求服务发布和订阅的推送效率,所有的服务发布和订阅信息均是保存在内存中。

配置服务器与所有服务者提供者和调用者均是长连接,采用心跳的方式可监控到各服务运行节点的状况,一旦出现服务提供者服务节点出现故障时,会自动推送更新后(将出问题的服务提供者服务节点信息从列表中删除)的服务提供者列表给相关的服务调用者端。

在生产环境中,会部署多台配置服务器,用于服务发布、订阅、推送的负载均衡,在多台配置服务器间会进行实时的数据同步,保证服务发布和订阅信息尽快能同步到各服务节点上。

某种程度上,配置服务器在 HSF 框架中扮演了服务调用调度的指挥官,通过给服务调用者端推送不同的服务提供者列表就可以轻易的调整服务调用的路由,这一特性在淘宝平台实现单元化(即某一客户在访问淘宝时,访问请求一旦路由到某一个淘宝机房后,在淘宝上进行的所有业务的操作均可以在该机房完成,而无需访问其他机房的服务,也是实现异地多活的基础)、异地多活起到了至关重要的作用。

5、Diamond 服务器

本质上,Diamond 服务器是一个通用的统一配置管理服务,类似 ZooKeeper,给应用提供统一的配置设置和推送服务,使用场景非常广泛,在阿里巴巴内部有很多的产品在需要进行配置的保存和获取时都会使用 Diamond 服务器。

在 HSF 服务框架中,则主要承担了服务调用过程中对于服务调用安全管控的规则、服务路由权重、服务 QPS 阀值等配置规则的保存,所有的信息均是持久化保存到了后端的 MySQL 服务器中,在生产环境中,会有多台 Diamond 服务器提供负载均衡的服务。

使用 Diamond 服务器进行服务相关设置的典型场景如下:

  • 通过设置白名单(服务调用者所在服务节点 IP 地址)的方式设置某些服务或服务中的方法只能让特定 IP 地址的服务器调用;
  • 通过用户认证的方式控制服务是否能够调用;
  • 按照不同的服务器权重设置服务调用者对多个服务提供者服务节点的访问;
  • 设置某些服务的 QPS 能力上限值,一旦该服务的 QPS 达到该阀值,则拒绝服务的继续调用,这也是实现服务限流的技术实现,在平台进行大促或秒杀场景时,保障平台的稳定的重要屏障。

通过这样规则的设置,Diamond 除了将这些规则保存在自身的数据库中外,会自动将这些规则推送到相关的服务节点上(实际实现上是服务节点会定时从 Diamond 服务器上同步相关配置信息),使这些规则能立即在服务运行环境中生效。

如图 3-5 所示是阿里巴巴 HSF 服务框架的工作原理,按照服务注册发布、服务订阅、服务规则推送、最终服务提供者和服务调用者间的服务交互的顺序说明了 HSF 服务框架中每个组件在整个框架中所扮演的角色。

img

图 3-5 HSF 服务框架工作原理示意图

1)服务节点对配置服务器列表的获取。

服务调用者和服务提供者在随着 Tomcat 容器启动后,会以域名(比如“xxx.tbsite.net”)的方式获取到可用的地址服务器,通过向地址服务器分别发送获取配置服务器和 Diamond 服务器服务 IP 列表请求的方式,在容器启动完成后,就已经在该服务节点上获取到了配置服务器和 Diamond 服务器的 IP 列表信息。整个过程如图 3-5 中的步骤①②。

2)服务的注册发布。

作为服务提供者,当获取到配置服务器的服务器列表后,则向配置服务器发送当前应用中包含的服务提供者相关信息(这些信息均是从应用的配置文件中获取到,比如服务的接口类全名、服务版本、所属服务组等信息),联同当前服务器的 IP 地址、服务端口等信息进行服务注册发布,如图 3-5 中的步骤③。这个步骤在每一个有服务提供的应用启动时都会自动执行,比如现在有 5 个提供同一服务的应用启动后,此时在配置服务器上就已经保存了提供这一服务的 5 个服务器相关信息。

3)服务的订阅。

当作为服务调用者的应用启动时,同样在完成配置服务器列表的获取后,就进行与配置服务器的交互,发送服务消费者相关信息(同样包含了服务的接口全名,服务版本、所属服务组)到配置服务器进行服务的订阅,此时在配置服务器上会通过服务接口全名+服务版本作为匹配条件在当前配置服务器的内存中进行搜索,一旦获取到对应的服务注册信息,则将对应的服务提供者的服务器组 IP 地址及端口返回给服务调用者所在的应用节点上,此时也就完成了服务调用者端对于它所需要调用的服务提供者服务器列表信息,用于在服务真正交互时使用。服务订阅过程如图 3-5 中的步骤④⑤。

4)服务规则的推送(如果需要)。

如果没有上文提到对于服务安全管控、流量控制等需求的时候,对于 Diamond 服务器的使用并不是必需的,在有这样的需求场景时,可通过 Diamond 提供的规则设置界面,可以对指定服务的服务提供者和调用者设置相关的规则,一旦保存规则后,则此规则配置将会在 5 秒内推送到与所设置服务相关的服务节点上。如图 3-5 中的步骤⑥。

5)服务交互。

在应用进行业务请求处理过程中,出现了服务调用者对服务提供者的调用时,服务调用者会从已经保存在该应用节点上的服务提供者服务器列表中选择(阿里巴巴内部使用随机模式)其中一台进行服务请求的发送,服务交互期间完全是服务调用者和服务提供者间两台服务器间的,无需通过中间服务器的中转,这就是相比于“中心化” ESB 模式,所有服务交互都需要“中心” ESB 进行服务路由,而当前这种架构称为“去中心化”的主要原因。如图 3-5 中的步骤⑦。

阿里巴巴的分布式服务框架核心是以服务化的方式构建整个应用体系的同时,要保证在高并发的情况下,服务具备高效交互、高可用性和扩展能力。接下来对于 HSF 框架如何给服务提供以上能力具体加以说明。

1、HSF 框架采用 Netty + Hession 数据序列化协议实现服务交互

HSF 框架中采用如今流行的网络通信框架 Netty 加上 Hession 数据序列化协议实现 HSF 服务间的交互,主要考虑点是在大并发量时,服务交互性能达到最佳。这类 RPC 协议采用多路复用的 TCP 长连接方式,在服务提供者和调用者间有多个服务请求同时调用时会共用同一个长连接,即一个连接交替传输不同请求的字节块。它既避免了反复建立连接开销,也避免了连接的等待闲置从而减少了系统连接总数,同时还避免了 TCP 顺序传输中的线头阻塞(head-of-line blocking)问题。

Hessian 是 HSF 框架中默认使用的数据序列化协议,在数据量较小时性能表现出众,Hessian 的优点是精简高效,同时可以跨语言使用,目前支持 Java, C++, .net, Python, ruby 等语言。另外 Hessian 可以充分利用 Web 容器的成熟功能,在处理大量用户访问时很有优势,在资源分配、线程排队、异常处理等方面都可以由 Web 容器保证。

HSF 框架同时也支持切换使用 Java 序列化,Hession 相比 JDK 标准的序列化方式(即基于 Serializable 接口的标准序列化),在典型场景中,其序列化时间开销可能缩短 20 倍。虽然 Hessian 不是最快的序列化协议,但它对于复杂业务对象的序列化正确率、准确性相较于最稳定的 Java 序列化并不逊色太多。

业界还有一些比 Hessian 更快的序列化协议,但它们相对于 Hessian 在复杂场景下的处理能力还是会差一些,所以 Hessian 是在性能和稳定性同时考虑下最优的序列化协议。

阿里巴巴当时在对多种通信协议和数据序列化组件等测试中,Netty + Hession 的组合在互联网高并发量的场景下,特别是在 TPS 上达到 10w 以上时,性能和效率远比 REST 或者 Web Service 高。

2、HSF 框架的容错机制

因为要保证服务的高可用性,所以在生产环境部署中一定会有多个应用实例作为服务提供者提供某一相同服务。

基于之前所提到的服务框架的运行原理的说明,在进行服务调用时,服务调用者端已经保存了它所需要调用的服务提供者的服务器列表信息(如图 3-6 中为例,则保存了三台服务提供者所在服务器的列表)。

当采用随机方式获取其中一台进行服务交互时(如图 3-6 步骤①),不管是第一台服务器已经某种故障造成了服务请求无法响应,还是该服务器已经接收了服务请求,在进行服务请求处理过程中出现了服务器故障(比如宕机、网络问题),造成该服务器没有在规定的时间(一般服务调用会设置到期时间)返回服务处理的结果,服务调用者端则会获取到服务调用失败的反馈(如图 3-6 步骤②)。

在 HSF 服务调用的代码中会立即从剩下的服务提供者服务器列表中选择另外一个服务器再次进行服务请求(如图 3-6 步骤③),这一次这个服务提供者实例正常提供了此次服务的请求(如图 3-6 步骤④),从而保证了在个别服务提供者出现故障时,完全不会影响该服务正常提供服务。

img

图 3-6 HSF 服务框架实现服务高可用性原理示意图

同时,因为配置服务器是采用长连接的方式与服务节点进行网络通讯,一旦发现有服务提供者实例出现故障,配置服务器在秒级就会感知到(如图 3-6 步骤⑤),此时会将出问题这台服务提供者的信息从该服务的服务器列表中删除,并将更新后的服务器列表采用推送的方式同步给与该服务相关的所有服务调用者端(如图 3-6 步骤⑥),这样当下次服务调用者再进行此服务的调用时,就不会因为随机的方式再次对已经停止服务提供的服务器发起服务的调用。

3、HSF 框架的线性扩展支持

作为 HSF 框架设计之初,最为重要的一个特性就是服务能力的可扩展性。也就是真正的做到某个服务的业务处理能力能随着服务器资源的增加得到线性的增长。

其实在传统架构中一直也会强调平台的扩展能力,但均会程度不一的出现服务节点数量到达一定量后,出现阻碍平台服务能力扩展的问题,有的是出现网络传输的瓶颈、也有服务节点接入数量上的限制,前文所描述的 ESB 架构带来的“雪崩”效应也均是这类架构给服务能力的扩展带来影响的原因所在。

如图 3-7 中所描述的场景,当服务面对较大的服务调用压力或将要面临如天猫双11大促、秒杀等活动前,已有的服务提供者各服务器水位(CPU、内存、IO等)处于比较高的情况或现有服务能力满足不了业务访问量的要求时,则需要通过增加服务节点数量的方式提升该服务的服务处理能力。

img

图 3-7 HSF 服务框架对服务能力线性扩展支持1

此时,只需要通过增加该服务的服务提供者实例(如图 3-8 所示,增加了一个服务),基于 HSF 框架的运行机制,新增加的服务提供者实例一旦应用启动完成后,可在几秒内开始进行服务请求的处理(主要完成服务注册发布、更新后服务列表推送到服务调用者端),从而达到分担其他服务器实例压力的作用,实现服务能力整体水位恢复到正常的状态(如图 3-9)。

img

图 3-8 HSF服务框架对服务能力线性扩展支持2

img

图 3-9 HSF 服务框架对服务能力线性扩展支持3

正是基于 HSF 框架这一特性,从而真正实现了只要增加服务实例就能实现该服务能力扩展的目标,目前在阿里巴巴共享服务事业部中多个服务中心在天猫双 11 那天各自所部署的服务实例节点数量均超过 2000,即同一个服务由超过 2000 个服务实例同时提供负载均衡的服务。

本文由公众号《高可用架构》原创。


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

Savepoint:Flink让时光倒流

发表于 2016-12-05

现在互联网产品对数据的实时性要求极其强烈,比如,某电商产品的推荐系统,当一个用户点击页面就会在秒级内给出相应的推荐页面。进而,实时流处理技术讨论变得越加频繁《各大主流流处理框架大比拼》和《实时流处理框架选型:就应该这样“拉出来遛遛”》,比如,延迟性、吞吐量、watermark…

接下来,进入主题:Flink实时流处理中的“reprocess data”。

相信很多同行经常遇到以下几种case:

  • 开发新feature或者bug修复,程序新版本上线;
  • 不同版本产品的A/B test;
  • 评估和实现在新处理框架下的应用迁移,或者迁移到不同的集群

以上所有情况都可以使用Flink的savepoint功能实现。

Savepoint是什么

简而言之,Flink的savepoint是一个全局的、一致性的快照(snapshot)。其包含两方面:

  • 数据源所有数据的位置;
  • 并行操作的状态

“全局一致”是指所有的输入源数据在指定的位置,所有的并行操作的状态都被完全checkpoint了。注意理解这句话,可以多读几遍回味一下。

如果你的应用在过去某个时间点做了savepoint,那你随时可以从前面的savepoint更新发布应用。这时,新的应用会从savepoint中的操作状态进行初始化,并从savepoint的数据源位置开始重新处理所有数据。

Flink的savepoint是完全不依赖的,所以你每个应用可以有N个savepoint,你可以回退到多个位置重新开始你的应用(可以是不同的应用,如下图所示)。这个功能在流处理应用是相当强大的。

image

有读者会觉得上图似曾相识,其实你可能想到了Flink的checkpoint,这时是不是有点糊涂了,那savepoint和checkpoint到底啥关系呢?详细答案会在后续某篇文章揭晓,这里先简单说下:checkpoint是Flink实现容错的,savepoint仅仅只是checkpoint的一个扩展。如果checkpoint开启,那Flink会周期性的创建所有操作状态的checkpoint。savepoint和checkpoint最大的不同是,checkpoint会按时间间隔自动创建,而savepoint需要手动触发。

为了让 “reprocess data”得到更精确的结果,那我们不得不提event-time和processing-time或者ingestion-time的区别,这也是在各个流处理技术里常提到的时间语义。不过这里先不展开,后续也会有文章专门讲到。为了让 “reprocess data”得到更精确的结果需要使用event-time,因为依赖processing-time或者ingestion-time的应用会根据当前的wall-clock时间来处理。

如何实现savepoint

实际上,使用savepoint的前提有以下几点:

  • 开始checkpoint;
  • 可重复使用的数据源(e.g., Apache Kafka,Amazon Kinesis,或者文件系统);
  • 所有保存的状态需继承Flink的管理状态接口;
  • 合适的state backend配置

做到了这几点,那你可以通过CLI命令行实现savepoint并重新从savepoint开始应用:

  1. 创建savepoint

首先,获取Flink所有正在运行的job list:

1
2
3
user$ flink list
------------Running/Restarting Jobs------------
12.04.2016 16:20:33 : job_id : 12345678 (RUNNING)

接着,使用刚才获取到的job ID创建savepoint:

1
user$ flink savepoint job_id

这时你可以选择取消正在运行的job(可选操作):

1
user$ flink cancel job_id
  1. 从savepoint开启job
1
user$ flink run -d -s hdfs://savepoints/1 directory/your-updated-application.jar
如果更新应用,该咋办?

修改的应用从一个savepoint开始需要考虑以下两种情况:

  • 用户自定义逻辑的改变,比如,MapFunction;
  • 应用的拓扑的改变,比如,增加或者移除操作

如果你的情况属于上面描述的第一类,那不需要做其他额外处理。但是,第二种情况,Flink要求修改前后的操作要能匹配上,这样才好使用保存的操作状态。这时你需要手动在原始和更新的应用中分配操作ID,因为没有操作ID是不可能改变应用的拓扑,所以最好要尽可能的分配操作ID,如下:

1
2
3
4
5
6
7
8
9
10
11
DataStream stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid(“source-id”)
.shuffle()
// The stateful mapper with ID
.map(new StatefulMapper())
.uid(“mapper-id”)
// Stateless sink (no specific ID required)
stream.print()
总结

Savepoint是Flink与其它流处理技术的独特之处,要好好的利用起来。

不过Flink的savepoint使用也有诸多限制,后续有机会再讲到,但相对于Spark Streaming的checkpoint来说还是高级了不少。

PS:虽然Spark项目的star数比Flink多一个数量级,但Flink在某些feature上的开发和布局比Spark更快,感觉Flink开发者在最近代表着实时流处理和离线大数据技术的方向,看好Flink。

参考:
[1] http://data-artisans.com/turning-back-time-savepoints

Enjoy!


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

Kafka Cluster优化两三事

发表于 2016-12-05

写在之前:本文将讲述Kafka Cluster配置和优化。

Kafka Cluster(相对于单个server)最大的优点:可扩展性和容错性。

image

​ Kafka集群简图

Kafka Broker个数

决定Kafka集群大小的因素有以下几点:

  • 磁盘容量:首先考虑的是所需保存的消息所占用的总磁盘容量和每个broker所能提供的磁盘空间。如果Kafka集群需要保留 10 TB数据,单个broker能存储 2 TB,那么我们需要的最小Kafka集群大小 5 个broker。此外,如果启用副本参数,则对应的存储空间需至少增加一倍(取决于副本参数)。这意味着对应的Kafka集群至少需要 10 个broker。
  • 请求量:另外一个要考虑的是Kafka集群处理请求的能力。这主要取决于对Kafka client请求的网络处理能力,特别是,有多个consumer或者网路流量不稳定。如果,高峰时刻,单个broker的网络流量达到80%,这时是撑不住两个consumer的,除非有两个broker。再者,如果启用了副本参数,则需要考虑副本这个额外的consumer。也可以扩展多个broker来减少磁盘的吞吐量和系统内存。
Kafka Broker配置

同一个Kafka集群的所有broker机器必须满足以下两个参数:

  • 所有broker机器需配置相同的zookeeper连接参数(.connect)。这决定了Kafka集群存储的元数据位置;
  • 所有broker机器需配置唯一的broker id( .id)。如果一个集群下的两个broker配置了相同的broker id,则第二个broker启动时会失败并报错。
操作系统优化

大部分Linux发布版本默认的内核参数配置能让大部分应用工作的相当好。但对于实际的Kafka broker场景来说,做稍些改变会提升broker性能。主要涉及的配置:虚拟内存、网络和磁盘挂载(用来存储log segment),一般在/etc/sysctl.conf (CentOS系统)。

  • Virtual Memory

一般来说,Linux的虚拟内存会根据系统负载自动调整。内存页(page)swap到磁盘会显著的影响Kafka的性能,并且Kafka重度使用page cache,如果VM系统swap到磁盘,那说明没有足够的内存来分配page cache。

避免swap的一种方式是设置swap空间为0。但是,swap会在系统崩溃时提供安全机制,或者会在out of memory的情况下阻止操作系统 kill 掉进程。由于这个原因,推荐 vm.swappiness参数设置为一个非常低的值:1 。这个参数表示 VM系统中的多少百分比用来作为swap空间。

另外一种方式是通过内核调节“脏页”(注:“脏页”会被刷到磁盘上)。Kafka依赖磁盘I/O性能来提高producer的响应时间。这也是为什么通常优先把log segment功能放在可以快速响应的磁盘中(比如,SSD)。这样使得flush进程把“脏数据”写入磁盘前,“脏页”数目就减少了,可以设置vm.dirty_background_ratio(表示占用系统内存的百分比)参数的值为 10 以下。大部分应用场景下,vm.dirty_background_ratio设置为 5 就够用了,要注意了:这个参数值不能设置为 0 ,因为设置为 0 后会引起内核持续刷“脏页”,使得内核的buffer write功能没法施展。

“脏页”的总量可以通过vm.dirty_ratio 来改变,默认值是 20 (此处也是百分比),这个值的设置范围较大,一般建议设置 60 到 80 为合理的值。但是vm.dirty_ratio 参数也引来了不小的风险,会造成大量unflush的数据在硬刷到磁盘时产生较长的I/O停顿。如果vm.dirty_ratio 值设置的较大时,强烈建议Kafka开启备份功能,以备系统崩溃。

在设置了这些参数后,需要监控Kafka集群运行时“脏页”的数量,当前“脏页”数量可由如下方式查看(/proc/vmstat文件):

1
2
3
# cat /proc/vmstat | egrep "dirty|writeback" nr_dirty 3875
nr_writeback 29
nr_writeback_temp 0
  • 磁盘

除了考虑磁盘硬件本身和RAID配置外,磁盘的filesystem对Kafka集群的影响最大。虽然有许多filesystem,但最常用的是EXT4或者XFS。在这里XFS文件系统比EXT4稍好,具体原因Google下。

另外一点是,建议开启mount的noatime mount选项。文件系统在文件被访问、创建、修改等的时候会记录文件的一些时间戳,比如:文件创建时间(ctime)、最近一次修改时间(mtime)和最近一次访问时间(atime)。默认情况下,atime的更新会有一次读操作,这会产生大量的磁盘读写,然而atime对Kafka完全没用。

  • 网络

Linux发布版本的网络参数对高网络流量不适用。对于Kafka集群,推荐更改每个socket发送和接收buffer的最大内存:net.core.wmem_default 和 net.core.rmem_default 为128 kb,net.core.wmem_max 和net.core.rmem_max 为 2 Mb。另外一个socket参数是TCP socket的发送和接收buffer: net.ipv4.tcp_wmem 和 net.ipv4.tcp_rmem。

Kafka集群稳定

主要涉及到GC、数据中心布局和ZK使用:

  • GC调优

调GC是门手艺活,幸亏Java 7引进了G1 垃圾回收,使得GC调优变的没那么难。G1主要有两个配置选项来调优:MaxGCPauseMillis和InitiatingHeapOccupancyPercent,具体参数设置可以参考Google,这里不赘述。

Kafka broker能够有效的利用堆内存和对象回收,所以这些值可以调小点。对于 64Gb内存,Kafka运行堆内存5Gb,MaxGCPauseMillis和InitiatingHeapOccupancyPercent 分别设置为 20毫秒和 35。

Kafka的启动脚本使用的不是 G1回收,需要在环境变量中加入:

1
2
3
# export JAVA_HOME=/usr/java/jdk1.8.0_51
# export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
  • 数据中心布局

原则上Kafka broker不建议都在一个机架上,为了容灾,但现实情况大部分公司做不到,此处略去。

  • Zookeeper

Kafka集群利用ZK来存储broker、topic和partition的元数据信息。

在Kafka 0.9.0之前,consumer利用ZK来直接存储consumer group的信息,包括topic的消费情况、每个partition消费的周期性commit。在0.9.0版本,提供新的consumer接口利用Kafka broker来管理。

Consumer可以选择使用Zk或者Kafka来提交 offset和 提交间隔。如果consumer使用ZK管理offset,那每个consumer在每个partition的每个时间间隔写入ZK。合理的offset提交间隔是1分钟,但如果一个Kafka集群有大量的consumer消费时,这个ZK流量将是巨大的。所以如果ZK不能处理大流量,那只能把offset提交间隔设大,但同时也带来丢数据的风险。最保险的建议是使用Kafka来提交offset。

另外,建议Kafka集群不要和其他应用使用同一组ZK,因为Kafka对于ZK的延迟和超时是相当敏感的,ZK的不通将会导致Kafka的不可预测性。

总结

Kafka在各大互联公司应用广泛,希望上述Kafka集群调优对各位有帮助。

PS:最近在负责招聘,有Hadoop、Spark、Flink、Kafka、Storm等相关经验的优秀人才,请联系我或者后台留言。

Enjoy!


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

机器学习可视化系统完结篇:模型评估和参数调优

发表于 2016-10-24

机器学习可视化系统完结篇:模型评估和参数调优

写在之前:前两篇讲述了特征分析:《可视化图表让机器学习“biu”的一样简单:特征分析》和模型选择:《机器学习模型选择如此简单》。

本篇文章详细阐述机器学习模型评估和参数调优。将主要围绕两个问题来阐述:

  1. “知其所以然”:当你选择的一个机器学习模型运行时,你要知道它是如何工作的;
  2. “青出于蓝”:更进一步,你得知道如何让此机器学习模型工作的更优。

模型评估的方法

一般情况来说,F1评分或者R平方(R-Squared value)等数值评分可以告诉我们训练的机器学习模型的好坏。也有其它许多度量方式来评估拟合模型。

你应该猜出来,我将提出使用可视化的方法结合数值评分来更直观的评判机器学习模型。接下来的几个部分将分享一些有用的工具。

首先想声明的,单单一个评分或者一条线,是无法完全评估一个机器学习模型。偏离真实场景来评估机器学习模型(’good’ or ‘bad’)都是“耍流氓”。某个机器学习模型若可“驾驭”小样本数据集生成最多预测模型(即,命中更多预测数据集)。如果一个拟合模型比其它拟合过的模型形式或者你昨天的预测模型能够得到更好的结果,那即是好(’good’)。

下面是一些标准指标: confusion_matrix,mean_squared_error, r2_score,这些可以用来评判分类器或者回归的好坏。表格中给出的是Scikit-Learn中的函数以及描述:

评估分类模型:

指标 描述 Scikit-learn函数
Precision 精准度 from sklearn.metrics import precision_score
Recall 召回率 from sklearn.metrics import recall_score
F1 F1值 from sklearn.metrics import f1_score
Confusion Matrix 混淆矩阵 from sklearn.metrics import confusion_matrix
ROC ROC曲线 from sklearn.metrics import roc
AUC ROC曲线下的面积 from sklearn.metrics import auc

评估回归模型:

指标 描述 Scikit-learn函数
Mean Square Error (MSE, RMSE) 平均方差 from sklearn.metrics import mean_squared_error
Absolute Error (MAE, RAE) 绝对误差 from sklearn.metrics import mean_absolute_error, median_absolute_error
R-Squared R平方值 from sklearn.metrics import r2_score

下面开始使用Scikit-Learn的可视化工具来更直观的展现模型的好坏。

评估分类模型

我们评估分类器是判断预测值时否很好的与实际标记值相匹配。正确的鉴别出正样本(True Positives)或者负样本(True Negatives)都是True。同理,错误的判断正样本(False Positive,即一类错误)或者负样本(False Negative,即二类错误)。

注意:True和False是对于评价预测结果而言,也就是评价预测结果是正确的(True)还是错误的(False)。而Positive和Negative则是样本分类的标记。

通常,我们希望通过一些参数来告知模型评估如何。为此,我们使用混淆矩阵。

混淆矩阵

Confusion Matrix

幸运的是,Scikit-Learn提供内建函数(sklearn.metrics.confusion_matrix)来计算混淆矩阵。输入数据集实际值和模型预测值作为参数,输出即为混淆矩阵,结果类似这样:

1
2
[[1238 19] # True Positives = 1238, False Negatives = 19
[ 2 370]] # False Positives = 2, True Negatives = 370
分类报告

分类报告除了包括混淆矩阵,也增加了其它优势,比如,混淆矩阵会标示样例是否被正确鉴别,同时也提供precision,recall和 F1 值三种评估指标。

1
2
3
from sklearn.metrics import classification_report
print(classification_report(y_true, y_pred, target_names=target_names))

更进一步,可以对Scikit-Learn的内建函数做些加强,比如,使用带颜色区分的热力图,它将帮助我们的眼睛更容易的辨别预测成功(橘黄色)和失败(灰色)。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from matplotlib import colors
from matplotlib.colors import ListedColormap
ddl_heat = ['#DBDBDB','#DCD5CC','#DCCEBE','#DDC8AF','#DEC2A0','#DEBB91',\
'#DFB583','#DFAE74','#E0A865','#E1A256','#E19B48','#E29539']
ddlheatmap = colors.ListedColormap(ddl_heat)
def plot_classification_report(cr, title=None, cmap=ddlheatmap):
title = title or 'Classification report'
lines = cr.split('\n')
classes = []
matrix = []
for line in lines[2:(len(lines)-3)]:
s = line.split()
classes.append(s[0])
value = [float(x) for x in s[1: len(s) - 1]]
matrix.append(value)
fig, ax = plt.subplots(1)
for column in range(len(matrix)+1):
for row in range(len(classes)):
txt = matrix[row][column]
ax.text(column,row,matrix[row][column],va='center',ha='center')
fig = plt.imshow(matrix, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
x_tick_marks = np.arange(len(classes)+1)
y_tick_marks = np.arange(len(classes))
plt.xticks(x_tick_marks, ['precision', 'recall', 'f1-score'], rotation=45)
plt.yticks(y_tick_marks, classes)
plt.ylabel('Classes')
plt.xlabel('Measures')
plt.show()
cr = classification_report(y_true, y_pred)
plot_classification_report(cr)

Classification Report

看起来挺容易,对不?发现分类热力图的另外一个好处,它可以让我们看出一类错误 VS 二类错误。但有一个缺陷,它并不能垮模型进行比较,而这对评估拟合模型是相当重要的。因为这个原因,接下来将使用第二篇文章中的classify和regress代码。

下面的get_preds函数将输出一个实际标记值和预测值的二元组,这个二元组将会使得后续的跨模型的可视化比较变得容易:

1
2
3
4
5
6
7
8
9
10
11
12
13
def get_preds(attributes, targets, model):
'''
Executes classification or regression using the specified model
and returns expected and predicted values.
Useful for comparison plotting!
'''
splits = cv.train_test_split(attributes, targets, test_size=0.2)
X_train, X_test, y_train, y_test = splits
model.fit(X_train, y_train)
y_true = y_test
y_pred = model.predict(X_test)
return (y_true,y_pred)
ROC曲线

另一种评估分类模型的方法是ROC(Receiver Operating Characteristic)曲线。我们能从Scikit-Learn 指标模块中import roc_curve,计算 true positive率和false positive 率的数值。我们也可以画出ROC曲线来权衡模型的敏感性和特异性。

下面的代码将画出ROC,Y轴代表true positive率,X轴代表false positive 率。同时,我们也可以增加同时比较两种不同的拟合模型,这里看到的是 KNeighborsClassifier 分类器远胜 LinearSVC 分类器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def roc_compare_two(y, yhats, models):
f, (ax1, ax2) = plt.subplots(1, 2, sharey=True)
for yhat, m, ax in ((yhats[0], models[0], ax1), (yhats[1], models[1], ax2)):
false_positive_rate, true_positive_rate, thresholds = roc_curve(y,yhat)
roc_auc = auc(false_positive_rate, true_positive_rate)
ax.set_title('ROC for %s' % m)
ax.plot(false_positive_rate, true_positive_rate, \
c='#2B94E9', label='AUC = %0.2f'% roc_auc)
ax.legend(loc='lower right')
ax.plot([0,1],[0,1],'m--',c='#666666')
plt.xlim([0,1])
plt.ylim([0,1.1])
plt.show()
y_true_svc, y_pred_svc = get_preds(stdfeatures, labels, LinearSVC())
y_true_knn, y_pred_knn = get_preds(stdfeatures, labels, KNeighborsClassifier())
actuals = np.array([y_true_svc,y_true_knn])
predictions = np.array([y_pred_svc,y_pred_knn])
models = ['LinearSVC','KNeighborsClassifier']
roc_compare_two(actuals, predictions, models)

ROC_AUC Curve

在ROC空间,ROC曲线越凸向左上方向效果越好;越靠近对角线,分类器越趋向于随机分类器。

同时,我们也会计算曲线下的面积(AUC),可以结合上图。如果AUC的值达到0.80,那说明分类器分类非常准确;如果AUC值在0.60~0.80之间,那分类器还算好,但是我们调调参数可能会得到更好的性能;如果AUC值小于0.60,那就惨不忍睹了,你得好好分析下咯。

评估回归模型

对于混凝土数据集试验一些不同的机器学习模型,然后评判哪种更好。在第二篇文章中,我们使用的平均方差和 R 平方值,比如:

1
2
Mean squared error = 116.268
R2 score = 0.606

这些数值是有用的,特别是对不同的拟合模型比较平均方差和 R 平方值。但是,这是不够的,它不能告诉我们为什么一个模型远胜于另外一个;也不能告诉我们如何对模型调参数提高评分。接下来,我们将看到两种可视化的评估技术来帮助诊断模型有效性:预测错误曲线 和 残差曲线。

预测错误曲线

为了知道我们的模型预测值与期望值到底有多接近,我们将拿混凝土数据集(混凝土强度)做例子,画出其期望值和模型预测值曲线。下面是不同回归模型的错误曲线:Ridge, SVR 和RANSACRegressor。

1
2
3
4
5
6
7
8
9
10
11
12
13
def error_compare_three(mods,X,y):
f, (ax1, ax2, ax3) = plt.subplots(3, sharex=True, sharey=True)
for mod, ax in ((mods[0], ax1),(mods[1], ax2),(mods[2], ax3)):
predicted = cv.cross_val_predict(mod[0], X, y, cv=12)
ax.scatter(y, predicted, c='#F2BE2C')
ax.set_title('Prediction Error for %s' % mod[1])
ax.plot([y.min(), y.max()], [y.min(), y.max()], 'k--', lw=4, c='#2B94E9')
ax.set_ylabel('Predicted')
plt.xlabel('Measured')
plt.show()
models = np.array([(Ridge(),'Ridge'), (SVR(),'SVR'), (RANSACRegressor(),'RANSAC')])
error_compare_three(models, features, labels)

Visualizing error in regression models

从这里可以很清晰的看出预测值和期望值的关系。同时也发现线性回归模型效果好。

残差曲线

残差是数据集每个实例的实际标记值和预测值之间的差值。通过画出一系列实例的残差,可以帮助我们检测它们是否随机错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def resids_compare_three(mods,X,y):
f, (ax1, ax2, ax3) = plt.subplots(3, sharex=True, sharey=True)
plt.title('Plotting residuals using training (blue) and test (green) data')
for m, ax in ((mods[0], ax1),(mods[1], ax2),(mods[2], ax3)):
for feature in list(X):
splits = cv.train_test_split(X[[feature]], y, test_size=0.2)
X_tn, X_tt, y_tn, y_tt = splits
m[0].fit(X_tn, y_tn)
ax.scatter(m[0].predict(X_tn),m[0].predict(X_tn)-y_tn,c='#2B94E9',s=40,alpha=0.5)
ax.scatter(m[0].predict(X_tt), m[0].predict(X_tt)-y_tt,c='#94BA65',s=40)
ax.hlines(y=0, xmin=0, xmax=100)
ax.set_title(m[1])
ax.set_ylabel('Residuals')
plt.xlim([20,70]) # Adjust according to your dataset
plt.ylim([-50,50])
plt.show()
models = np.array([(Ridge(),'Ridge'), (LinearRegression(),'Linear Regression'), (SVR(),'SVR')])
resids_compare_three(models, features, labels)

Plotting residuals in regression models

Bias VS Variance

每种评估器都有是有利有弊。

首先 Error = Bias + Variance。Error反映的是整个模型的准确度,Bias反映的是模型在样本上的输出与真实值之间的误差,即模型本身的精准度,Variance反映的是模型每一次输出结果与模型输出期望之间的误差,即模型的稳定性。

机器学习可视化调参

在文章开篇,我们提出了两个问题:我们如何知道一个机器学习模型可以工作?我们如何让这个模型工作(运行)的更好?

接下来,我们将回答第二个问题。如果你有注意,我们用的模型都是使用Scikit-Learn 默认的参数。对于我们的大部分拟合模型来讲,评分已经相当好了。但有时并没有那么幸运,这时我们就得自己调参数。

# 可视化训练和验证模型

如何选择最好的模型参数呢?一种方法是,用单一参数的不同值去验证一个模型的评估分数。让我们拿SVC 分类器来试验,通过调不同的gama值来画出训练值和测试纸的曲线。

我们的关注点是训练值和测试值都高的点。如果两者都低,那是欠拟合(underfit);如果训练值高但是测试值低,那说明是过拟合(overfit)。

下面的代码画出来的曲线是拿信用卡数据集来做例子,这里用的 6折交叉验证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def plot_val_curve(features, labels, model):
p_range = np.logspace(-5, 5, 5)
train_scores, test_scores = validation_curve(
model, features, labels, param_name='gamma', param_range=p_range,
cv=6, scoring='accuracy', n_jobs=1
)
train_scores_mean = np.mean(train_scores, axis=1)
train_scores_std = np.std(train_scores, axis=1)
test_scores_mean = np.mean(test_scores, axis=1)
test_scores_std = np.std(test_scores, axis=1)
plt.title('Validation Curve')
plt.xlabel('$\gamma$')
plt.ylabel('Score')
plt.semilogx(p_range, train_scores_mean, label='Training score', color='#E29539')
plt.semilogx(p_range, test_scores_mean, label='Cross-validation score', color='#94BA65')
plt.legend(loc='best')
plt.show()
X = scale(credit[['limit','sex','edu','married','age','apr_delay']])
y = credit['default']
plot_val_curve(X, y, SVC())

Validation curve

Grid Search

对于超参数调优,大部分人使用的grid search。Grid search是一种暴力调参方法,即遍历所有可能的参数值。

对于信用卡数据集使用 SVC模型,我们通过试验不同内核系数gama来提高预测准确性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from sklearn.grid_search import GridSearchCV
def blind_gridsearch(model, X, y):
C_range = np.logspace(-2, 10, 5)
gamma_range = np.logspace(-5, 5, 5)
param_grid = dict(gamma=gamma_range, C=C_range)
grid = GridSearchCV(SVC(), param_grid=param_grid)
grid.fit(X, y)
print(
'The best parameters are {} with a score of {:0.2f}.'.format(
grid.best_params_, grid.best_score_
)
)
features = credit[['limit','sex','edu','married','age','apr_delay']]
labels = credit['default']
blind_gridsearch(SVC(), features, labels)

但是,grid search需要我们理解哪些参数是合适的,参数的意义,参数是如何影响模型的以及参数的合理的搜索范围来初始化搜索。

这里,我们使用 visual_gridsearch 代替 blind_gridsearch 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def visual_gridsearch(model, X, y):
C_range = np.logspace(-2, 10, 5)
gamma_range = np.logspace(-5, 5, 5)
param_grid = dict(gamma=gamma_range, C=C_range)
grid = GridSearchCV(SVC(), param_grid=param_grid)
grid.fit(X, y)
scores = [x[1] for x in grid.grid_scores_]
scores = np.array(scores).reshape(len(C_range), len(gamma_range))
plt.figure(figsize=(8, 6))
plt.subplots_adjust(left=.2, right=0.95, bottom=0.15, top=0.95)
plt.imshow(scores, interpolation='nearest', cmap=ddlheatmap)
plt.xlabel('gamma')
plt.ylabel('C')
plt.colorbar()
plt.xticks(np.arange(len(gamma_range)), gamma_range, rotation=45)
plt.yticks(np.arange(len(C_range)), C_range)
plt.title(
"The best parameters are {} with a score of {:0.2f}.".format(
grid.best_params_, grid.best_score_)
)
plt.show()
visual_gridsearch(SVC(), features, labels)

Validation accuracy as a function of gamma and C

visual_gridsearch 的方法可以帮助我们理解不同的模型参数下的精确值。但是超参数调优的路程很长,好些人为此研究了几十年。

结论

这是可视化机器学习部分的最后一篇,可视化在机器学习的过程占用重要的角色。许多工具都提供这个功能,比如, Scikit-Learn ,Matplotlib , Pandas ,Bokeh 和 Seaborn。

希望我写的对部分人有用,如果是这样,请让我知道,谢谢。

Enjoy!


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image

1234
侠天

侠天

侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

37 日志
微博 InfoQ
© 2018 侠天
由 Hexo 强力驱动
主题 - NexT.Mist