本文由 发布,转载请注明出处,如有问题请联系我们! 发布时间: 2021-05-21[源码解析] 并行分布式框架 Celery 之 Lamport 逻辑时钟 & Mingle

加载中

[源代码分析] 并行处理分布式框架 Celery 之 Lamport 逻辑性数字时钟 & Mingle

Celery是一个简易、灵便且靠谱的,解决很多信息的分布式架构,致力于并行处理的多线程每日任务序列,另外也适用线程同步。文中详细介绍 Celery 的Lamport 逻辑性数字时钟 & Mingle。

[源代码分析] 并行处理分布式框架 Celery 之 Lamport 逻辑性数字时钟 & Mingle

文件目录
  • [源代码分析] 并行处理分布式框架 Celery 之 Lamport 逻辑性数字时钟 & Mingle
    • 0x00 引言
    • 0x01 逻辑性数字时钟
      • 1.1 来由
      • 1.2 什么是逻辑数字时钟
      • 1.3 为何必须逻辑性数字时钟
      • 1.4 Lamport 逻辑性数字时钟
    • c002 Lamport 数字时钟 in Kombu
    • c003 应用 clock
      • 3.1 Kombu mailbox
      • 3.2 Celery 运用
      • 3.3 EventDispatcher
    • c004 Mingle
      • 4.1 界定
      • 4.2 Sync 全过程
        • 4.2.1 进行同歩
          • 4.2.1.1 revoked task
          • 4.2.1.2 inspect.hello
        • 4.2.2 别的worker 回应
        • 4.2.3 接到后同歩
        • 4.2.4 怎么使用 revoked
    • c0EE 私人信息
    • c0FF 参照

0x00 引言

Celery是一个简易、灵便且靠谱的,解决很多信息的分布式架构,致力于并行处理的多线程每日任务序列,另外也适用线程同步。文中详细介绍 Celery 的Lamport 逻辑性数字时钟 & Mingle。

文中为 Celery 最终一篇。下面有2~3篇单独文章内容,随后会开一个新系列产品,敬请关注。

所有联接以下:

[源代码剖析] 消息队列 Kombu 之 mailbox

[源代码剖析] 消息队列 Kombu 之 Hub

[源代码剖析] 消息队列 Kombu 之 Consumer

[源代码剖析] 消息队列 Kombu 之 Producer

[源代码剖析] 消息队列 Kombu 之 运行全过程

[源代码分析] 消息队列 Kombu 之 基本上构架

[源代码分析] 并行处理分布式框架 Celery 之构架 (1)

[源代码分析] 并行处理分布式框架 Celery 之构架 (2)

[源代码分析] 并行处理分布式框架 Celery 之 worker 运行 (1)

[源代码分析] 并行处理分布式框架 Celery 之 worker 运行 (2)

[源代码分析] 分布式系统每日任务序列 Celery 之运行 Consumer

[源代码分析] 并行处理分布式系统每日任务序列 Celery 之 Task是什么

[从源代码学设计]celery 之 推送Task & AMQP

[源代码分析] 并行处理分布式系统每日任务序列 Celery 之 消費动态性步骤

[源代码分析] 并行处理分布式系统每日任务序列 Celery 之 多进程实体模型

[源代码剖析] 分布式系统每日任务序列 Celery 线程同步实体模型 之 子过程

[源代码剖析]并行处理分布式系统每日任务序列 Celery 之 子过程解决信息

[源代码剖析] 并行处理分布式系统每日任务序列 Celery 之 Timer & Heartbeat

[源代码分析] 并行处理分布式系统每日任务序列 Celery 之 EventDispatcher & Event 部件

[源代码分析] 并行处理分布式系统每日任务序列 Celery 之 web服务

[源代码分析] 并行处理分布式框架 Celery 之 容错纠错机制

[源代码分析] 并行处理分布式框架 Celery 之 Lamport 逻辑性数字时钟 & Mingle

0x01 逻辑性数字时钟

1.1 来由

分布式架构解决了传统式单个构架的点射难题和特性容积难题,另一方面也产生了许多的难题,在其中一个难题便是多节点的数据同步难题:不一样设备上的物理学数字时钟无法同歩,造成没法区别在分布式架构中好几个连接点的事情时钟频率。

1978年 Lamport 明确提出了逻辑性数字时钟的定义,来处理分布式架构中区别事情产生的时钟频率难题。

1.2 什么是逻辑数字时钟

逻辑性数字时钟是为了更好地区别实际中的物理学数字时钟明确提出来的定义,一般状况下大家提及的時间全是指物理学時间,但事实上许多运用中,只需全部设备有同样的時间就可以了,这一時间不一定要跟具体時间同样。

更进一步,假如2个连接点中间不开展互动,那麼他们的時间乃至都不用同歩。因而难题的关键环节取决于连接点间的互动要在事情的产生次序上达成一致,而不是针对時间达成一致。

综上所述,逻辑性数字时钟指的是分布式架构中用以区别事情的产生次序的時间体制。

1.3 为何必须逻辑性数字时钟

时间在现实生活中是很重要的定义,拥有時间大家就能较为事儿产生的顺序。如果是单独电子计算机内实行的事务管理,因为他们共享资源一个记时器,因此可以非常容易根据时间格式来区别依次。同样在分布式架构中也根据时间格式的方法来区别依次可不可以?

回答是NO,由于在分布式架构中的不一样连接点间维持他们的数字时钟一致是一件不易的事儿。由于每一个连接点的CPU都是有自身的记时器,而不一样记时器中间会造成時间偏位,最后造成不一样连接点上边的時间不一致。

那麼是不是能够根据某类方法来同歩不一样连接点的物理学数字时钟呢?回答是有的,NTP便是常见的数据同步优化算法,可是即便 根据优化算法开展同歩,总是会有差值,这类差值在一些情景下(金融业分布式事务)是不可以接纳的。

因而,Lamport明确提出逻辑性数字时钟便是为了更好地处理分布式架构中的时钟频率难题,即怎样界定a在b以前产生。

当且仅当事情A是由事情B造成的情况下,事情A和B中间才存有一个依次关联。2个事情能够创建逻辑关系的前提条件是:2个事情中间可以用相当于或低于光的速度的速率信息传递。 特别注意的是这儿的逻辑关系指的是时钟频率关联,即時间的前后左右,并并不是逻辑性上的缘故和結果。

在分布式架构中,互联网不是靠谱的,因此大家除掉能够速率的管束,获得2个事情能够创建因果关系(时钟频率)关联的前提条件是:2个事情中间是不是产生过信息的传递。在分布式架构中,进程间通信的方式(共享内存、信息推送等)都归属于信息的传递。

1.4 Lamport 逻辑性数字时钟

分布式架构中按是不是存有连接点互动可分成三类事情,一类产生于连接点內部,二是推送事情,三是接受事情。

逻辑性数字时钟界定

  • Clock Condition:针对随意事情a, b:如果a -> b(->表明a在于b产生),那麼C(a) < C(b),相反要不然,由于有可能是高并发事情。
  • 如果a和b全是过程Pi里的事情,而且a在b以前,那麼Ci(a) < Ci(b) 。
  • 如果a是过程Pi里有关某信息的推送事情,b是另一过程Pj里有关该信息的接受事情,那麼Ci(a) < Cj(b)

Lamport 逻辑性数字时钟基本原理以下:

  • 每一个事情相匹配一个Lamport时间格式,初值为0
  • 假如事情在连接点内产生,时间格式加1
  • 假如事情归属于推送事情,时间格式加1并在信息中携带该时间格式
  • 假如事情归属于接收事情,时间格式 = Max(当地时间格式,信息中的时间格式) 1

假定有事情a、b,C(a)、C(b)各自表明事情a、b相匹配的Lamport时间格式,如果a产生在b以前(happened before),记作 a -> b,则有C(a) < C(b),比如图上有 C1 -> B1,那麼 C(C1) < C(B1)。根据该界定,事情集中化Lamport时间格式不一的事情可开展较为,大家得到事情的偏序关系(partial order)。留意:假如C(a) < C(b),并不可以表明a -> b,换句话说C(a) < C(b)是a -> b的必要不充分标准

假如C(a) = C(b),那a、b事情的次序也是如何的?特别注意的是当C(a) = C(b)的情况下,他们毫无疑问并不是逻辑关系,因此他们中间的依次实际上并不会危害結果,大家这儿只必须得出一种明确的方法来界定他们中间的依次就能获得全序关联。留意:Lamport逻辑性数字时钟只确保逻辑关系(偏序)的准确性,不确保肯定时钟频率的准确性。

c002 Lamport 数字时钟 in Kombu

在 Kombu 中,就会有 Lamport 数字时钟 的完成。

实际界定以下,我们可以了解:

  • 当推送信息情况下,应用 forward API 来提升数字时钟;
  • 当接到信息情况下,应用 adjust 来调节当地数字时钟;
class LamportClock:
    """Lamport's logical clock.

    A Lamport logical clock is a monotonically incrementing software counter
    maintained in each process.  It follows some simple rules:

        * A process increments its counter before each event in that process;
        * When a process sends a message, it includes its counter value with
          the message;
        * On receiving a message, the receiver process sets its counter to be
          greater than the maximum of its own value and the received value
          before it considers the message received.

    Conceptually, this logical clock can be thought of as a clock that only
    has meaning in relation to messages moving between processes.  When a
    process receives a message, it resynchronizes its logical clock with
    the sender.

    *Usage*

    When sending a message use :meth:`forward` to increment the clock,
    when receiving a message use :meth:`adjust` to sync with
    the time stamp of the incoming message.

    """

    #: The clocks current value.
    value = 0

    def __init__(self, initial_value=0, Lock=Lock):
        self.value = initial_value
        self.mutex = Lock()

    def adjust(self, other):
        with self.mutex:
            value = self.value = max(self.value, other)   1
            return value

    def forward(self):
        with self.mutex:
            self.value  = 1
            return self.value

    def sort_heap(self, h):
        if h[0][0] == h[1][0]:
            same = []
            for PN in zip(h, islice(h, 1, None)):
                if PN[0][0] != PN[1][0]:
                    break  # Prev and Next's clocks differ
                same.append(PN[0])
            # return first item sorted by process id
            return sorted(same, key=lambda event: event[1])[0]
        # clock values unique, return first item
        return h[0]

    def __str__(self):
        return str(self.value)

    def __repr__(self):
        return f'<LamportClock: {self.value}>'

c003 应用 clock

3.1 Kombu mailbox

例如在 Kombu mailbox 当中,推送情况下就必须带上当地的clock。

producer.publish(
                    reply, exchange=exchange, routing_key=routing_key,
                    declare=[exchange], headers={
                        'ticket': ticket, 'clock': self.clock.forward(),
                    }, retry=True,
                    **opts
                )

在接到信息时,就相对应调节当地数字时钟

def _collect(self, ticket,
                 limit=None, timeout=1, callback=None,
                 channel=None, accept=None):

        adjust_clock = self.clock.adjust

        def on_message(body, message):
            header = message.headers.get
            adjust_clock(header('clock') or 0)

3.2 Celery 运用

Celery 运用自身就有一个 LamportClock 自变量。

class Celery:
        self.clock = LamportClock()

3.3 EventDispatcher

在 EventDispatcher 推送 Event 情况下,便会应用 LamportClock 的数字时钟。

def publish(self, type, fields, producer,
                blind=False, Event=Event, **kwargs):
        clock = None if blind else self.clock.forward()
        event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
                      pid=self.pid, clock=clock, **fields)
        with self.mutex:
            return self._publish(event, producer,
                                 routing_key=type.replace('-', '.'), **kwargs)

c004 Mingle

在 Celery 的详细介绍中,Mingle 关键用在运行或是重新启动的情况下,它会和别的的 worker 互动,进而开展同歩。同歩的数据信息有:

  • 别的 worker 的 clock
  • 别的 worker 早已解决掉的 tasks

同歩 clock 比较好了解,可是为何要同歩 别的worker早已解决完的 task 呢?由于这一情景是运行或是重新启动。

如果我们在 Celery 当中设定一个连接点为task_acks_late=True以后,那麼这一连接点上已经实行的每日任务若是碰到关闭电源,运作中被完毕等状况,这种每日任务会被再次派发到别的连接点开展再试

因此当某一连接点重新启动期内,很有可能原本由本 worker 承担的 task 会早已被别的 worker 解决掉,为了更好地防止反复解决,就必须同歩一下。

4.1 界定

Mingle 界定以下:

class Mingle(bootsteps.StartStopStep):
    """Bootstep syncing state with neighbor workers.

    At startup, or upon consumer restart, this will:

    - Sync logical clocks.
    - Sync revoked tasks.

    """

    label = 'Mingle'
    requires = (Events,)
    compatible_transports = {'amqp', 'redis'}

    def start(self, c):
        self.sync(c)

4.2 Sync 全过程

运行即同歩,编码逻辑性以下:

  • Mingle 向 每一个 Worker 推送 hello
  • 每一个 Worker 都向 Mingle 回应自身的信息内容(clock 和 tasks)
  • Mingle 升级自身的信息内容

这必须留意的是:沒有调用函数,立即 send_hello 就回到了别的 worker 的結果,它是用多线程来仿真模拟的一个同歩全过程

而 在 send_hello回到情况下,由于此刻收到了全部 worker 的回应,也包含自身,因此必须把自己host相匹配的回应删掉。

相匹配编码以下:

    def sync(self, c):
        replies = self.send_hello(c)
        if replies:
            [self.on_node_reply(c, nodename, reply)
             for nodename, reply in replies.items() if reply]
        else:
            info('mingle: all aone')

4.2.1 进行同歩

最先,Mingle 会向 每一个 Worker 推送 hello。

    def send_hello(self, c):
        inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
        our_revoked = c.controller.state.revoked
        replies = inspect.hello(c.hostname, our_revoked._data) or {}
        replies.pop(c.hostname, None)  # delete my own response
        return replies

这时有关自变量以下:

c.controller.state = {module} <module 'celery.worker.state' >
c.controller.state.revoked = {LimitedSet: 0} <LimitedSet(0): maxlen=50000, expires=10800, minlen=0>
c.controller = {Worker} celery@DESKTOP-0GO3RPO
c = {Consumer}  
4.2.1.1 revoked task

我们可以见到,Mingle 会从 c.controller.state.revoked当中获得 內容,即 当今 worker 纪录的已被进行的 tasks。随后发给别的 worker。

4.2.1.2 inspect.hello

这儿是应用了 celery.app.control.Control 的 inspect 作用开展广播节目推送。

    def _request(self, command, **kwargs):
        return self._prepare(self.app.control.broadcast(
            command,
            arguments=kwargs,
            destination=self.destination,
            callback=self.callback,
            connection=self.connection,
            limit=self.limit,
            timeout=self.timeout, reply=True,
            pattern=self.pattern, matcher=self.matcher,
        ))

4.2.2 别的worker 回应

celery.app.control.Control 当中,会应用 _prepare 来解决别的 worker 的回到。

    def _prepare(self, reply):
        if reply:
            by_node = flatten_reply(reply)
            if (self.destination and
                    not isinstance(self.destination, (list, tuple))):
                return by_node.get(self.destination)
            if self.pattern:
                pattern = self.pattern
                matcher = self.matcher         return {node: reply for node, reply in by_node.items()
                        if match(node, pattern, matcher)}
            return by_node

4.2.3 接到后同歩

在接到别的worker回应以后会开展同歩,我们可以见到其同歩了数字时钟 和 tasks。

实际 task 的升级,是由 state 进行的。

    def sync_with_node(self, c, clock=None, revoked=None, **kwargs):
        self.on_clock_event(c, clock)
        self.on_revoked_received(c, revoked)

    def on_clock_event(self, c, clock):
        c.app.clock.adjust(clock) if clock else c.app.clock.forward()

    def on_revoked_received(self, c, revoked):
        if revoked:
            c.controller.state.revoked.update(revoked)

4.2.4 怎么使用 revoked

当任务发布情况下,假如发觉该每日任务早已被设定为 revoked,则不容易公布该每日任务。

def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    """
    .....

    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        ......
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return
				...... 										
        if callbacks:
            [callback(req) for callback in callbacks]
        handle(req)
    return task_message_handler

c0EE 私人信息

★★★★★★生活中和技术性的思索★★★★★★

微信平台账户:罗西的思索

假如您想立即获得本人编写文章内容的消息提醒,或是想看看本人强烈推荐的技术文档,敬请期待。

在这里插入图片描述

c0FF 参照

分布式架构:Lamport 逻辑性数字时钟

5: 远程操作管理方法

6: Events 的完成

7: Worker 中间的互动

8: State 和 Result

评论(0条)

刀客源码 游客评论