[Cloud+ Community Annual Essay Contest] What? A CI System in 500 Lines?

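On an utterly ordinary day, I was slacking off and browsing GitHub. The endless, near-identical "xx learning roadmap" repos could no longer satisfy yours truly (to be honest, that's only because the genuinely deep material is beyond me).

Just as I was about to switch over to Zhihu for some stories, I stumbled on something that even I found showy: a project claiming that all sorts of systems can be implemented in 500 lines each, and that they actually run! What?! With a professional nitpicker on the scene, somebody had to count whether it really was 500 lines.

Ah!

Of course, before nitpicking, and to make the nitpicking hit harder, I decided to pick one project and try it hands-on. If it couldn't even run, it was definitely in for a roasting.

Enough chatter. Here is the project that claims to implement many kinds of systems in 500 lines each: 500lines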

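General description

In a cheerful (fault-finding) mood, I downloaded the 500lines book. Oh well, it's entirely in English, which makes it a perfect chance to practice my English (that is, to practice translating quickly). I have to say, the book is a decent length and the content is quite hardcore; it looks packed with substance. If you'd also like to nitpick your way through it, download it and have a look. Partial translations exist online, but they're incomplete, so pair the book with the Eudic dictionary.

The implementations in the book use a variety of languages. I happen to be learning Python lately, so I picked a Python one: the continuous integration system below.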

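Table of contents

With the target chosen, I set out to build the system by following the chapter, with two goals:

  • Does it run successfully?
  • Is it really within 500 lines?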

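With that settled, I followed the author's description step by step. The author first explains what is being built: a way to verify, once development is done, that a new feature or bug fix actually works. The 500-line system is a dedicated system for testing newly committed code. For each commit, the CI system handles verification: it fetches the new changes, runs the tests, and reports the results. It must also be failure-resistant, recovering from the point of failure if something breaks, and it must be able to handle load. Tsk, that's quite a list of requirements. Let's see whether it delivers.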
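To make the "recover from failure" requirement concrete, here is a minimal sketch of my own, not code from the book. It borrows the bookkeeping names (runners, dispatched_commits, pending_commits) from the dispatcher listing later in this post; the function name requeue_commits_of is hypothetical, and the snippet is written for Python 3, whereas the book's code is Python 2.

```python
def requeue_commits_of(dead_runner, runners, dispatched_commits, pending_commits):
    """Forget a dead runner and put its in-flight commit back in the queue."""
    # Copy the items so the dict can be modified while we scan it.
    for commit_id, assigned_runner in list(dispatched_commits.items()):
        if assigned_runner == dead_runner:
            del dispatched_commits[commit_id]
            pending_commits.append(commit_id)
            break  # like the dispatcher code, stop after the first match
    runners.remove(dead_runner)


if __name__ == "__main__":
    runner_a = {"host": "localhost", "port": "8900"}
    runner_b = {"host": "localhost", "port": "8901"}
    runners = [runner_a, runner_b]
    dispatched_commits = {"abc123": runner_a}
    pending_commits = []

    requeue_commits_of(runner_a, runners, dispatched_commits, pending_commits)
    print(runners)          # only runner_b remains in the pool
    print(pending_commits)  # the orphaned commit is waiting for a retry
```

The idea is simply that a commit is never lost: it is either assigned to a live runner or sitting in the pending list until another runner can take it.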

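The author then goes through a series of explanations, which I won't reproduce one by one. Skipping straight to the implementation steps:

The steps turn out to be as clean and simple as the code. Even a beginner like me can manage them. Nice. Just follow along step by step.

Test directory

(Note: the code below is test code, not the CI system code.)

Nothing much to say here; just follow along.

The project's directory structure is as follows:

Directory structure

The actual CI system

Here is the actual CI system code (remember to count whether it exceeds 500 lines [grin])

CI project structure

repo_observer.py (71 lines)

When it finds a new commit, it notifies the dispatcher.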

"""
This is the repo observer.

It checks for new commits to the master repo, and will notify the dispatcher of
the latest commit ID, so the dispatcher can dispatch the tests against this
commit ID.
"""
import argparse
import os
import re
import socket
import SocketServer
import subprocess
import sys
import time
import helpers

def poll():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, "
                        "by default it uses localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo",
                        metavar="REPO",
                        type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()
    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")
    while True:
        try:
            # call the bash script that will update the repo and check
            # for changes. If there's a change, it will drop a .commit_id file
            # with the latest commit in the current working directory
            subprocess.check_output(["./update_repo.sh", args.repo])
        except subprocess.CalledProcessError as e:
            raise Exception("Could not update and check repository. Reason: %s" % e.output)

        if os.path.isfile(".commit_id"):
            # great, we have a change! let's execute the tests
            # First, check the status of the dispatcher server to see
            # if we can send the tests
            try:
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "status")
            except socket.error as e:
                raise Exception("Could not communicate with dispatcher server: %s" % e)
            if response == "OK":
                # Dispatcher is present, let's send it a test
                commit = ""
                with open(".commit_id", "r") as f:
                    commit = f.readline()
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "dispatch:%s" % commit)
                if response != "OK":
                    raise Exception("Could not dispatch the test: %s" %
                                    response)
                print("dispatched!")
            else:
                # Something wrong happened to the dispatcher
                raise Exception("Could not dispatch the test: %s" %
                                response)
        time.sleep(5)


if __name__ == "__main__":
    poll()
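The observer above, like the dispatcher and test runner further down, leans on helpers.communicate, which this post never shows. Judging only from how it is called (host, port, and a message go in; a reply string comes out), a minimal Python 3 sketch might look like the following. This is my guess at its shape, not the book's helpers.py, and StatusHandler is a hypothetical stand-in for the dispatcher so that the snippet can run on its own.

```python
import socket
import socketserver
import threading


def communicate(host, port, request):
    """Send one text request over TCP and return the peer's single reply."""
    with socket.create_connection((host, port)) as s:
        s.sendall(request.encode("utf-8"))
        return s.recv(1024).decode("utf-8")


class StatusHandler(socketserver.BaseRequestHandler):
    """Hypothetical stand-in for the dispatcher: answers "status" with "OK"."""

    def handle(self):
        data = self.request.recv(1024).strip()
        if data == b"status":
            self.request.sendall(b"OK")
        else:
            self.request.sendall(b"Invalid command")


if __name__ == "__main__":
    # Port 0 asks the OS for any free port, so the demo does not collide
    # with a real dispatcher listening on 8888.
    server = socketserver.TCPServer(("127.0.0.1", 0), StatusHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    host, port = server.server_address
    print(communicate(host, port, "status"))
    server.shutdown()
    server.server_close()
```

Each call opens a fresh connection, sends one message, and reads one reply, which matches the one-command-per-connection style of the handlers in this post.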

dispatcher.py, the test dispatcher (172 lines)

"""
This is the test dispatcher.

It will dispatch tests against any registered test runners when the repo
observer sends it a 'dispatch' message with the commit ID to be used. It
will store results when the test runners have completed running the tests and
send back the results in a 'results' message.

It can register as many test runners as you like. To register a test runner,
be sure the dispatcher is started, then start the test runner.
"""
import argparse
import os
import re
import socket
import SocketServer
import time
import threading
import helpers

# Shared dispatcher code
def dispatch_tests(server, commit_id):
    # NOTE: usually we don't run this forever
    while True:
        print("trying to dispatch to runners")
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                                           int(runner["port"]),
                                           "runtest:%s" % commit_id)
            if response == "OK":
                print("adding id %s" % commit_id)
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(2)


class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    runners = []  # Keeps track of test runner pool
    dead = False  # Indicate to other threads that we are no longer running
    dispatched_commits = {}  # Keeps track of commits we dispatched
    pending_commits = []  # Keeps track of commits we have yet to dispatch


class DispatcherHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our dispatcher.
    This will dispatch test runners against the incoming commit
    and handle their requests and test results
    """

    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(self.BUF_SIZE).strip()
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)
        if command == "status":
            print("in status")
            self.request.sendall("OK")
        elif command == "register":
            # Add this test runner to our pool
            print("register")
            address = command_groups.group(2)
            host, port = re.findall(r":(\w*)", address)
            runner = {"host": host, "port": port}
            self.server.runners.append(runner)
            self.request.sendall("OK")
        elif command == "dispatch":
            print("going to dispatch")
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall("No runners are registered")
            else:
                # The coordinator can trust us to dispatch the test
                self.request.sendall("OK")
                dispatch_tests(self.server, commit_id)
        elif command == "results":
            print("got test results")
            results = command_groups.group(2)[1:]
            results = results.split(":")
            commit_id = results[0]
            length_msg = int(results[1])
            # 3 is the number of ":" in the sent command
            remaining_buffer = self.BUF_SIZE - (len(command) + len(commit_id) + len(results[1]) + 3)
            if length_msg > remaining_buffer:
                self.data += self.request.recv(length_msg - remaining_buffer).strip()
            del self.server.dispatched_commits[commit_id]
            if not os.path.exists("test_results"):
                os.makedirs("test_results")
            with open("test_results/%s" % commit_id, "w") as f:
                data = self.data.split(":")[3:]
                data = "\n".join(data)
                f.write(data)
            self.request.sendall("OK")
        else:
            self.request.sendall("Invalid command")


def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, by default it uses 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()

    # Create the server
    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print('serving on %s:%s' % (args.host, int(args.port)))

    # Create a thread to check the runner pool
    def runner_checker(server):
        def manage_commit_lists(runner):
            for commit, assigned_runner in server.dispatched_commits.iteritems():
                if assigned_runner == runner:
                    del server.dispatched_commits[commit]
                    server.pending_commits.append(commit)
                    break
            server.runners.remove(runner)

        while not server.dead:
            time.sleep(1)
            for runner in server.runners:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    response = helpers.communicate(runner["host"],
                                                   int(runner["port"]),
                                                   "ping")
                    if response != "pong":
                        print("removing runner %s" % runner)
                        manage_commit_lists(runner)
                except socket.error as e:
                    manage_commit_lists(runner)

    # this will kick off tests that failed
    def redistribute(server):
        while not server.dead:
            for commit in server.pending_commits:
                print("running redistribute")
                print(server.pending_commits)
                dispatch_tests(server, commit)
                time.sleep(5)

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl+C or Cmd+C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # if any exception occurs, kill the thread
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()


if __name__ == "__main__":
    serve()
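The whole protocol hangs on the one regular expression in DispatcherHandler, so it is worth seeing what it does to each kind of message. The sketch below reuses that exact pattern; the helper name parse_command is my own, and the snippet is Python 3 working on str, whereas the listing above is Python 2.

```python
import re

# Same pattern as DispatcherHandler.command_re in the listing above.
COMMAND_RE = re.compile(r"(\w+)(:.+)*")


def parse_command(message):
    """Split a raw message into (command, payload); payload is None if absent."""
    match = COMMAND_RE.match(message)
    if not match:
        return (None, None)
    payload = match.group(2)
    # group(2) still carries its leading ":", so drop it as the handler does.
    return (match.group(1), payload[1:] if payload else None)


if __name__ == "__main__":
    for raw in ("status", "dispatch:abc123", "register:localhost:8900"):
        print(raw, "->", parse_command(raw))
    # The "register" branch then pulls host and port out of the address:
    print(re.findall(r":(\w*)", ":localhost:8900"))
```

Because the first group only matches word characters, everything from the first colon onward lands in the second group, and each command branch is free to split that payload however it needs.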

test_runner.py (173 lines)

"""
This is the test runner.

It registers itself with the dispatcher when it first starts up, and then waits
for notification from the dispatcher. When the dispatcher sends it a 'runtest'
command with a commit id, it updates its repository clone and checks out the
given commit. It will then run tests against this version and will send back the
results to the dispatcher. It will then wait for further instruction from the
dispatcher.
"""
import argparse
import errno
import os
import re
import socket
import SocketServer
import subprocess
import time
import threading
import unittest

import helpers

class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    dispatcher_server = None  # Holds the dispatcher server host/port information
    last_communication = None  # Keeps track of last communication from dispatcher
    busy = False  # Status flag
    dead = False  # Status flag


class TestHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.
    """

    command_re = re.compile(r"(\w+)(:.+)*")

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        command_groups = self.command_re.match(self.data)
        # check the match itself: calling .group() on None would raise
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)
        if command == "ping":
            print("pinged")
            self.server.last_communication = time.time()
            self.request.sendall("pong")
        elif command == "runtest":
            print("got runtest command: am I busy? %s" % self.server.busy)
            if self.server.busy:
                self.request.sendall("BUSY")
            else:
                self.request.sendall("OK")
                print("running")
                commit_id = command_groups.group(2)[1:]
                self.server.busy = True
                self.run_tests(commit_id,
                               self.server.repo_folder)
                self.server.busy = False
        else:
            self.request.sendall("Invalid command")

    def run_tests(self, commit_id, repo_folder):
        # update repo
        output = subprocess.check_output(["./test_runner_script.sh",
                                          repo_folder, commit_id])
        print(output)
        # run the tests
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        result_file = open("results", "w")
        unittest.TextTestRunner(result_file).run(suite)
        result_file.close()
        result_file = open("results", "r")
        # give the dispatcher the results
        output = result_file.read()
        helpers.communicate(self.server.dispatcher_server["host"],
                            int(self.server.dispatcher_server["port"]),
                            "results:%s:%s:%s" % (commit_id, len(output), output))


def serve():
    range_start = 8900
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="runner's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="runner's port, by default it uses values >=%s" % range_start,
                        action="store")
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, by default it uses "
                        "localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo", metavar="REPO", type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()

    runner_host = args.host
    runner_port = None
    tries = 0
    if not args.port:
        runner_port = range_start
        while tries < 100:
            try:
                server = ThreadingTCPServer((runner_host, runner_port),
                                            TestHandler)
                print(server)
                print(runner_port)
                break
            except socket.error as e:
                if e.errno == errno.EADDRINUSE:
                    tries += 1
                    runner_port = runner_port + tries
                    continue
                else:
                    raise e
        else:
            raise Exception("Could not bind to ports in range %s-%s" % (range_start, range_start + tries))
    else:
        runner_port = int(args.port)
        server = ThreadingTCPServer((runner_host, runner_port), TestHandler)
    server.repo_folder = args.repo

    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")
    server.dispatcher_server = {"host": dispatcher_host, "port": dispatcher_port}
    response = helpers.communicate(server.dispatcher_server["host"],
                                   int(server.dispatcher_server["port"]),
                                   "register:%s:%s" %
                                   (runner_host, runner_port))
    if response != "OK":
        raise Exception("Can't register with dispatcher!")

    def dispatcher_checker(server):
        # Checks if the dispatcher went down. If it is down, we will shut down
        # if since the dispatcher may not have the same host/port
        # when it comes back up.
        while not server.dead:
            time.sleep(5)
            if (time.time() - server.last_communication) > 10:
                try:
                    response = helpers.communicate(
                        server.dispatcher_server["host"],
                        int(server.dispatcher_server["port"]),
                        "status")
                    if response != "OK":
                        print("Dispatcher is no longer functional")
                        server.shutdown()
                        return
                except socket.error as e:
                    print("Can't communicate with dispatcher: %s" % e)
                    server.shutdown()
                    return

    t = threading.Thread(target=dispatcher_checker, args=(server,))
    try:
        t.start()
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # if any exception occurs, kill the thread
        server.dead = True
        t.join()


if __name__ == "__main__":
    serve()
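The part of serve() that hunts for a free port is easy to try in isolation. Below is a small Python 3 sketch of the same idea, with two assumptions worth flagging: the names bind_first_free and NullHandler are mine, and the sketch probes ports strictly one after another, which is a simplification rather than a copy of the runner's loop.

```python
import errno
import socket
import socketserver


class NullHandler(socketserver.BaseRequestHandler):
    """Placeholder handler; a real runner would parse commands here."""

    def handle(self):
        pass


def bind_first_free(host, start_port, max_tries=100):
    """Return (server, port) for the first port from start_port that binds."""
    for offset in range(max_tries):
        port = start_port + offset
        try:
            return socketserver.TCPServer((host, port), NullHandler), port
        except OSError as e:
            # Only "address already in use" means "try the next port".
            if e.errno != errno.EADDRINUSE:
                raise
    raise RuntimeError("Could not bind to ports in range %s-%s"
                       % (start_port, start_port + max_tries - 1))


if __name__ == "__main__":
    # Occupy one port on purpose, then start probing from that port.
    blocker = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    blocker.bind(("127.0.0.1", 0))
    blocker.listen(1)
    busy_port = blocker.getsockname()[1]

    server, port = bind_first_free("127.0.0.1", busy_port)
    print("busy:", busy_port, "bound:", port)
    server.server_close()
    blocker.close()
```

This is what lets several test runners start on one machine without anyone assigning ports by hand: each new runner simply settles on the next port that is still free.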

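The three files above are the main ones; a few shell scripts aren't shown. Take a look at the relationship diagram below first to get things straight:

The author's diagram is very clear, so there's no need for me to gild the lily.

I won't show the shell files one by one. If you want to build this yourself, check the Git repository, which has the full implementation. The shell scripts' line counts can still be tallied, though:

Line count
Sure enough, it stays under 500.

The run output is as follows: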

.F

FAIL
test_fail (test_fail.TestFileFail)

Traceback (most recent call last)

File "/usr/local/var/www/zoe_projects/test_repo/test_repo_clone_runner/tests/test_fail.py", line 6, in test_fail
self.fail("I will fail")
AssertionError
I will fail


Ran 2 tests in 0.000s

FAILED (failures=1)
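One detail of the output above is worth a second look: "AssertionError" and "I will fail" sit on separate lines, and the Traceback line has lost its trailing colon. That is probably a side effect of how results travel. The runner sends "results:<commit>:<length>:<output>", and the dispatcher splits the whole message on ":" and re-joins everything after the third field with newlines. Here is a Python 3 sketch of that round trip; the two function names are mine, and the bodies copy the expressions from the listings above.

```python
def build_results_message(commit_id, output):
    """Frame test output the way test_runner.py does before sending it."""
    return "results:%s:%s:%s" % (commit_id, len(output), output)


def extract_results(raw_message):
    """Recover the text the dispatcher writes to test_results/<commit>."""
    # Fields 0-2 are the command, the commit id and the length; the rest is
    # the output, but any ":" inside it has been split away as well.
    data = raw_message.strip().split(":")[3:]
    return "\n".join(data)


if __name__ == "__main__":
    message = build_results_message("abc123", "AssertionError: I will fail\n")
    print(repr(message))
    print(repr(extract_results(message)))
```

So every colon in the unittest report turns into a line break in the saved file. It is harmless for a demo, but worth remembering when reading the stored results.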

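Verdict: I'm eating my words. It's short, sharp, and the functionality holds up.

The nitpicker now exits the stage. Still, I highly recommend 500lines: it contains many good projects, and the design ideas suit beginners and anyone curious about these topics.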