Git Product home page Git Product logo

article's People

Contributors

tuteng avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

article's Issues

深入理解之 Apache Pulsar Connector 与 Function 关系篇

背景知识

  • Connector:Apache Pulsar 的连接器,包括 Source 和 Sink 两个组件。
  • Functions:Apache Pulsar 的轻量级计算组件。

Instance 架构

使用 pulsar-admin source、pulsar-admin sink 或 pulsar-admin function 命令操作 Source、Sink 或 Function 时,在 Worker 上会启动一个 Instance,Instance 架构如下图所示。

image

  • Worker Service:在此处运行 Instance。
  • Source:将外部系统的数据输入至 Pulsar。在命令行中,可以使用 pulsar-admin 操作 Source。
  • Sink:将 Pulsar 的数据输出至外部系统。 在命令行中,可以使用 pulsar-admin 操作 Sink。
  • Function:执行轻量级计算。在命令行中,可以使用 pulsar-admin 操作。 Function。
  • IdentityFunction、PulsarSource 和 PulsarSink:Instance 的组件。
  • Consumer:Pulsar 的消费者。
  • Producer:Pulsar 的生产者。
  • queue: 进行数据传递的数据结构。
  • 红色箭头:从外部系统流入至 Pulsar 的数据和从 Pulsar 流出至外部系统的数据。
  • 黄色箭头:流入至 Pulsar Topic 的数据和从 Pulsar Topic 流出的数据。

前文提到,使用 pulsar-admin 启动一个 Source、Sink 或 Function 时,实际是启动了一个 Instance,该 Instance 跑在 Worker(图中标注 Worker Service 的深蓝色方块)上。

上图有两个 Worker Service:

  • Worker Service1 上有两个 Instance,分别是 Source 和 Function。
  • Worker Service2 上有一个 Instance,即 Sink。

另外,图中的 PulsarSource、IdentityFunction 和 PulsarSink 组件都是 Instance 的一部分。当外部系统向 Pulsar 输入数据时,会使用 pulsar-admin source 启动 Source 的 Instance,该 Instance 会初始化三个组件,代码如下:

// sink 组件,负责数据输出
 setupOutput(contextImpl);
// source 组件,负责数据输入
setupInput(contextImpl);
// function 组件,负责简单计算和过滤等
return new JavaInstance(contextImpl, object);

首先初始化数据的出口,如果数据目的地有问题,就不会继续进行。

每个组件内部有以下逻辑判断:

if (sinkSpec.getClassName().isEmpty()) {
  // 未指定 className,则使用系统默认的组件初始化,即前文提到的 PulsarSink
} else {
  // 如果可以找到 className,则使用它进行初始化
}
// If source classname is not set, we default pulsar source
if (sourceSpec.getClassName().isEmpty()) {
  // 未指定 className,则使用系统默认的组件初始化,即前文提到的 PulsarSource
} else {
  // 如果可以找到 className,则使用它进行初始化
}
// create the functions
if (userClassObject instanceof Function) {
  // 使用默认的的 IdentityFunction 进行初始化
    this.function = (Function) userClassObject;
} else {
    // 使用用户定义的 Function
    this.javaUtilFunction = (java.util.function.Function) userClassObject;
}

Source 命令启动后,Instance 发现提供了 className,则会使用 className 替换系统默认的 Source 组件接收外部系统的数据。收到数据后,会将这些数据放在 queue 中。

无论是用户自定义的 source,还是系统默认的 PulsarSource,收到数据之后都会执行以下代码:

consume(record);
public void consume(Record<T> record) {
      try {
          queue.put(record);
      } catch (InterruptedException e) {
          throw new RuntimeException(e);
      }
  }

Pulsar 会把数据放至队列中,等待 Function 处理。这里已指定了 Source ,因此使用系统默认的 IdentityFunction。PulsarSource 和用户自定义 source 有以下区别:

  • 用户自定义 Source:用于与外部系统集成(图中的红色箭头)。例如,数据库和日志等。
  • 系统默认的 PulsarSource:会启动 Consumer ,用于消费来自 Pulsar topic 的数据(图中指向 Consumer 的黄色箭头)。

数据流入至 Function 时,会执行以下逻辑:

// process the message
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
public JavaExecutionResult handleMessage(Record<?> record, Object input) {
  ...
  if (function != null) {
    // 用户自定义 function 的处理逻辑
      output = function.process(input, context);
  } else {
    // 系统默认 IdentityFunction 的处理逻辑
      output = javaUtilFunction.apply(input);
  }
  ...
}

数据在 Function 处理完成之后,会进入到下一个组件,即 Sink,Sink 同样会执行类似的判断。如果是用户自定义 Sink,则会调用用户的类执行初始化;如果没有用户自定义 Sink,则使用系统默认的 PulsarSink 执行初始化,从而完成数据的输出,这里使用了 PulsarSink。

PulsarSink 与用户自定义 Sink 有以下区别:

  • 用户自定义 Sink:将数据输出至外部系统(图中第二个红色箭头)。
  • 系统默认的 PulsarSink:会初始化 Pulsar 的 Producer,将数据输出至 Pulsar Topic(图中指向外部的黄色箭头)。

以上流程是在命令行中执行 pulsar-admin source 时进行的逻辑,当执行 Function 时,会将 IdentityFunction 替换为用户的 Function 对象,但是 Source 和 Sink 会使用系统默认的 PulsarSource 和 PulsarSink 进行初始化(图中标注 Function 的蓝色方块)。pulsar-admin sink 同样如此,此处不再赘述。

总结

本文分享了 Source、Sink 和 Function 之间的关系以及数据流通的过程。 Instance 实际包含了三个组件:Source、Sink 和 Function,这三个组件是不同的,因此,“Source 和 Sink 是一种特殊的 Function” 的说法是不准确的。同时,本文还介绍了每个 Instance 对 pub/sub 模型的封装。Source 和 Sink 作为 Instance 中单独的组件,在构建与外部系统的生态时有重要意义。

如何编写可测试的golang代码

每次在开发之前,我都会考虑写好单元测试,但是随着开发的进行,就会发现事情没有这么简单,因为更多时候项目中间夹杂着很多的数据库操作,网络操作,文件操作等等,每次涉及到有这些操作的单元测试,都要花费很大的代价去初始化各种环境,拖到最后单元测试只能不了了之,因此这里的一个重点是写出来的代码本身不可测试,因此在这篇文章中,重点是如何写出可测试的代码,如何把一些无关的操作屏蔽掉,文章是我几个月之前翻译的,最近在项目中进行了实践,感觉不错,因此放到这里,希望能有更多的人看到。原文地址

在golang中通过接口和组合来实现高效的单元测试

go单元测试提供了:

  • Increased enforcement of behavior expectations beyond the compiler (providing critical assurance around rapidly changing code paths)
  • 快速的执行速度(许多流行的模块测试能在秒级完成)
  • 易于集成到CI环境(go test为内建)
  • 通过-race标志进行竞态检测

因此,单元测试是确保代码质量和防止回归的最佳方式之一。不幸的是,单元测试经常是很多go项目中最容易被忽视的方面之一。

这种情况有一部分是由于缺乏高质量的资源来解释如何正确构建一个可以被测试的go程序导致的。这份文档尝试提供这两方面的努力,提高go社区中可用程序的总体质量。

不要因为程序运行了就让你陷入错误的安全感:你不久就会庆幸你开始了测试。

预览

在文章中我会介绍下面的内容

  • 确保可测试的概念
  • 4个具体的例子来学习如何在go中进行有效的测试

最后你应该使用你学到的东西应用到实践中

概念
如果你从一开始就没有正确的构建和测试你的程序,那么测试这条路将会非常困难。这在编程界是一个相当普遍的格言在go测试中尤其正确

为了有效的测试go程序,有三个重要的概念:

  • 在你的go代码中使用接口
  • 通过组合构建更高层次的接口
  • 熟悉go test和testing模块

下面来详细解释一下

使用接口

你通过阅读go官方文档已经在工作中熟悉了go接口的使用。你可能不明白为什么接口如此重要,以及为什么你应该尽可能快的开始使用你自己的接口

对于那些不熟悉接口的,我建议去读一下go的官方文档 来理解接口是怎么工作的

长话短说

  • 接口是一组被定义的被考虑实现的方法类型的集合
  • 当任何给出的类型实现了该接口的所有方法时,go编译器就认为它实现了该接口

这在go的标准库中被用的很频繁。例如,在database/sql中使用相同的接口来编写与不同数据库进行交互的功能

新手go程序员可能已经能熟练的在其他编程语言中写单元测试像java,python或者php,通过使用stubs或者mocks技术来伪造方法调用的结果并且使用一种细粒度的方式来探索各种代码的路径。然而许多人并没有意识到,接口已经把上面的都实现了。

由于被嵌入到语言中,并被标准库所支持,接口为测试者提供了大量的功能和灵活性。可以在接口中封装给定测试之外的操作,并选择性的将其重新实现以用于相关测试。这允许作者控制测试中行为的每个方面。

使用组合
接口对增加灵活性和控制非常重要,但还不够。例如,考虑一下我们有一个struct将大量方法公开给外部消费者的情况,但是在其他的某些操作中也依赖于这些方法。我们不能将所有的对象封装在一个接口中,我们只需要实现我们需要测试的方法就够了。

因此,这变得至关重要,通过使用较小的接口来组成更大的接口,以便能够控制我们想要改变哪些方法和不想改变哪些方法去适应测试。在一个实际的例子中这样看起来更容易一点,因此我会避免更抽象的讨论直到文章的结束。

go test和testing模块

很明显,你至少应该浏览一下go test和testing模块的文档,熟悉一下每块能够让你更有效的进行单元测试。如果你不熟悉这些工具和库,用起来就会有些生疏(但是一旦熟悉了就好了)

那是很有吸引力的,第三方工具可以帮助测试,但是我强烈建议你避免这样做,直到你掌握了基础知识,并且确定依赖带给你的好处多于坏处。

首先你要有下面的一些基础知识

  • 对于任何给定的foo.go,测试被放置在相同的目录中并被命名为foo_test.go
  • go test . 运行当前目中的的单元测试。 go test ./... 将运行当前目录和该目录之下的测试, go test foo_test.go不工作因为被测试的文件不包括在内
  • -v标志对go test是有用的,它会打印出详细的输出(每个单独的测试结果)
  • Tests是个函数接受一个testing.T结构指针作为一个参数,并调用TestFoo,其中Foo是被测试的函数名称
  • 通常不会一直如你所期望的那样为真,相反,测试失败可以调用t.Fatal,如果你确定条件与你期望的不同
  • 在测试中打印输出可能不会如你所期望的那样工作。如果你在测试中需要打印信息可以使用t.Log或者t.Logf

例子

讨论的够多了,来写一些测试

下面的例子都可以在github上找到代码

例子1: Hello, Testing!
我假定你已经安装并且配置好了go开发环境

新建一个go的包在GOPATH下:

$ mkdir -p ~/go/src/github.com/nathanleclaire/testing-article
$ cd ~/go/src/github.com/nathanleclaire/testing-article

创建一个hello.go的文件

package main

import (
        "fmt"
)

func hello() string {
        return "Hello, Testing!"
}

func main() {
        fmt.Println(hello())
}

现在为hello.go写一个测试
新建一个hello_test.go的文件在相同的文件夹下

package main

import (
        "testing"
)

func TestHello(t *testing.T) {
        expectedStr := "Hello, Testing!"
        result := hello()
        if result != expectedStr {
                t.Fatalf("Expected %s, got %s", expectedStr, result)
        }
}

这个测试很简单。我们注入了一个*testing.T的实例到测试中,这被用来控制测试流和输出。我们把对函数调用的期望设置在一个变量中,然后在函数真正返回的时候检查它。

运行测试

$ go test -v
=== RUN TestHello
---PASS:TestHello(0.00s)
PASS
OK  github.com/nathanleclaire/testing-article 0.006s

例子2:用一个接口来模拟结果

作为程序的一部分,我们希望从GitHubAPI中获取一些数据。在这种情况下,假设我们想要查询一个给出的库的最新的tag

我们很容易写出下面的代码

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
)

type ReleasesInfo struct {
    Id      uint   `json:"id"`
    TagName string `json:"tag_name"`
}

// Function to actually query the GitHub API for the release information.
func getLatestReleaseTag(repo string) (string, error) {
    apiUrl := fmt.Sprintf("https://api.github.com/repos/%s/releases", repo)
    response, err := http.Get(apiUrl)
    if err != nil {
        return "", err
    }

    defer response.Body.Close()

    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        return "", err
    }

    releases := []ReleasesInfo{}

    if err := json.Unmarshal(body, &releases); err != nil {
        return "", err
    }

    tag := releases[0].TagName

    return tag, nil
}

// Function to get the message to display to the end user.
func getReleaseTagMessage(repo string) (string, error) {
    tag, err := getLatestReleaseTag(repo)
    if err != nil {
                return "", fmt.Errorf("Error querying GitHub API: %s", err)
    }

        return fmt.Sprintf("The latest release is %s", tag), nil
}

func main() {
        msg, err := getReleaseTagMessage("docker/machine")
        if err != nil {
                fmt.Fprintln(os.Stderr, msg)
        }

        fmt.Println(msg)
}

事实上,这是一个自然而然想到的go程序结构

但这是不可测试的。如果我们要getLatestReleaseTag 直接测试这个函数,那么如果GitHub API关闭了,或者GitHub决定限制我们(如果在CI环境中频繁地运行测试,那很可能会影响我们)。另外,每当最新版本标签更改时,我们都必须更新测试。

该怎么办?我们可以重新定义这个实现的方式,使其更具可测性。如果我们查询Github API使用interface来代替直接调用函数 ,那么我们实际上可以控制通过测试返回的结果。

我们重新定义这个程序有一点就是让他有个接口,ReleaseInfoer其中一个实现可以是GithubReleaseInfoer。ReleaseInfoer只有一个方法,GetLatestReleaseTag它在性质上与我们上面的函数类似(它接受一个存储库名称作为参数并返回一个 string和/或error作为结果)。

该接口看着像下面这样

type ReleaseInfoer interface {
        GetLatestReleaseTag(string) (string, error)
}

然后我们更新上面的函数直接调用使用GithubReleaseInfoer结构代替

type GithubReleaseInfoer struct {}

// Function to actually query the GitHub API for the release information.
func (gh GithubReleaseInfoer) GetLatestReleaseTag(repo string) (string, error) {
        // ... same code as above
}

更新后,getReleaseTagMessage和main像下面这样

// Function to get the message to display to the end user.
func getReleaseTagMessage(ri ReleaseInfoer, repo string) (string, error) {
    tag, err := ri.GetLatestReleaseTag(repo)
    if err != nil {
                return "", fmt.Errorf("Error query GitHub API: %s", err)
    }

        return fmt.Sprintf("The latest release is %s", tag), nil
}

func main() {
        gh := GithubReleaseInfoer{}
        msg, err := getReleaseTagMessage(gh, "docker/machine")
        if err != nil {
                fmt.Fprintln(os.Stderr, err)
                os.Exit(1)
        }

        fmt.Println(msg)
}

为什么要这么干?现在我们可以测试getReleaseTagMessage函数通过定义一个新的结构,只要实现了具有一个方法的ReleaseInfoer接口。这样,在测试的时候,我们就可以确保我们所依赖的方法的行为完全如我们期望的那样。

我们能定义一个FakeReleaseInfoer结构来表现我们想要的结构。我们只需要在结构中定义要返回的内容

package main

import "testing"

type FakeReleaseInfoer struct {
    Tag string
    Err error
}

func (f FakeReleaseInfoer) GetLatestReleaseTag(repo string) (string, error) {
    if f.Err != nil {
        return "", f.Err
    }

    return f.Tag, nil
}

func TestGetReleaseTagMessage(t *testing.T) {
    f := FakeReleaseInfoer{
        Tag: "v0.1.0",
        Err: nil,
    }

    expectedMsg := "The latest release is v0.1.0"
    msg, err := getReleaseTagMessage(f, "dev/null")
    if err != nil {
        t.Fatalf("Expected err to be nil but it was %s", err)
    }

    if expectedMsg != msg {
        t.Fatalf("Expected %s but got %s", expectedMsg, msg)
    }
}

从上面可以看到,FakeReleaseInfoer被设置为返回Tag v0.1.0和Err nil

这个测试很好,但是我们没有测试错误返回。这种情况最好也要测一下

在单元测试中有什么方法可以表达这个函数的各种测试用例和我们期望的返回值呢。当然,我们可以在一个函数中用一个匿名的结构体来构造测试用例和所期望的返回值

func TestGetReleaseTagMessage(t *testing.T) {
        cases := []struct {
                f           FakeReleaseInfoer
                repo        string
                expectedMsg string
                expectedErr error
        }{
                {
                        f: FakeReleaseInfoer{
                                Tag: "v0.1.0",
                                Err: nil,
                        },
                        repo:        "doesnt/matter",
                        expectedMsg: "The latest release is v0.1.0",
                        expectedErr: nil,
                },
                {
                        f: FakeReleaseInfoer{
                                Tag: "v0.1.0",
                                Err: errors.New("TCP timeout"),
                        },
                        repo:        "doesnt/foo",
                        expectedMsg: "",
                        expectedErr: errors.New("Error querying GitHub API: TCP timeout"),
                },
        }

        for _, c := range cases {
                msg, err := getReleaseTagMessage(c.f, c.repo)
                if !reflect.DeepEqual(err, c.expectedErr) {
                        t.Errorf("Expected err to be %q but it was %q", c.expectedErr, err)
                }

                if c.expectedMsg != msg {
                        t.Errorf("Expected %q but got %q", c.expectedMsg, msg)
                }
        }
}

注意reflect.DeepEqual的使用。这是来自标准库中的用来检查两个结构体是否相等的方法。这里用来检查错误是否相等,但也可以用来比较两个结构体的内容。仅仅使用 == 在这里并不能比较出相等,由于errors.New的使用(我尝试使用Error方法,但是对于nil值不工作,如果你有更好的方法可以告诉我)

这种技术在测试中可以获得更多对第三方库的控制。例如,Sam Alba的Golang Docker客户端会给你一个type DockerClient struct的交互,这对测试来说并不容易mock。但是你可以用type DockerClient interface在你自己的模块中创建一个模块,它指定你正在使用的方法dockerclient.DockerClient作为要实现的东西,在你的代码中使用它,然后创建你自己的接口版本来测试。

除了我在这里重点讨论的可测试性的好处外,使用接口可能会为您的程序的未来可扩展性带来巨大的利益。如果您已经构建了与GitHub API交互的每个组件,例如通过接口工作,则根本不需要更改程序的架构,以添加对其他源代码托管平台的支持。你可以简单地实现一个BitbucketReleaseInfoer 并使用它来包装Bitbucket API而不是GitHub。当然,这种类型的包装抽象将不适用于每个用例,但它可以用来强有力地模拟出外部和内部的依赖关系。

例子3 使用组合来测试一个更大的struct

上面的例子说明了一个可能非常有用的介绍性概念,但是有时候我们可能想要模拟一个struct相互依赖的部分,并分别测试每个部分。

如果你发现自己的一个interface或struct在一系列的要暴露方法中开始变大,那么分成几个更小的解耦并且相互组合可能是个好主意。例如,假设我们有个Job接口,它暴露了一个Log方法的内部和外部的结构。可以传递可变数量的参数传递给这个方法。它也提供Runing,Suspending和Resume方法

type Job interface {
    Log(...interface{})
    Suspend() error
    Resume() error
    Run() error
}

如果我们在工作中开发了一个struct并且实现了该接口,我们想使用内部的Log方法来记录日志。因此,像上面的例子那样实现整个接口是行不通的。那我们如何mock接口的部分来测试整个结构呢?

我们可以通过定义几个更小的接口然后使用组合。考虑一个Job,PollerJob的实现,用来做系统监控软件。我的第一版代码如下:

package main

import (
    "log"
    "net/http"
    "time"
)

type Job interface {
    Log(...interface{})
    Suspend() error
    Resume() error
    Run() error
}

type PollerJob struct {
    suspend     chan bool
    resume      chan bool
    resourceUrl string
    inMemLog    string
}

func NewPollerJob(resourceUrl string) PollerJob {
    return PollerJob{
        resourceUrl: resourceUrl,
        suspend:     make(chan bool),
        resume:      make(chan bool),
    }
}

func (p PollerJob) Log(args ...interface{}) {
    log.Println(args...)
}

func (p PollerJob) Suspend() error {
    p.suspend <- true
    return nil
}

func (p PollerJob) PollServer() error {
    resp, err := http.Get(p.resourceUrl)
    if err != nil {
        return err
    }

    p.Log(p.resourceUrl, "--", resp.Status)

    return nil
}

func (p PollerJob) Run() error {
    for {
        select {
        case <-p.suspend:
            <-p.resume
        default:
            if err := p.PollServer(); err != nil {
                p.Log("Error trying to get resource: ", err)
            }
            time.Sleep(1 * time.Second)
        }
    }
}

func (p PollerJob) Resume() error {
    p.resume <- true
    return nil
}

func main() {
    p := NewPollerJob("https://nathanleclaire.com")
    go p.Run()
    time.Sleep(5 * time.Second)

    p.Log("Suspending monitoring of server for 5 seconds...")
    p.Suspend()
    time.Sleep(5 * time.Second)

    p.Log("Resuming job...")
    p.Resume()

    // Wait for a bit before exiting
    time.Sleep(5 * time.Second)
}

上面程序的输出结构如下:

$ go run -race job.go
2015/10/11 20:37:59 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:01 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:02 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:03 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:04 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:04 Suspending monitoring of server for 5 seconds...
2015/10/11 20:38:10 Resuming job...
2015/10/11 20:38:10 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:11 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:12 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:14 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:15 https://nathanleclaire.com -- 200 OK
2015/10/11 20:38:16 https://nathanleclaire.com -- 200 OK

如果我们想测试各种复杂的互动,怎么办呢?所有的方法都放在一起,不使用外部资源测试程序的每个组件似乎是一件令人头疼的事情。

解决方案是将更高层的Job接口分解为几个其他接口,并将它们全部嵌入到PollerJob结构中,这样我们就可以在测试的时候将每个接口单独模拟出来。

我们能将Job接口拆分成几个不同的接口,如下所示:

type Logger interface {
    Log(...interface{})
}

type SuspendResumer interface {
    Suspend() error
    Resume() error
}

type Job interface {
    Logger
    SuspendResumer
    Run() error
}

您可以看到有一个SuspendResumer用于处理挂起/恢复功能的接口,并且一个Log仅用于管理Log方法的接口。另外,我们将创建一个PollServer接口来控制对我们正在轮询的服务器的状态调用:

type ServerPoller interface {
    PollServer() (string, error)
}

有了所有这些组件接口,我们就可以开始重新构建我们PollerJob的Job接口实现。通过嵌入Logger 和ServerPoller(两个接口)和一个指向PollSuspendResumer结构的指针,我们保证对于PollerJob作为一个Job的定义能通过编译。我们提供了一个NewPollerJob函数,它将提供一个结构的实例,并且正确地设置和初始化所有的组件。请注意,我们使用我们自己的组件实现了这个函数的返回。

type PollerLogger struct{}

type URLServerPoller struct {
    resourceUrl string
}

type PollSuspendResumer struct {
    SuspendCh chan bool
    ResumeCh  chan bool
}

type PollerJob struct {
    WaitDuration time.Duration
    ServerPoller
    Logger
    *PollSuspendResumer
}

func NewPollerJob(resourceUrl string, waitDuration time.Duration) PollerJob {
    return PollerJob{
        WaitDuration: waitDuration,
        Logger:       &PollerLogger{},
        ServerPoller: &URLServerPoller{
            resourceUrl: resourceUrl,
        },
        PollSuspendResumer: &PollSuspendResumer{
            SuspendCh: make(chan bool),
            ResumeCh:  make(chan bool),
        },
    }
}

其余的代码定义了相关的结构,并且可以在github上获得

这为我们提供了灵活性,当我们进行测试时,我们需要将PollerJob结构中的每个组件单独虚拟出来。每个组件可以在需要的地方重新使用和重复工作,更灵活,使我们能够从我们依赖的组件中获得更多的可能。

我们现在能单独测试Run,而不必与任何实际的服务器通信。我们只需要简单的控制ServerPoller的返回并且验证被写入的内容是否与我们预期的那样。因此测试文件看起来像下面这样。

package main

import (
    "errors"
    "fmt"
    "testing"
    "time"
)

type ReadableLogger interface {
    Logger
    Read() string
}

type MessageReader struct {
    Msg string
}

func (mr *MessageReader) Read() string {
    return mr.Msg
}

type LastEntryLogger struct {
    *MessageReader
}

func (lel *LastEntryLogger) Log(args ...interface{}) {
    lel.Msg = fmt.Sprint(args...)
}

type DiscardFirstWriteLogger struct {
    *MessageReader
    writtenBefore bool
}

func (dfwl *DiscardFirstWriteLogger) Log(args ...interface{}) {
    if dfwl.writtenBefore {
        dfwl.Msg = fmt.Sprint(args...)
    }
    dfwl.writtenBefore = true
}

type FakeServerPoller struct {
    result string
    err    error
}

func (fsp FakeServerPoller) PollServer() (string, error) {
    return fsp.result, fsp.err
}

func TestPollerJobRunLog(t *testing.T) {
    waitBeforeReading := 100 * time.Millisecond
    shortInterval := 20 * time.Millisecond
    longInterval := 200 * time.Millisecond

    testCases := []struct {
        p           PollerJob
        logger      ReadableLogger
        sp          ServerPoller
        expectedMsg string
    }{
        {
            p:           NewPollerJob("madeup.website", shortInterval),
            logger:      &LastEntryLogger{&MessageReader{}},
            sp:          FakeServerPoller{"200 OK", nil},
            expectedMsg: "200 OK",
        },
        {
            p:           NewPollerJob("down.website", shortInterval),
            logger:      &LastEntryLogger{&MessageReader{}},
            sp:          FakeServerPoller{"500 SERVER ERROR", nil},
            expectedMsg: "500 SERVER ERROR",
        },
        {
            p:           NewPollerJob("error.website", shortInterval),
            logger:      &LastEntryLogger{&MessageReader{}},
            sp:          FakeServerPoller{"", errors.New("DNS probe failed")},
            expectedMsg: "Error trying to get state: DNS probe failed",
        },
        {
            p: NewPollerJob("some.website", longInterval),

            // Discard first write since we want to verify that no
            // additional logs get made after the first one (time
            // out)
            logger: &DiscardFirstWriteLogger{MessageReader: &MessageReader{}},

            sp:          FakeServerPoller{"200 OK", nil},
            expectedMsg: "",
        },
    }

    for _, c := range testCases {
        c.p.Logger = c.logger
        c.p.ServerPoller = c.sp

        go c.p.Run()

        time.Sleep(waitBeforeReading)

        if c.logger.Read() != c.expectedMsg {
            t.Errorf("Expected message did not align with what was written:\n\texpected: %q\n\tactual: %q", c.expectedMsg, c.logger.Read())
        }
    }
}

请注意,创建我们自己的ReadableLogger接口进行测试并能够以各种方式实现Logger为我们提供了灵活性的帮助。Suspend而且Resume也同样能够被测试通过控制JobPoller组件的ServerPoller接口

func TestPollerJobSuspendResume(t *testing.T) {
    p := NewPollerJob("foobar.com", 20*time.Millisecond)
    waitBeforeReading := 100 * time.Millisecond
    expectedLogLine := "200 OK"
    normalServerPoller := &FakeServerPoller{expectedLogLine, nil}

    logger := &LastEntryLogger{&MessageReader{}}
    p.Logger = logger
    p.ServerPoller = normalServerPoller

    // First start the job / polling
    go p.Run()

    time.Sleep(waitBeforeReading)

    if logger.Read() != expectedLogLine {
        t.Errorf("Line read from logger does not match what was expected:\n\texpected: %q\n\tactual: %q", expectedLogLine, logger.Read())
    }

    // Then suspend the job
    if err := p.Suspend(); err != nil {
        t.Errorf("Expected suspend error to be nil but got %q", err)
    }

    // Fake the log line to detect if poller is still running
    newExpectedLogLine := "500 Internal Server Error"
    logger.MessageReader.Msg = newExpectedLogLine

    // Give it a second to poll if it's going to poll
    time.Sleep(waitBeforeReading)

    // If this log writes, we know we are polling the server when we're not
    // supposed to (job should be suspended).
    if logger.Read() != newExpectedLogLine {
        t.Errorf("Line read from logger does not match what was expected:\n\texpected: %q\n\tactual: %q", newExpectedLogLine, logger.Read())
    }

    if err := p.Resume(); err != nil {
        t.Errorf("Expected resume error to be nil but got %q", err)
    }

    // Give it a second to poll if it's going to poll
    time.Sleep(waitBeforeReading)

    if logger.Read() != expectedLogLine {
        t.Errorf("Line read from logger does not match what was expected:\n\texpected: %q\n\tactual: %q", expectedLogLine, logger.Read())
    }
}

测试一个小功能会有很多方法,但是随着代码量的增长,它要有很好的扩展。照这样mock可以使其更容易指定错误情况下的表现或者在复杂的并发情况下控制逻辑。

由于接口为测试提供了实用性和创造性,最好将外部依赖关系封装在一个中,然后将它们组合起来,以尽可能创建更高级的接口。正如你所希望的那样,即使小的单一方法的接口也可以被用来组合成更大的功能。

例子4:使用和构造标准库功能
如上图所示的概念是为自己的程序非常有用,但你也将注意到,许多在go标准库的构建可以在单元测试中以类似的方式进行管理.

我们来看看测试一个HTTP服务器的例子。在goroutine中实际启动HTTP服务器并向它发送你希望能够直接处理的请求(例如http.Get),但是这更像一个集成测试而不是一个合适的单元测试。下面看一个小型的http服务,并讨论如何进行测试。

package main

import (
    "fmt"
    "log"
    "net/http"
)

func mainHandler(w http.ResponseWriter, r *http.Request) {
    token := r.Header.Get("X-Access-Token")
    if token == "magic" {
        fmt.Fprintf(w, "You have some magic in you\n")
        log.Println("Allowed an access attempt")
    } else {
        http.Error(w, "You don't have enough magic in you", http.StatusForbidden)
        log.Println("Denied an access attempt")
    }
}

func main() {
    http.HandleFunc("/", mainHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

上面的Http服务监听在8080端口,并且检查是否有一个X-Access-Token的header被设置。如果token匹配上了我们的"magic"值,我们允许用户访问并且返回一个HTTP 200 OK的状态码。否则我们拒绝请求,返回一个403.这是对一些API服务器如何处理授权的简单模仿,该如何测试它呢?

正如你所看到的。这个mainHandler函数接收两个参数,a http.ResponseWriter(注意它是一个interface你可以通过阅读源http源码或文档来验证)和一个http.Request结构体指针。为了测试这个handler,我们能构造http.ResponseWriter 接口的实现,今后也可以继续使用,幸运的是,Go作者已经提供了一个httptest包含ResponseRecorder 结构的包,以帮助解决这个问题。这样的模块提供了通用的测试功能用一个有用而常见的模式。

鉴于此,我们也能手工构造一个http.Request结构通过调用NewRequest带上我们期望的参数。我们只需要简单的调用Header.Set在Request上来设置header。我们在NewRequest方法中指定它应该是GET方法,并且不再请求体中包含任何信息,同样我们也可以测试POST请求

初始化的测试如下:

package main

import (
    "bytes"
    "net/http"
    "net/http/httptest"
    "testing"
)

func TestMainHandler(t *testing.T) {
    rootRequest, err := http.NewRequest("GET", "/", nil)
    if err != nil {
        t.Fatal("Root request error: %s", err)
    }

    cases := []struct {
        w                    *httptest.ResponseRecorder
        r                    *http.Request
        accessTokenHeader    string
        expectedResponseCode int
        expectedResponseBody []byte
    }{
        {
            w:                    httptest.NewRecorder(),
            r:                    rootRequest,
            accessTokenHeader:    "magic",
            expectedResponseCode: http.StatusOK,
            expectedResponseBody: []byte("You have some magic in you\n"),
        },
        {
            w:                    httptest.NewRecorder(),
            r:                    rootRequest,
            accessTokenHeader:    "",
            expectedResponseCode: http.StatusForbidden,
            expectedResponseBody: []byte("You don't have enough magic in you\n"),
        },
    }

    for _, c := range cases {
        c.r.Header.Set("X-Access-Token", c.accessTokenHeader)

        mainHandler(c.w, c.r)

        if c.expectedResponseCode != c.w.Code {
            t.Errorf("Status Code didn't match:\n\t%q\n\t%q", c.expectedResponseCode, c.w.Code)
        }

        if !bytes.Equal(c.expectedResponseBody, c.w.Body.Bytes()) {
            t.Errorf("Body didn't match:\n\t%q\n\t%q", string(c.expectedResponseBody), c.w.Body.String())
        }
    }
}

但是,我们可以考虑的测试功能有一个显而易见的缺失。我们不检查写入的log内容是我们所期望的。我们该怎么做?

如果我们检查标准库的log包的源代码,我们就可以看到这个log.Println方法直接封装了一个Logger 结构的实例,内部调用了Write方法在Writer接口上(在使用std结构的情况下,如果你直接引用 log.*,那么Writer就是os.Stdout).我想知道是否有任何方法可以将接口设置为我们期望的那样,以便可以验证所写的就是我们期望的。

当然,有一种方法可以这样做,我们能引用log.SetOutput方法来指定我们自定义的writer为了记录日志。我们使用io.Pipe来创建Writer。这将为我们提供一个Reader,我们能用它来读随后的writer调用在Logger中。我们用bufio.Reader封装了给出的PipeReader,因此我们可以调用bufio.Reader的ReadString方法一行一行的读。

注意PipeWriter的文档:

Write实现了标准的写接口,它写入数据到管道,阻塞直到readers读完所有的数据或者read端被关闭。

因此,我们必须并发的读从PipeReader中,在mainHandler函数正在写入时,我在自己的goroutine中运行这个测试。在我原来的版本中我得到这个错误,并通过使用go test的-timeout标志发现了这个错误,如果超时的话它会导致panic。

最后组合起来,像下面这样:

func TestMainHandler(t *testing.T) {
    rootRequest, err := http.NewRequest("GET", "/", nil)
    if err != nil {
        t.Fatal("Root request error: %s", err)
    }

    cases := []struct {
        w                    *httptest.ResponseRecorder
        r                    *http.Request
        accessTokenHeader    string
        expectedResponseCode int
        expectedResponseBody []byte
        expectedLogs         []string
    }{
        {
            w:                    httptest.NewRecorder(),
            r:                    rootRequest,
            accessTokenHeader:    "magic",
            expectedResponseCode: http.StatusOK,
            expectedResponseBody: []byte("You have some magic in you\n"),
            expectedLogs: []string{
                "Allowed an access attempt\n",
            },
        },
        {
            w:                    httptest.NewRecorder(),
            r:                    rootRequest,
            accessTokenHeader:    "",
            expectedResponseCode: http.StatusForbidden,
            expectedResponseBody: []byte("You don't have enough magic in you\n"),
            expectedLogs: []string{
                "Denied an access attempt\n",
            },
        },
    }

    for _, c := range cases {
        logReader, logWriter := io.Pipe()
        bufLogReader := bufio.NewReader(logReader)
        log.SetOutput(logWriter)

        c.r.Header.Set("X-Access-Token", c.accessTokenHeader)

        go func() {
            for _, expectedLine := range c.expectedLogs {
                msg, err := bufLogReader.ReadString('\n')
                if err != nil {
                    t.Errorf("Expected to be able to read from log but got error: %s", err)
                }
                if !strings.HasSuffix(msg, expectedLine) {
                    t.Errorf("Log line didn't match suffix:\n\t%q\n\t%q", expectedLine, msg)
                }
            }
        }()

        mainHandler(c.w, c.r)

        if c.expectedResponseCode != c.w.Code {
            t.Errorf("Status Code didn't match:\n\t%q\n\t%q", c.expectedResponseCode, c.w.Code)
        }

        if !bytes.Equal(c.expectedResponseBody, c.w.Body.Bytes()) {
            t.Errorf("Body didn't match:\n\t%q\n\t%q", string(c.expectedResponseBody), c.w.Body.String())
        }
    }
}

我希望这些例子清楚地说明了在Go标准库中以及在你自己的代码中具有良好架构的接口的价值,以及如何读取你所依赖的模块的源代码(包括Go标准库,它是很准确的文档)可以让你更好的理解你正在使用的代码,以及简化测试。

基于系统日志分析进行异常检测

日志解析:https://github.com/logpai/logparser
异常检测:https://github.com/logpai/loglizer
预备知识:需要对逻辑回归、决策树、SVM、PCA、聚类等有一些了解
论文原文: https://github.com/AmateurEvents/article/blob/master/System-Log-Analysis-for-Anomaly-Detection.pdf

前言

异常检测在现代大规模分布式系统的管理中起着重要作用。记录系统运行时信息的日志广泛用于异常检测。传统上,开发人员(或操作者)经常用关键字搜索和规则匹配手动检查日志。然而,现代系统的规模和复杂性不断增加,使得日志爆增,这使得人工检测变得不可行。为了减少人工工作量,提出了许多基于自动日志分析的异常检测方法。然而,开发人员可能仍然不知道他们应该采用哪种异常检测方法,因为这些异常检测方法之间缺乏review和比较。此外,即使开发人员决定采用异常检测方法,重新实现也需要付出不小的努力。为了解决这些问题,我们详细回顾和评估了六种最先进的基于日志的异常检测方法,包括三种监督方法和三种非监督方法,并发布了一个开源工具包,便于重用。在两个公开可用的生产日志数据集上对这些方法进行了评估,共有15923592条日志消息和365298个异常实例。我们相信,我们的工作,连同评估结果以及相应的调查结果,可以为采用这些方法提供指导,并为未来的发展提供参考。

一 说明

现代系统正在向大规模发展,通过构建的数千台机器来扩展分布式系统(例如hadoop,spark),通过使用数千台处理器的超级计算机来扩展高性能计算(例如Blue Gene/L )。这些系统正在成为IT行业的核心部分,支持多种在线服务(如搜索引擎、社交网络和电子商务)和智能应用(如天气预报、商业智能和生物医学工程)。因为这些系统大多设计为全天候运行,为全球数百万在线用户提供服务,所以高可用性和可靠性成为必须。这些系统的任何事件,包括服务中断和服务质量下降,都会导致应用程序崩溃,并导致巨大的收入损失。异常检测旨在及时发现异常系统行为,在大规模系统的事件管理中发挥着重要作用。及时的异常检测允许系统开发人员(或操作员)及时发现问题并立即解决,从而减少系统停机时间。系统通常会生成日志,记录系统运行期间的详细运行时信息。这种广泛可用的日志被用作系统异常检测的主要数据源。基于日志的异常检测已经成为学术界和工业界具有实际重要性的研究课题。对于传统的独立系统,开发人员根据他们的领域知识手动检查系统日志或编写规则来检测异常,并额外使用关键字搜索(例如,“失败”、“异常”)或正则表达式匹配。然而,这种严重依赖人工检查日志的异常检测对于大规模系统来说已经变得不充分,基于以下几点:

  1. 现代系统的大规模和并行性使得系统行为过于复杂,每个开发人员都无法理解,他们通常只负责子组件。例如,许多开源系统(例如Hadoop、Spark )由数百名开发人员实现。开发人员可能对整个系统行为只有不完全的了解,因此从大量日志中识别问题是一项巨大的挑战。

  2. 现代系统正在以每小时约50Gb(约1.2亿~ 2亿行)的速度生成大量日志,这种日志的庞大数量使得从噪声数据中手动识别关键信息以进行异常检测变得非常困难,即使是使用搜索和grep这样的实用工具也是如此。

  3. 大规模系统通常采用不同的容错机制来构建。系统有时会以冗余方式运行相同的任务,甚至主动终止推测性任务以提高性能。在这种情况下,使用关键字搜索的传统方法对于提取这些系统中的可疑日志消息变得无效,这可能导致许多误报,这些误报实际上是与实际故障无关的日志消息。这将大大增加人工检查的工作量。

因此,针对异常检测的自动日志分析方法非常受欢迎。基于日志的异常检测在过去几十年里得到了广泛的研究。然而,我们发现学术界的研究和工业实践之间存在差距。一方面,开发人员在许多情况下不了解最新的异常检测方法,因为目前缺乏对这一主题的全面审查。他们必须阅读大量文献才能全面了解当前的异常检测方法。这是一项繁琐的任务,但并不能保证找到最合适的方法,因为每项研究工作通常都只是针对特定系统的一个详细报告。如果开发人员事先没有理解这些方法所需的机器学习背景知识,这一困难可能会加剧。另一方面,据我们所知,目前没有基于日志的开源工具可用于异常检测。现有异常检测方法之间也缺乏比较。对于开发人员来说,很难知道哪种方法是解决手头实际问题的最佳方法。为了比较所有候选方法,他们需要用自己的实现来尝试每一种方法。通常需要付出巨大的努力来重现这些方法,因为没有谁能保证正确实现底层的机器学习算法。

为了弥补这一差距,本文对基于日志的异常检测进行了详细的回顾和评估,并发布了一个开源异常检测工具包。我们的目标不是改进任何特定的方法,而是描绘当前异常检测日志分析研究的总体情况。我们相信,我们的工作可以在两个方面为研究人员和实践人员带来好处:回顾可以帮助他们快速理解当前的异常检测方法;而开源工具包允许他们容易地重用现有的方法,并进行进一步的定制或改进。这有助于避免耗时但重复的重新实施工作.

异常检测的日志分析过程包括四个主要步骤:日志收集、日志解析、特征提取和异常检测。在我们的最后一项工作中,我们介绍了自动日志解析方法的回顾和评估,其中公开发布了四个开源日志解析器。在这项工作中,我们将主要关注异常检测的特征提取和机器学习模型。根据所涉及的数据类型和采用的机器学习技术,异常检测方法可以分为两大类:监督异常检测和非监督异常检测。受监督的方法需要有标签的训练数据,对正常情况和异常情况有明确的说明。然后利用分类技术来学习模型,以最大限度地区分正常和异常实例。然而,无监督的方法根本不需要标签。他们的工作基于这样的观察,即异常实例通常表现为远离其他实例的异常点。因此,可以应用无监督学习技术,如聚类。

更具体地说,我们回顾并实施了最近文献中报道的六种代表性异常检测方法,包括三种有监督的方法(即逻辑回归、决策树和SVM )和三种无监督的方法(即日志聚类、PCA和不变量挖掘)。我们还在两个公开可用的日志数据集上对这些方法进行了系统评估,共有15923592条日志消息和365298个异常实例。评估结果以精确度(报告的异常正确率)、召回率(检测到的真实异常率)和效率(不同日志大小的运行时间)报告。虽然数据有限,但我们认为这些结果以及揭示的相应发现可以为采用这些方法提供指导,并作为未来发展的基础。

总之,本文做出了以下贡献:

  1. 基于自动日志分析的常用异常检测方法的详细回顾
  2. 一个开源工具包,由六种典型的异常检测方法组成
  3. 当前异常检测方法有效性和效率的系统评估

本文的其余部分组织如下。第二节描述了基于日志的异常检测的总体框架。第三节回顾了六种典型的异常检测方法。我们在第四节报告了评估结果,并在第五节进行了一些讨论。第六节介绍了相关工作,最后第七节总结了论文

二 框架预览

图1说明了基于日志的异常检测的总体框架。异常检测框架主要包括四个步骤:日志收集、日志解析、特征提取和异常检测。

image

日志搜集:大规模系统通常会生成日志来记录系统状态和运行时信息,每个日志都包括时间戳和指示发生了什么的日志消息。这些有价值的信息可以用于多种目的(例如,异常检测),首先收集日志以供进一步使用。例如,图1描绘了从亚马逊EC2平台[ 47 ]上的HDFS日志中提取的8条日志线,而为了便于展示,这里省略了一些字段

日志解析:日志是非结构化的,包含自由形式的文本。日志解析的目的是提取一组事件模板,从而可以构造原始日志。更具体地说,每个日志消息都可以被解析成带有一些特定参数(可变部分)的事件模板(恒定部分)。如图1所示,第四条日志消息(日志4 )被解析为"Event2",事件模板=》"Received block * of size * from *"

特征提取:将日志解析成单独的事件后,我们需要进一步将它们编码成数字特征向量,从而可以应用机器学习模型。为此,我们首先使用不同的分组技术将原始日志分割成一组日志序列,包括固定窗口、滑动窗口和会话窗口。然后,对于每个日志序列,我们生成一个特征向量(事件计数向量),表示每个事件的发生次数。所有特征向量一起可以形成特征矩阵,即事件计数矩阵

异常检测:最后,可以将特征矩阵馈送给机器学习模型进行训练,从而生成异常检测模型。所构建的模型可用于识别新进入的日志序列是否异常。

三 实践

在这一部分中,我们详细回顾了不同阶段的方法:日志解析、特征提取和异常检测。对于日志解析,我们简要给出了基本**,并介绍了几种典型的日志解析器。然后,讨论了三种特征提取技术,它们被应用于解析的日志事件以生成特征向量。在获得特征向量后,我们重点研究了六种有代表性的异常检测方法,其中三种是有监督的方法,另外三种是无监督的方法。

A 日志解析

日志是由固定部分和可变部分组成的纯文本,这些部分在不同的事件中可能会有所不同。例如,对于给出的日志 "Connection from 10.10.34.12 closed" and "Connection from 10.10.34.13 closed",单词 Connection from closed被认为是不变的部分,因为他们总是保持相同,而其余的部分被认为是可变部分,因为它们总是在变。开发人员在源代码中预先定义了常量部分,并且变量部分通常是动态生成的(例如端口号、IP地址),这在异常检测中无法很好地利用。日志解析的目的是将常量部分与变量部分分开,并形成一个成熟的日志事件(即示例中的"Connection from * closed")

有两种类型的日志解析方法:基于聚类的方法(例如,LKE[20],LogSig[44])和基于启发式的方法(例如,iPLoM[29],SLCT[45])。在基于聚类的日志分析器中,首先计算日志之间的距离,在下一步中,通常使用聚类技术将日志分组到不同的聚类中。最后,从每个集群生成事件模板。对于基于启发式的方法,计算每个日志位置上每个单词的出现次数。接下来,频繁的单词被选择并合成为事件候选词。最后,选择一些候选项作为日志事件。在我们以前的作品[24]中,我们实现并比较了四个日志分析器。此外,我们在线上发布了一个开源日志解析工具包,用于将原始日志解析成日志事件。

B. 特征提取

该步骤的主要目的是从日志事件中提取有价值的特征,这些特征可以被输入异常检测模型。特征提取的输入是日志解析步骤中生成的日志事件,输出是事件计数矩阵。为了提取特征,我们首先需要将日志数据分成不同的组,其中每个组代表一个日志序列。为此,窗口被应用于将日志数据集划分成有限块。如图1所示,我们使用三种不同类型的窗口:固定窗口、滑动窗口和会话窗口

固定窗口

固定窗口和滑动窗口都基于时间戳,时间戳记录每个日志的发生时间。每个固定窗口都有其大小,这意味着时间跨度或持续时间。如图1所示,窗口大小为∆t,这是一个常量值,例如一小时或一天。因此,固定窗口的数量取决于预定义的窗口大小。同一窗口中发生的日志被视为日志序列

滑动窗口

与固定窗口不同,滑动窗口由两个属性组成:窗口大小和步长,例如,每小时窗口每五分钟滑动一次。通常,步长小于窗口大小,因此会导致不同窗口的重叠。图1显示了窗口大小是∆T,而步长是转发距离。滑动窗口的数量通常大于固定窗口,主要取决于窗口大小和步长。发生在同一滑动窗口中的日志也被分组为日志序列,尽管由于重叠,日志可能会在多个滑动窗口中重复

会话窗口

与上述两种窗口类型相比,会话窗口基于标识符而不是时间戳。标识符用于在一些日志数据中标记不同的执行路径。例如,带有block_id的HDFS日志记录了某些数据块的分配、写入、复制和删除。因此,我们可以根据标识符对日志进行分组,其中每个会话窗口都有一个唯一的标识符。

在利用窗口技术构建日志序列之后,生成事件计数矩阵X。在每个日志序列中,我们计算每个日志事件的发生次数,以形成事件计数向量。例如,如果事件计数向量是[ 0、0、2、3、0、1、0 ],这意味着在这个日志序列中,事件3发生了两次,事件4发生了三次。最后,大量事件计数向量被构造成事件计数矩阵X,其中条目Xi, j记录了事件j在第i个日志序列中发生了多少次。

C.监督类异常检测

监督学习(例如决策树)被定义为从标记的训练数据中导出模型的机器学习任务。标记训练数据是监督异常检测的前提,它通过标记来指示正常或异常状态。训练数据的标签越多,模型就越精确。下面我们将介绍三种有代表性的监督方法:逻辑回归、决策树和支持向量机( SVM )。

1.逻辑回归

Logistic回归是一种被广泛用于分类的统计模型。为了决定实例的状态,逻辑回归估计所有可能状态(正常或异常)的概率p。概率p由逻辑函数计算,逻辑函数建立在标记的训练数据上。当出现新实例时,逻辑函数可以计算所有可能状态的概率p (0<p<1 )。获得概率后,概率最大的状态即为分类输出

为了检测异常,从每个日志序列中构造一个事件计数向量,每个事件计数向量及其标签称为实例。首先,我们使用训练实例来建立逻辑回归模型,这实际上是一个逻辑函数。在获得模型后,我们将一个测试实例X输入到逻辑函数中,以计算其异常可能性p,当p≥0.5时,X的标记是异常的,否则是正常的

2.决策树

image

决策树是一个树形结构图,它使用分支来说明每个实例的预测状态。决策树是使用训练数据以自顶向下的方式构建的。每个树节点都是使用当前的“最佳”属性创建的,这是通过属性的信息增益来选择的。例如,图2中的根节点显示,我们的数据集中总共有20个实例。分割根节点时,事件2的出现次数被视为“最佳”属性。因此,根据该属性的值,整个20个训练实例被分成两个子集,其中一个包含12个实例,另一个包含8个实例

决策树首次应用于Web请求日志系统的故障诊断。事件计数向量及其在第III-B节中描述的标签被用来构建决策树。为了检测新实例的状态,它根据每个遍历树节点的谓词遍历决策树。在遍历结束时,实例将到达其中一个叶子,这反映了该实例的状态

3.SVM

支持向量机是一种有监督的分类学习方法。在SVM中,超平面被构造成在高维空间中分离不同类别的实例。找到超平面是一个优化问题,它使超平面和不同类别中最近的数据点之间的距离最大化。

在参考[26]中,使用SVM检测故障并将其与其他方法进行比较。类似于逻辑回归和决策树,训练实例是事件计数向量及其标签。在通过SVM的异常检测中,如果一个新实例位于超平面上方,它将被报告为异常,否则被标记为正常。支持向量机有两种,即线性支持向量机和非线性支持向量机。在本文中,我们只讨论线性支持向量机,因为在我们的大多数实验中,线性支持向量机的性能优于非线性支持向量机。

D.非监督类异常检测

与有监督的方法不同,无监督学习是另一种常见的机器学习任务,但是它的训练数据没有标记。由于缺乏标签,无监督方法更适用于现实生产环境。常见的非监督方法包括各种聚类方法、关联规则挖掘、PCA等。

1.聚类

在参考[27]中,设计一种基于聚类的方法,称为LogCollect,用于识别在线系统问题。LogCluster需要两个培训阶段,即知识库初始化阶段和在线学习阶段。因此,训练实例分别被分为这两个阶段的两个部分。

知识库初始化阶段包括三个步骤:日志矢量化、日志聚类、代表性向量提取。首先,日志序列被矢量化为事件计数向量,并通过逆文档频率( IDF ) [ 41 ]和归一化进一步修正。其次,LogCluster将正常和异常事件计数向量分别聚类,并以聚集层次聚类的方式生成两组向量聚类(即正常聚类和异常聚类)作为知识库。最后,我们通过计算每个聚类的质心来选择一个代表向量。

在线学习阶段用于进一步调整知识库初始化阶段构建的集群。在线学习阶段,事件计数向量被一个接一个地添加到知识库中。给定一个事件计数向量,计算它和现有代表向量之间的距离。如果最小距离小于阈值,此事件计数向量将被添加到最近的群集,并且此群集的代表向量将被更新。否则,LogCluster将使用此事件计数向量创建一个新群集。

构建知识库并完成在线学习过程后,可以使用LogCollect来检测异常。具体来说,为了确定新日志序列的状态,我们计算它到知识库中代表性向量的距离。如果最小距离大于阈值,则日志序列被报告为异常。否则,如果最近的群集是正常/异常群集,日志序列将被报告为正常/异常

2.PCA

image

PCA是一种统计方法,被广泛用于进行降维。PCA背后的基本**是将高维数据(例如,高维点)投影到由k个主分量(即,k个维度)组成的新坐标系中,其中k被设置为小于原始维度。PCA通过寻找捕捉高维数据中最大方差的分量(即轴)来计算k个主分量。因此,PCA变换的低维数据可以保留原始高维数据的主要特征(例如,两点之间的相似性)。例如,在图3中,PCA试图将二维点转换为一维点。Sn被选为主要成分,因为点之间的距离可以通过将它们映射到Sn来最好地描述。

PCA首次应用于基于日志的异常检测,参考[47]。在它们的异常检测方法中,每个日志序列被矢量化为事件计数向量。之后,PCA被用来寻找事件计数向量维度之间的模式。利用PCA,生成两个子空间,即正常空间Sn和异常空间Sa。Sn由前k个主成分构成,Sn由剩余的( n - k )构成,其中n是原始尺寸。然后,计算事件计数向量y到Sa的投影:

image

其中P = [ v1,v2,.。。,vk,]是前k个主成分。如果ya的长度大于阈值,相应的事件计数向量将被报告为异常。例如,图3中选择的点是异常的,因为它在Sa上的投影长度太大。具体而言,一个事件计数向量被看作一个异常,如果下面的条件满足:

image

其中平方预测误差(即SPE )表示“长度”,而Qα是提供( 1α)置信水平的阈值。我们将Q=0.001设置为原始文件中的值。对于k,我们通过调整PCA来自动计算它,以捕获95 %的数据方差,也与原始论文相同。

3.Invariants Mining

image

程序不变量是线性关系,即使在不同的输入和不同的工作负载下,在系统运行过程中也始终保持这种关系。不变量挖掘首次应用于基于日志的异常检测在[28]。具有相同会话id (例如,HDFS中的块id )的日志通常表示该会话的程序执行流程。简化的程序执行流程如图4所示。

在这个执行流程中,系统在从A到G的每个阶段都生成一条日志消息。假设系统中运行着大量的实例,并且它们遵循图4中的程序执行流程,下面的等式将是有效的:

n(A) = n(B)
n(B) = n(C) + n(E) + n(F)
n(C) = n(D)
n(G) = n(D) + n(E) + n(F)

其中n(*)表示属于相应事件类型*的日志数量。
直觉上,不变量挖掘可以揭示代表系统正常执行行为的多个日志事件之间的线性关系(例如,n ( A ) = n ( B ) )。线性关系在现实世界的系统事件中占主导地位。例如,通常,文件打开后必须关闭。因此,带有短语“打开文件”的日志和带有短语“关闭文件”的日志将成对出现。如果实例中日志事件“打开文件”和“关闭文件”的数量不相等,它将被标记为异常,因为它违反了线性关系。

不变量挖掘旨在寻找不变量(即线性关系),包含三个步骤。不变量挖掘的输入是从日志序列生成的事件计数矩阵,其中每行都是事件计数向量。首先,利用奇异值分解估计不变空间,确定下一步需要挖掘的不变量r。其次,该方法通过强力搜索算法找出不变量。最后,通过将其支持度与阈值(例如,98 %的事件计数向量支持)进行比较,验证每个挖掘的不变候选。该步骤将继续,直到获得r个独立不变量。

在基于不变量的异常检测中,当新的日志序列到达时,我们检查它是否遵循不变量。如果至少有一个不变量被破坏,日志序列将被报告为异常。

E.方法比较

为了加强对上述六种异常检测方法的理解,帮助开发人员更好地选择要使用的异常检测方法,我们在这一部分讨论了不同方法的优缺点。

对于监督方法,异常检测需要标签。决策树比其他两种方法更容易解释,因为开发人员可以通过有意义的解释(即树节点中的谓词)来检测异常。Logistic回归不能解决线性不可分离的问题,这可以通过使用基于核函数的SVM来解决。然而,SVM的参数很难调整(例如,惩罚参数),因此建立模型通常需要大量人工努力。

由于缺乏标签,无监督的方法更加实用和有意义。日志聚类使用在线学习的**。因此,它适合处理大量日志数据。不变量挖掘不仅可以高精度地检测异常,而且可以为每个检测到的异常提供有意义和直观的解释。然而,不变量挖掘过程非常耗时。PCA不容易理解,并且对数据敏感。因此,不同数据集的异常检测精度不同。

F.工具实现

我们在Python中实现了6种异常检测方法,有4000多行代码,并将它们打包成工具包。对于有监督的方法,我们利用广泛使用的机器学习包scikit-learning[39]来实现Logistic回归、决策树和SVM的学习模型。SVM和Logistic回归中有很多参数,我们在训练中手动调整这些参数以获得最佳结果。对于SVM,我们逐一尝试了不同的核函数和相关参数,发现线性核函数支持向量机比其他核函数具有更好的异常检测精度。对于逻辑回归,也探索了不同的参数,并对它们进行了仔细的调整,以获得最佳性能。

然而,实现无监督的方法并不简单。对于日志聚类,我们不能直接使用scikit-learn中的聚类API,因为它不是为大规模数据集设计的,因为我们的数据不适合内存。我们将聚类算法实现为在线版本,其中每个数据实例被一个接一个地分组为一个聚类。有多个阈值需要调整。我们也付出了巨大的努力来实现不变量挖掘方法,因为我们为可能的不变量建立了搜索空间,并提出了多种方法来修正所有不必要的不变量。测试阈值的不同组合非常耗时。我们最终实现了PCA方法,根据原始参考,基于scikit-learning中的API。PCA只有两个参数,很容易调整。

四 结果评估

在这一部分,我们将首先介绍我们使用的数据集和我们评估的实验设置。然后,我们分别提供有监督和无监督异常检测方法的评估结果,因为这两种方法通常适用于不同的环境。最后,对所有这些方法的效率进行了评估。

A.实验设计

image

日志数据集:公开发布的生产日志是稀缺的数据,因为公司由于机密问题很少发布它们。幸运的是,通过探索大量文献并与相应的作者密切联系,我们成功地获得了两个日志数据集,HDFS数据[47]和BGL数据[36],它们适合于评估现有的异常检测方法。这两个数据集都是从生产系统收集的,共有15923592条日志消息和365298个异常样本,由原始领域专家手动标记。因此,我们将这些标签(异常与否)作为准确性评估的基础事实。表一提供了数据集的更多统计信息。

HDFS数据包含11175629条日志消息,这些消息是从亚马逊EC2平台[47]收集的。HDFS日志记录每个数据块操作(如分配、写入、复制、删除)的唯一数据块ID。因此,日志中的操作可以更自然地被会话窗口捕获,如III - B中所介绍的,因为每个唯一的块ID可以用来将日志分割成一组日志序列。然后,我们从这些日志序列中提取特征向量,并生成575061个事件计数向量。其中,16838个样本被标记为异常。

BGL数据包含4747963条日志信息,由Lawrence Livermore国家实验室( LLNL ) [36]的BlueGene / L超级计算机系统记录。与HDFS数据不同,BGL日志没有记录每个作业执行的标识符。因此,我们必须使用固定窗口或滑动窗口将日志切片为日志序列,然后提取相应的事件计数向量。但是窗口的数量取决于选择的窗口大小(和步长)。在BGL数据中,348460条日志消息被标记为故障,如果该序列中存在任何故障日志,则日志序列被标记为异常。

实验设置:我们在Linux服务器上运行所有实验,该服务器配备英特尔至强E5 - 2670 v2 CPU和128 GB DDR 3 1600 RAM,64位Ubuntu 14.04.2和Linux内核3.16.0正在其上运行。除非另有说明,每个实验运行五次,并报告平均结果。我们使用精度、召回率和F-measure (最常用的度量)来评估异常检测方法的准确性,因为我们已经掌握了这两个数据集的基本真相(异常与否)。如下所示,精度测量报告的异常正确率,召回率测量检测到的真实异常率,F-measure表示精度和召回率的调和平均值。

对于所有三种监督方法,我们选择前80 %的数据作为训练数据,剩下的20 %作为测试数据,因为只有先前发生的事件可能导致后续的异常。默认情况下,我们将固定窗口的窗口大小设置为1小时,将滑动窗口的窗口大小和步长分别设置为6小时和1小时。

B.监督方法的准确性

image

image

image

为了探索监督方法的准确性,我们使用它们来检测HDFS数据和BGL数据上的异常。我们使用会话窗口分割HDFS数据,然后生成事件计数矩阵,而固定窗口和滑动窗口分别应用于BGL数据。为了检验三种监督方法(即Logistic回归、决策树、SVM )的有效性,我们首先在训练数据上训练模型,然后将其应用于测试数据。我们报告了不同环境下的训练精度和测试精度,如图7 ~ 9所示。我们可以观察到,所有监督方法都达到了很高的训练精度(超过0.95 ),这意味着通过使用我们的特征表示,正常实例和异常实例可以很好地分离。然而,它们对测试数据的准确性因不同的方法和数据集而异。HDFS数据的总体精度高于固定窗口和滑动窗口BGL数据的精度。这主要是因为HDFS系统只记录了29种事件类型的相对简单的操作,这远远小于BGL数据中的385种。此外,HDFS数据按会话窗口分组,从而在每个日志序列中的事件之间产生更高的相关性。因此,HDFS上的异常检测方法比BGL上的异常检测方法性能更好。

image

image

特别是,图5显示了HDFS数据异常检测的准确性,所有三种方法在F-measure接近1的情况下测试数据时都有出色的性能。当在固定窗口的BGL测试数据上应用监督方法时,虽然它们在训练数据上表现良好,但并不能达到高精度。如图6所示,在具有固定窗口的BGL上,所有三种方法的召回率仅为0.57,而它们的高检测精度为0.95。我们发现,由于固定窗口大小只有一个小时,因此,它可能会导致异常的不均匀分布。例如,当前窗口中发生的一些异常可能实际上与前一个时间窗口中的事件有关,并且它们被错误地划分。因此,一小时固定窗口的异常检测方法在BGL数据上表现不佳。

发现1 :有监督的异常检测方法实现了高精度,而召回率因不同数据集和窗口设置而异

为了解决固定窗口性能差的问题,我们使用滑动窗口来拆分BGL数据,设置窗口大小为6h、步长为1h。结果如图7所示。与固定窗口相比,基于滑动窗口的异常检测对测试数据具有更高的准确性。原因在于,通过使用滑动窗口,我们不仅可以获得与固定窗口一样多的窗口(事件计数向量),而且可以避免由于窗口大小大得多而导致的分布不均的问题。在有监督的方法中,我们观察到支持向量机F-measure值为0.85时达到了最佳的整体精度。。此外,与固定窗口的结果相比,基于滑动窗口的决策树和逻辑回归在召回率方面分别提高了10.5%和31.6%。

image

为了进一步研究不同窗口大小和不同步长对异常检测精度的影响,我们通过改变一个参数同时保持另一个参数不变来进行实验。根据图8的图表a ),我们将步长保持在一小时,同时改变窗口大小,如表II所示。大于12小时的窗口大小不被考虑,因为它们在实际应用中不实用。我们可以观察到,随着窗口大小的增加,SVM的F-measure略有下降,而Logistic回归的精度先缓慢增加,但当窗口大小增加到9小时时,精度急剧下降,然后保持稳定。显而易见,当窗口大小为6小时时,逻辑回归获得了最高精度。决策树准确率的变化趋势与logistic回归相反,在12小时达到最高准确率。因此,逻辑回归对窗口大小敏感,而决策树和SVM保持稳定。

发现2 :滑动窗口的异常检测可以比固定窗口获得更高的精度。

与窗口大小相比,步长可能对异常检测精度有很大影响。表二显示,如果我们减小步长,同时将窗口大小保持在6小时,滑动窗口(数据实例)的数量会急剧增加。所有三种方法都显示出相同的趋势,精度首先略有提高,然后在3小时左右下降。这可能是因为当使用大步长时,例如在3小时,数据实例的数量急剧减少。六小时的步长出现了一个例外:窗口大小等于步长,因此滑动窗口与固定窗口相同。在这种情况下,由重叠引起的一些噪声被去除,这导致检测精度的小幅度提高。

C.无监督方法的准确性

尽管受监督的方法实现了高精度,特别是在HDFS数据上,但是这些方法不一定适用于实际环境,因为在实际环境中数据标签经常不可用。针对这一问题,提出了无监督异常检测方法。为了探索无监督方法的异常检测精度,我们在HDFS数据和BGL数据上对它们进行评估。如上一节所示,滑动窗口可以导致更准确的异常检测。因此,我们只报告BGL数据上滑动窗口的结果。

由于日志群集在具有50万个实例的HDFS数据上非常耗时,因此调整参数变得不切实际,因此我们选择在合理时间内可以处理的最大日志大小来表示HDFS数据。

在图9中,我们可以观察到,所有无监督的方法在HDFS数据上显示出良好的准确性,但是它们在BGL数据上获得的准确性相对较低。在三种方法中,不变量挖掘在这两种数据上取得了优于其他无监督异常检测方法的性能( F-measure为0.91 )。不变量挖掘自动构建线性相关模式来检测异常,这与BGL数据的本质非常吻合,在BGL数据中,故障通过一些关键事件来标记。日志聚类和PCA在BGL数据上没有获得良好的检测精度。日志聚类的性能差是由事件计数矩阵的高维稀疏特性造成的。因此,日志聚类很难区分异常和正常情况,这通常会导致大量误报。

image

我们进行了深入的研究,以进一步理解为什么PCA不能在BGL数据上实现高精度。PCA检测异常的标准是到正常空间的距离(平方预测误差)。如图10所示,当距离大于特定阈值时(红色虚线表示我们当前的阈值),一个实例被识别为异常。然而,通过使用地面真实标签来绘制距离分布,如图10所示,我们发现两个类别(正常和异常)不能被任何单一的阈值自然分开。因此,PCA在BGL数据上表现不佳。

发现3 :与监督方法相比,非监督方法的性能通常较差。但是不变量挖掘是一种性能稳定、高效的方法。

像监督方法一样,我们也对不同的窗口大小和步长设置进行实验,以探索它们对准确性的影响。如图11所示,我们有一个有趣的观察,即随着窗口大小的增加,精度会稳步提高,而步长的变化对精度影响很小。这一观察与我们发现的监督方法相反。如表II所示,当窗口大小增加时,窗口数量大大减少。给定更大的窗口大小,覆盖更多的信息,同时也可以添加更多的噪声,但是无监督的方法可以发现更准确的异常检测模式

发现4 :窗口大小和步长的设置对监督方法和非监督方法有不同的影响。

D.异常检测方法的效率

image

在图12中,所有这些异常检测方法的效率都在两个日志大小不同的数据集上进行评估。如图所示,有监督的方法可以在短时间内(不到一分钟)检测异常,而无监督的方法更耗时( PCA除外)。我们可以观察到,除了时间复杂度为O ( N2 )的日志聚类之外,所有异常检测方法都随着日志大小的增加而线性扩展。请注意,水平轴和垂直轴都不是线性比例。此外,日志聚类无法在可接受的时间内处理大规模数据集;因此,日志聚类的运行时间结果没有完全绘制出来。值得注意的是,不变量挖掘的运行时间大于BGL数据上的日志聚类,而不是HDFS数据上的日志聚类,因为BGL数据中的事件类型多于HDFS数据,这增加了不变量挖掘的时间。此外,还应该注意的是,在BGL数据的日志大小为125兆字节时,不变量挖掘的运行时间略有减少。这是因为我们设置了停止标准来控制其在大型数据集上的强力搜索过程,这可以避免不必要的高维相关性搜索。

发现5 :大多数异常检测方法随着日志大小线性扩展,但是日志聚类和不变量挖掘方法需要进一步优化以加快速度。

五.讨论

在这一部分中,我们讨论了我们工作的一些局限性,并进一步为未来的研究提供了一些潜在的方向。

数据集的多样性:生产系统记录的日志对于评估异常检测方法非常重要。然而,公开可用的日志数据集是稀缺资源,因为公司经常由于机密问题而不愿意打开他们的日志数据。这是评估变得困难的地方。由于[36],[47]作者的支持,我们获得了两个生产日志数据集,这使得我们的工作得以进行。数据集代表两种不同类型系统的日志,但是评估结果和发现可能仍然受到数据集多样性的限制。显然,更多日志数据集的可用性将允许我们总结我们的发现,并大力支持相关研究。我们未来的计划是从开放平台收集更多日志数据集。

通常,不同的系统通常有非常不同的日志,如HDFS和BGL数据集所示。为了概括我们不同异常检测方法的实现,我们主要关注由事件计数矩阵表示的特征空间,这已经在大多数现有工作中使用(例如,[28],[4] )。还有一些其他特征需要进一步探索,例如日志消息的时间戳,由此可以提取两个连续事件的持续时间和日志序列的顺序信息。然而,正如[28]报道的那样,现代分布式系统生成的日志通常由不同的进程交织在一起。因此,从这些日志中提取可靠的时间特征成为一个巨大的挑战。

我们已经审查并实施了大多数常用的、有代表性的异常检测日志分析方法。然而,还有一些采用不同模型的其他方法,例如频繁序列挖掘[22]、有限状态机[20]、形式概念分析[18]和信息检索[30]。我们还认为,由于日志分析的实际重要性,将会出现更多的日志。我们正在实施和维护一套更全面的开源工具。

开源日志分析工具:目前缺乏可直接用于异常检测的公开日志分析工具。我们还注意到,一系列新公司(如[3]、[4])正在提供日志分析工具作为他们的产品。但是他们都像黑匣子一样工作。这将导致可重复研究的难度增加,并减慢整个创新过程。我们希望我们的工作为公开源代码迈出了第一步,我们提倡在这方面做出更多努力。

潜在的方向

  1. 方法的可解释性。目前大多数基于日志的异常检测方法都建立在机器学习模型(如PCA )上。但是这些模型中的大多数都像一个“黑匣子”。也就是说,它们很难解释来提供直观的见解,开发人员经常无法弄清楚异常是什么。非常需要能够反映异常性质的方法
  2. 实时日志分析。当前的系统和平台经常实时生成大量日志。因此,实时处理大日志数据成为一大挑战。大数据平台上日志分析工具的开发和实时异常检测的功能是需要的

六.相关工作

日志分析:日志分析已被广泛用于提高软件系统在许多方面的可靠性,如异常检测[10]、[28]、[47]、故障诊断[17]、[31]、[38]、程序验证[11]、[42]和性能预测[16]。这些日志分析方法大多包括两个步骤:日志解析和日志挖掘,这两个步骤近年来得到了广泛的研究。He等人[24]评估四种离线日志解析方法的有效性,SLCT[45]、IPLOM [29]、LogSig [44]和LKE [20],它们不需要系统源代码。Nagappan等人[34]提出了一种享受线性运行时间和空间的离线日志解析方法。徐等人[47]设计了一种基于系统源代码的在线日志解析方法。对于日志挖掘,徐等人[47]使用PCA检测异常,PCA的输入是从日志中生成的矩阵。Bessatnikh等人[11]利用系统日志生成有限状态机,描述系统运行时行为。不同于这些使用日志分析来解决不同问题的论文,我们关注基于日志分析的异常检测方法。

异常检测:异常检测的目的是发现异常行为,这可以报告给开发人员进行手动检查和调试。Bovenzi等人[13]提出了一种操作系统级别的异常检测方法,对于任务关键型系统是有效的。Venkatakrishnan等人[46]检测安全异常,以防止攻击破坏系统。与这些侧重于检测特定异常的方法不同,本文评估了大规模系统中一般异常检测方法的有效性。巴奔科等人[9]设计了一种利用异常检测到的故障自动生成解释的技术。阿隆索等人[6]通过使用不同的分类器来检测异常。Fashchi等人[19]采用基于回归的分析技术来检测云应用操作的异常。Azevedo等人[8]使用聚类算法来检测卫星中的异常。这些方法利用不同系统收集的性能指标数据,可以补充本文评估的基于日志的异常检测方法。基于日志的异常检测得到了广泛研究,[19]、[20]、[28]、[31]、[43]、[47]。在这篇论文中,我们回顾和评估了六种异常检测方法,这些方法采用了日志分析[12]、[15]、[26]、[27]、[28]、[47],因为它们新颖且具有代表性。

经验学习:近年来,出现了许多关于软件可靠性的实证研究,因为实证研究通常可以为研究人员和开发人员提供有用和实用的见解。袁等人[48]研究开源系统的日志记录实践,并为开发者提供改进建议。傅等人[21],[49]对工业伐木实践进行了实证研究。Pechia等人[37]研究工业项目中影响测井分析的测井目标和问题。阿莫林等人[7]评估使用决策树算法识别代码气味的有效性。兰萨罗等人[25]分析库代码中的软件故障如何表现为接口错误。萨哈等人[40]从五个不同的角度研究长寿的虫子。米伦科夫斯基等人[33]调查并系统化计算机入侵检测系统评估中的常见做法。钱德拉等人[14]调查在不同类别中使用机器学习技术的异常检测方法,但是本文旨在回顾和对比将日志分析技术应用于系统异常检测的现有工作。

七.结论

日志被广泛用于检测现代大规模分布式系统中的异常。然而,由于日志大小的急剧增加,严重依赖人工日志检查的传统异常检测变得不可能。为了减少人工工作量,近年来,自动日志分析和异常检测方法得到了广泛研究。然而,开发人员仍然不知道最先进的异常检测方法,并且经常不得不自己重新设计一种新的异常检测方法,因为目前的方法缺乏全面的回顾和比较。在本文中,我们通过详细回顾和评估六种最先进的异常检测方法来填补这一空白。我们还在两个代表性的生产日志数据集上比较了它们的准确性和效率。此外,我们发布了这些异常检测方法的开源工具包,以便于重用和进一步研究。

从 Function Worker 到 Pulsar IO

前言

本文主要分享关于 Function Worker 的实现原理,以及如何一步一步演化出 Pulsar IO。

Function Worker

image

这张图是对 Function Worker 非常直观的描述,可以看到 Function Worker 主要由 Metadata Manager,Scheduler Manager,Runtime Manager 和 Membership Manager 组成。

  • Metadata Manager: 用来存放元数据。
  • Membership Manager: 主要用来维护各个 Worker 之间的关系,选举就是在这里完成的。
  • Scheduler Manager: 用来完成调度。
  • Runtime Manager: 就是我们的 function 和 Pulsar IO 真正运行的地方。

准备知识

订阅

image

在 Pulsar 中有四种订阅模式,分别是 Exclusive,Failover,Shared 和 Key_Shared

  • Exclusive: 在同一个 Topic 上不能出现相同名字的订阅。
  • Failover:如果当前的订阅挂了,会有另外一个相同名字的订阅顶替它,这是一种容灾和高可用的策略,Function Worker 的高可用就是基于此来实现的。
  • Shared: 同一个 Topic 上可以出现多个相同的订阅,数据会被轮流发给不同的订阅者。
  • key_Shared: Shared 订阅模式的升级版。

生产、消费

Consumer consumer = client.newConsumer()
  .topic("my-topic")
  .subscriptionName("my-subscription")
  .subscribe();

Reader reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(id)
        .create();

Producer<byte[]> producer = client.newProducer()
        .topic(topic)
        .create();

这是生产和消费的代码,逻辑很简单,但是很重要,在 Function Worker 的实现中大量复用了这部分逻辑。上面包括了初始化 Consumer、Reader、Producer 三部分的内容。Reader 可以认为是对 Consumer 的封装,基于 Exclusive 的定于模式,使用起来更加方便。

Membership Manager

image

Membership Manager 维护了各个 Worker 之间的关系,当我们启动了多个 Function Worker 的时候,会在这里基于 Pulsar 的 Failover 订阅模式选举出一个 leader,后面的调度都由该 leader 完成。

在这个图上有 Worker1 Worker2 和 Worker3 三个 Function Worker,它们基于 Pulsar 的 Failover 订阅模式使用 participant 的订阅名称订订阅到 coordinate 这个 Topic 上,当前 Worker2 成为了它们中的 leader。基于 Failover 订阅模式,如果一个 Worker 挂了,另一个会自动变成 leader。关于选举的测试可以参考这篇文章。

当各个 Function Worker 启动之后,首先会使用这样的代码来实现初始化一个 leader,它们都会使用相同的的订阅名称订阅到相同的 Topic 上,但是只有一个处于活跃状态,该 Worker 就是 leader,后面的调度任务都是由它来完成。

consumer = (ConsumerImpl<byte[]>) client.newConsumer()
                .topic(workerConfig.getClusterCoordinationTopic())
                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
                .subscriptionType(SubscriptionType.Failover)
                .consumerEventListener(this)
                .property(WORKER_IDENTIFIER, consumerName)
                .subscribe();

function worker 作为 Pulsar IO 和 Pulsar Function 的运行时环境,基于 Pulsar 的 pub/sub 实现了高可用,选举,调度等, function worker 可以完全独立于 Pulsar 部署,对于我们使用 Pulsar 来构建大型系统有很多可以借鉴的地方。

Metadata Manager

image

我们对 Source/Sink/Function 做的一些操作,例如 listupdatecreatedelete 等的元数据管理都是在这里实现的。

Reader<byte[]> reader = pulsarClient.newReader()
                    .topic(this.workerConfig.getFunctionMetadataTopic())
                    .startMessageId(MessageId.earliest)
                    .create();

初始化了一个 reader,从 Pulsar 的 Topic 中接收数据,Metadata Manager 的实现,主要做了几件事情:

  • 初始化 Reader。
  • 设置订阅 metadata 这个 Topic。
  • 设置从 earliest 开始消费。

用户的请求数据会通过一个 Producer 发送过来。

pulsarClient.newProducer().topic(functionMetadataTopic).create()

Scheduler Manager

image

Producer<byte[]> producer = client.newProducer().topic(config.getFunctionAssignmentTopic())
                                .enableBatching(false)
                                ......
                                .createAsync().get(10, TimeUnit.SECONDS);

调度服务是基于 producer 来实现,调度算法实现了 IScheduler 接口,当前实支持 RoundRobin 的调度算法。获取到消息并执行,开始分配任务。

Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
                    .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
                    .startMessageId(MessageId.earliest).create();

Runtime Manager

image

用来执行我们提交的实例,包括 create, stop, delete, start, restart 等。
当前 Runtime 支持基于进程,线程和 k8s 模式来运行,instance 支持使用 Python、Java 和 Golang 来开发。

从 Instance 到 Function

现在 Function Worker 的各个组件都介绍完了,接着我们来看一下运行在 Function Worker 上的 Instance。

def process(input):
    return "{}!".format(input)

这是使用 Python 开发的一个 Instance,它会接收一个输入 input,并在最后面增加一个 !,然后输出。这样 Function 就出现了,如下图,它接收输入,进行计算,然后输出。

image

在 Function 中输入正好对应 Pulsar 的 pub/sub 模型中的 Consumer,输出则对应 Producer,计算层是用户自己开发的逻辑。Function 与 Function 通过 Topic 进行连接。

从 Function 到 Pulsar IO

image

上面在 Pulsar 中的计算模型 Function 已经出现了,可以看到对于 Function 来说,它处理的是 Pulsar 内部的数据,它总是能从一个 Topic 中接收数据,然后输出数据到另外的一个 Topic 中,这很好的处理了 Pulsar 中一些简单的计算任务,但是很多时候我们都需要同外部系统打交道,这时我们就迫切的需要 Pulsar 能够方便的连接外部系统,例如 db,log 等。

Source

image

如图我们把 Function 模型中左边的 Consumer 去掉,保留右边的 Producer,自然就可以从外部系统接收数据,这样就实现了 Pulsar IO 中的 Source。

Sink

image

如图我们把 Function 模型中右边的 Producer 去掉,保留左边的 Consumer,这样就实现了将 Pulsar Topic 中的数据输出到外部系统,在 Pulsar 中我们称之为 Sink。

总结

从 Instance 到 Function 再到 Source 和 Sink,基于 Pulsar 的 Consumer 和 Producer 似乎是自然而然的事情,Function Worker 作为 Pulsar IO 和 Pulsar Function 的运行时环境,基于 Pulsar 的 pub/sub 实现了高可用,选举,调度等,Function Worker 可以完全独立于 Pulsar 部署,对于我们使用 Pulsar 来构建大型系统有很多可以借鉴的地方。

Apache Pulsar事务设计草案

动机

  1. 用例
    1. 事件处理
    2. 原子性
  2. 事务保证
    1. 隔离级别
  3. 总览
    1. 概念
      1. 事务协调器
      2. 事务缓冲区
      3. 事务确认
      4. 物化机制
    2. 事务流
      1. 事务开始
      2. 事务循环
        1. 增加分区到事务
        2. 发送消息到分区
        3. 增加订阅到事务
        4. 响应消息到订阅
      3. 结束事务
        1. 结束事务请求
        2. 最终确定过程
        3. 标记事务为COMMITTED或者ABORTED

设计选型

  1. 事务缓冲区
    1. Marker Approach
      1. 预览
      2. 挑战
        1. 物化
        2. 清理
        3. 保留
      3. 改变
      4. 讨论
    2. Sidecar Approach
      1. 预览
      2. 挑战
        1. 物化
        2. 清理
        3. 保留
      3. 改变
      4. 讨论
        1. 大事务

本文为我学习PIP 31: Transaction Support时做的一些笔记,中间有不少翻译的不准确的地方,可以结合原文来进行更详细的阅读

materialization:物化,这个概念需要和事务的隔离级别READ_COMMITTED一起来理解更好

动机

本文概述了Apache Pulsar支持事务性消息传递的草案。事务被用来加强Apache Pulsar的消息传递语义,以及Pulsar Functions的处理保证。

Apache Pulsar目前提供的最高级别的消息传递保证是通过幂等生成器在一个分区上“准确地”生成一次。保证用户通过幂等生成器生成到单个分区的每条消息都将被准确保存一次,不会丢失数据。当生产者试图向多个分区产生消息时,不存在“原子性”。例如,当broker崩溃时,发布失败可能会发生,如果生产者没有重试或者已经用尽了重试次数,消息可能不会被写入pulsar。在消费者方面,确认目前是一项best-effort的操作,这将导致消息重新传递,因此消费者会收到重复的消息。pulsar只保证消费者至少消费一次。

类似地,Pulsar Functions只保证对幂等函数上的单个事件进行准确处理一次。它不能保证多个事件或产生多个结果时能够准确处理一次。例如,如果一个函数接受多个事件并产生一个结果(例如,窗口函数),该函数可能会在产生结果和确认传入消息之间,甚至在确认单个事件之间失败。这将导致所有(或一些)传入消息被重新投递和重新处理,并产生新的结果。

Pulsar和Pulsar Functions的用户将从事务支持中获取较大的收益。写入或处理的每条消息都将准确地发生一次,不会重复,也不会丢失数据——即使在broker或者函数实例失败的情况下也是如此。事务性不仅让基于Pulsar和Pulsar Funcitons来开发应用更容易,也使Pulsar的应用范围更广阔。

用例

事件处理

事务性保证事件处理(又名事件流、流处理)应用程序获得巨大好处。当事件流的被重复处理不可接受时,在事件处理的应用中典型的例子是“consume-transform-produce”类型的任务需要事务性保证。

Pulsar Functions就是这样的一个基于Apache Pulsar构建的事件处理框架。函数调用是典型的“consume-transform-produce”类型的任务——它从一个(或多个)主题中读取事件,通过调用函数来处理事件,并将结果产生到一个(或多个)主题中。

许多SPE (流处理引擎)或自定义的事件处理逻辑(使用Pulsar的生产者和消费者)属于“consume-transform-produce”的范畴。它需要能够在一次原子操作中向不同的主题分区生成并确认一批消息。原子性意味着要么提交所有消息(所有输出消息都被精确地保存一次,并且所有输入消息都被确认),要么没有任何消息被提交。

单个事务中生成的消息数量和确认的消息数量可能会有所不同,从几条消息到大量消息不等。主题分区(又名分区)的数量也会有所不同,从一个分区到多个分区。这取决于处理效率、窗口/缓冲逻辑以及许多其他与应用相关的因素。理解上述内容的典型例子是事件处理应用程序中的窗口操作。当读者阅读本文档时,我们鼓励大家记住这类用例,因为这将在本提案的剩余部分中推动许多设计选择。

原子性

以原子方式产生多个消息是“consume-transfer-produce”类型的一种特殊情况,它只需要以原子方式“produce”消息。

Atomic produce有一个默认5M大小的消息限制在Apache Pulsar中。你可以将消息的限制调整到非常大,但这意味着你必须使用非常大的网络和磁盘IO,这对内存和磁盘不是很友好。如果大家将一条大消息分解成多条小消息,并将每个小消息作为一条pulsar消息发送,我们需要确保以原子方式发送这些消息。

数据库CDC或使用Apache Pulsar搜集日志是atomic producing的典型用例。

事务保证

如“动机”部分所述,提供事务将使事件流应用程序能够在一个原子操作中消费、处理和生成消息。

这意味着,事务中的一批消息可以从许多分区接收、产生并向其确认。事务中涉及的所有操作都将作为一个单元成功或失败。

然而,我们不能保证在提交的事务中产生的消息会被其下游消费者一起消费。这基于以下几个原因:

  1. 消费者可能不会从参与提交事务的所有分区中消费。因此,他们永远无法读取事务中包含的所有消息。
  2. 消费者可能有不同的接收者队列大小或缓冲/窗口大小,他们可能只对消费一定数量的消息感兴趣。该数量可以是任意数字。

然而,我们可能能够支持一起消费提交给一个分区的消息。但这取决于我们在下面选择的设计。由于对这一特性没有强烈的要求,我们暂时将这一点排除在保证范围之外。

隔离级别

与数据库事务类似,事件流系统中的事务也将具有隔离级别。隔离级别为:

  • READ_UNCOMMITTED :这意味着没有隔离,消费者能够读取未提交的消息。
  • READ_COMMITTED :所有消费者只能阅读提交的消息。
  • SERIALIZABILITY : 在多个分区上执行多个事务相当于事务的某种串行执行(总排序)。如果事务A在B之前提交,这保证了事务A的提交消息出现在事务B的提交消息之前。因此,消费者将能够在这些事务涉及的分区中看到提交消息的完全相同的顺序。

Pulsar事务必须支持的隔离级别是READ_COMMITTED。是否支持READ_UNCOMMITTED或SERIALIZABILITY需要依赖Pulsar用户的输入。

总览

Pulsar中有许多设计事务支持的方法。所有这些建议都可以形成一个共同的框架,我们将在本节中讨论。之后的章节将详细描述基于这个框架的细节。

概念

事务协调器

为了实现消息的事务性,我们必须引入一个名为事务协调器(又名TC )的服务器端模块。TC管理生产者发送的消息和消费者发送的确认的事务,并作为一个整体进行提交或中止操作。

事务协调器会将事务状态持久保存在一些持久存储中(例如,由单独的主题或表服务中的表支持的事务日志)以进行恢复。我们将在下一节中讨论如何实现TC,以及TC如何管理事务状态。

事务缓冲区

事务中产生的消息将存储在事务缓冲区(又称TB )中。除非消费者提交事务,否则不会将TB中的消息持久化(可见)。当事务中止时,TB中的消息将被丢弃。根据TB的实现方式,可能需要一个清理过程(例如压缩)来清理中止事务的消息。

TB的实现要求:

  1. 无论生产者如何重试生成消息,生成到TB的消息都不会重复。
  2. 在broker崩溃期间不会丢失消息。
    还有其他因素需要考虑,例如清理中止事务的消息、写扩增、排序等。我们将讨论TB的解决方案以及里面的一些权衡。

事务确认

事件流应用程序(例如Pulsar Functions)可能包括消费者和生产者,其中应用程序消费来自输入pulsar主题的消息,并产生新消息以输出到pulsar主题。为了实现精确的一次(exactly one),我们需要确保输入消息上的确认作为事务的一部分发生,以便实现原子性。否则,如果确认输入主题和生产输出主题的消息之间出现故障,将根据两个操作的顺序发生数据重复或数据丢失:如果首先生产者提交了消息,然后发生了故障,则输入消息将在恢复时重新投递,因为它们未被确认,因此数据重复;如果首先确认输入消息,则提交失败的输出消息将不会重新生成,因为输入消息已被确认,因此会丢失数据。

因此,我们需要在事务中包含确认来保证原子性。为了实现这一点,我们必须改变事务中确认的行为。因为目前pulsar中的所有确认都只是best-effort的操作。ack可能在网络断开或broker崩溃期间丢失,这将导致数据重复。

我们还需要考虑单个确认和累积确认之间的提交冲突。在接下来的章节中,我们将讨论如何增强消费者协议和游标管理,以支持事务中的确认。

物化机制

对于附加到TB的消息,事务实现还应该提供物化机制来物化未提交的消息,以使它们在事务提交时对消费者可见。这种物化机制因TB的实现而异。

物化机制还应该考虑隔离级别(如果我们想要支持比READ_COMMITTED更高的隔离级别)。

我们将在后面的章节中讨论事务如何实现未提交的消息。

事务流

所有事务实现都可以使用以上章节中描述的这些关键组件/概念来构造。

image

在图1中,事务流如下:

  • 灰色方框代表不同的broker
  • 灰色圆形框表示可以在broker内部运行或作为独立服务运行的逻辑组件(例如,像作为broker的一部分的function worker)
  • 所有蓝色方框代表日志。日志可以是pulsar主题、bookkeeper ledger或managed ledger。
  • 每个箭头代表请求流或消息流。这些操作按每个箭头旁边的数字指示的顺序进行。
  • 下面的部分编号与图表中显示的操作相匹配。

开始事务

在事务开始时,pulsar客户端会找到一个事务协调器(TC)。TC将为事务分配一个事务id(又名TxnID)。事务将在事务日志中记录其事务id和打开状态(表示事务是打开的) (如步骤1a所示)。这确保了无论TC崩溃,事务状态都保持不变。事务状态被记录到日志后,TC将事务id回复给pulsar客户端。

事务循环

在这个阶段,pulsar客户端将进入一个事务循环,重复consume-transform-produce由事务组成的消息的动作。这是一个漫长的阶段,可能包含多个生成和确认请求。

增加分区到事务

在pulsar客户端向新的主题分区生成消息之前,客户端向TC发送一个请求,将该分区添加到事务中。TC将事务的分区更改记录到其事务日志中,以确保持久性(如2.1a所示)。这一步确保TC知道事务接触的所有分区,因此TC可以在分区结束阶段提交或中止每个分区上的更改。

发送消息到分区

pulsar客户端开始向分区产生消息。该生产流程与正常消息生产流程相同。唯一的区别是由事务产生的一批消息将包含事务id。接收该批消息的broker检查该批消息是否属于事务。如果它不属于事务,broker将批处理直接写入分区的managed ledger(这是正常的生产流程)。如果它属于一个事务,broker将把它们写入事务的事务缓冲区。

事务缓冲区必须满足以下要求:
a. 就算broker崩溃,附加到事务缓冲区的消息都应该持久保存。
b. 无论生产者在网络断开时如何重试产生相同的消息,消息都应该精确地追加一次。
c. 在提交事务之前,不应将消息物化呈现给消费者。

事务缓冲区可以以多种方式实现。它可以是managed ledger本身,一个独立的managed ledger,或一些其他实现。我们将在后面的章节中讨论关于事务缓冲区设计选择的更多细节。

增加订阅到事务

pulsar客户端在新的订阅首次被确认为事务的一部分时向TC发送请求。TC在步骤2.3a中记录事务的订阅添加。该步骤确保TC知道事务覆盖的所有订阅,因此TC可以在结束事务阶段提交或中止对每个订阅的更改。

响应消息到订阅

pulsar客户端开始确认订阅消息。该事务确认流程与正常确认流程相同。然而,确认请求将携带一个事务id。接收确认请求的broker检查确认是否属于该事务。如果它属于一个事务,broker将把消息标记为PENDING_ACK状态。PENDING_ACK状态意味着在确认被提交或中止之前,消息不能被其他消费者确认或否认。(参见“New Acknowledgement State”部分的详细信息)这使得如果一条消息上有两个事务试图确认,只有一个会成功,另一个会中止。

结束事务

在事务结束时,应用程序将决定提交或中止事务。当在确认消息上检测到冲突时,事务也可以中止。

结束事务请求

当pulsar客户端完成一个事务时,它可以向TC发出一个结束事务请求,其中一个字段指示事务是提交还是中止。

收到该请求后,TC将:

  1. 将提交或中止消息写入其事务日志(如3.1a所示)。
  2. 开始向该事务中涉及的所有分区提交或中止消息或确认的过程。它在3.2中展示并进行相应的描述。
  3. 在成功提交或中止该事务中涉及的所有分区后,TC将提交或中止消息写入其事务日志。它在3.3中展示并进行相应的描述。
最终确定过程

在此阶段,TC将通过提交确认、终止确认所有分区上的消息来完成事务。

提交生产的消息是将消息进行物化,并使它们对消费者可见(如图3.2a所示)。由于故障(例如恢复后的重试、网络断开等),提交操作可能会发生多次。TB实现必须确保在提交过程中不会引入重复。

中止生成的消息将丢弃TB中的消息。如果事务中止,TB必须确保清理这些消息并回收空间。
提交确认将消息从PENDING_ACK移动到ACK.。中止确认将不会确认消息,因此该消息将被重新传递给其他消费者。

标记事务为COMMITTED或者ABORTED

对于所有分区生产的消息被提交或终止确认之后,TC将最终COMMITTED或ABORTED 的事务状态消息写入其事务日志,指示事务已完成(如图3.3a所示)。此时,与其事务日志中的事务相关的所有消息都可以安全地删除。

该图显示了涉及不同组件的整个事务流程。然而,这些组件的实现细节在这里没有很好地讨论。我们将在下面几节中更详细地讨论它们,并逐个组件地比较设计选择。

此外,在改进事务流程方面还可以进行许多优化。这些都被排除在这个提案之外,以确保我们从一个可靠和健壮的实现开始,先把事情做好。

设计选型

“Transaction Coordinator”和“Transactional Acknowledgement”很容易实现。详见“A Full Proposal”一节。最具挑战性的部分将是“Transaction Buffer”部分,因为将会有许多不同权衡的建议。这些提案将在下文讨论。

事务缓冲区

概括我们上面对事务流的描述,事务缓冲实现应该考虑以下几点:

  1. 事务循环期间:
    a. 即使broker崩溃,附加到事务缓冲区的消息都应该持久保存。
    b. 无论生产者在网络断开时如何重试产生相同的消息,消息都应该精确地被追加一次。
    c. 在提交事务之前,不应将消息物化呈现给其他消费者。
  2. 物化机制,用于物化事务缓冲区中的消息,使它们对其他消费者可见
    a. 消息如何物化将影响我们如何向消费者发送消息
  3. 一种清除机制,用于清理事务缓冲区中的消息以回收磁盘空间。

Marker Approach

实现事务缓冲区的方法之一是:

  • 重用分区作为“事务缓冲区”
  • 将事务中涉及的消息直接写入分区
  • 将它们标记为不可见,消费者在提交事务之前无法消费它们
  • 提交事务时,将消息标记为可见
  • 事务中止时清除(在后台)消息。

预览

image

图2标记方法演示了标记方法的样子。灰色方框代表正常客户端(通过非事务流程)产生的消息;颜色框代表由事务产生的消息;不同的颜色表示不同的事务。事务产生的每个消息将被标记为“- ”(例如“txn2-m2”)。<txn>-commit < txn>-abort 是提交或中止给定事务时附加的标记。

在这种方法下,所有事务性消息都直接附加到分区的managed ledger下。向broker发送消息时,需要添加额外的元数据(例如TxnID字段)。broker调度程序检查这些元数据以及事务状态,以决定是否应该调度它们。每个事务都将使用TxnID作为生产者ID,因此broker可以使用de-duplication来确保消息只被添加到分区一次。当事务协调器开始提交或中止事务时,它会向分区日志中写入'-commit'或者'-abort',以将事务标记为“COMMITTED”或“ABORTED”。此时,COMMITTED事务的消息可以安全地发送给消费者,ABORTED事务的消息可以通过后台的扫描进程来清理。
图2展示了3个事务,“txn1”、“txn2”和“txn3”。“txn1”和“txn2”被提交,而“txn3”被中止。

挑战

这种方法存在一些挑战。

物化

<txn>-commit是用于将事务标记为'COMMITTED'并将消息物化给消费者的提交标记。它也是事务的“fencing”点——在这个标记之后产生给同一个事务的任何消息都将被拒绝。

因为一个事务可以在多个消息上传播,所以我们需要一个为事务索引消息的机制。因此,当物化发生时,调度程序知道如何获取消息并正确地调度它们。

这可以通过MessageDeduplication游标来完成。当前,消息MessageDeduplication游标维护生产者标识与其序列id之间的映射。我们可以扩展它来维护txn id和它的消息的消息id列表之间的映射。当TC提交事务时:

  1. 将事务的消息id列表添加为提交标记的一部分。
  2. 将提交标记写入分区。
  3. 成功写入标记后,从MessageDeduplication游标中删除事务,因为事务已经实现。如果需要,可以将事务添加到事务缓存中进行快速查找(可选)。
清理

<txn>-abort是用于将事务标记为'ABORTED'的提交标记。一旦事务被标记为“ABORTED”,该事务的消息就可以安全地删除。但是,由于managed ledger仅支持追加,因此无法从分区中删除单个消息。所以这些信息不容易删除。消息必须等到保留过期,或者需要额外的“压缩”过程来压缩成段以删除中止事务的消息。这需要重写一个新的段。我们可以改进当前pulsar的压缩逻辑来实现它,或者作为将数据移动到分层存储的一部分来处理这个过程。

保留

在当前的方法中,由于事务性消息(提交和中止的消息)与普通消息交织在一起,broker应该小心确认。因为如果消息所属的事务尚未完成(提交或中止),游标不能向前移动。

改变

总之,这种方法需要更改以下组件:
在消息元数据中引入新字段,让broker判断消息是否属于事务。
在消息元数据中引入新字段,以判断消息是否是事务标记。
更改MessageDeduplication以维护事务id及其消息id列表之间的映射。
更改broker调度程序,跳过未物化的消息调度
更改压缩或卸载程序逻辑以丢弃属于中止事务的消息

在这种方法中,我们最终可能会触及broker的几乎每一个部分。

讨论

有几个性能相关的讨论点:
由于附加的事务消息和事务提交可以在不同的时间发生,所以同一事务的消息不会连续存储(逻辑上和物理上都在bookie上)。因此条目缓存行为可以非常随机。例如,在图2中,当读取txn2的消息时,它必须跳回txn2-m1,然后读取“txn2-m1”和“txn2-m2”;broker读取txn1的消息,它必须跳回读取txn1-m1。

不仅如此,在这个提案中,我们将普通消息与事务性消息混合在一起,这将显著改变普通消息的缓存行为,这可能会导致代理有更多的网络I/O。

Sidecar Approach

与标记方法相反,其他方法可以描述为sidercar approach,基本要点如下:

  • 使用单独的“managed ledger”作为“transaction buffer”。咱们使用transaction log重命名本节的讨论。
  • 将事务中涉及的消息写入事务日志。
  • 因为消息没有被写入分区,所以在提交之前消费者看不到它们。
  • 提交事务时,将“COMMIT”消息写入分区,包括其消息的消息id列表,然后将相同的“COMMIT”消息写入事务日志。我们将在后面的章节中讨论为什么我们需要两个“COMMIT”消息。
  • 中止事务时,向事务日志中写入“ABORT”消息,将事务标记为中止。后台进程清理中止事务的消息。

预览

image

图3 Sidecar Approach展示了Sidecar Approach的样子。灰色方框代表正常客户端(通过非事务流程)产生的消息;颜色框代表由事务产生的消息;不同的颜色表示不同的事务。事务产生的每个消息将被标记为“txn- ”(例如“txn2-m2”)。<txn>-commit<txn>-abort 是提交或中止给定事务时附加的标记。

在这种方法中,所有事务消息都直接追加到分区的事务日志中。每个事务都将使用TxnID作为生产者ID发送到分区的事务日志中,因此broker可以使用de-duplication逻辑来确保消息准确地附加到事务日志中一次。

  1. 当事务协调器开始提交事务时,它会向分区写入一个“-commit”标记,以指示事务标记为“COMMITTED”。此操作会密封事务。当broker调度程序看到提交标记时,它将作为事务消息的索引。必须改进deduplication游标,使其包含事务日志和分区,以确保提交标记准确写入一次。
  2. 当事务协调器开始中止一个事务时,它会向事务日志中写入一个“-abort”标记,以指示该事务被标记为“ABORTED”。此操作会密封事务。之后,在事务日志中的事务消息能被后台进程清理

与标记方法相比,只有“commit”标记被写入分区,因此调度器和保留策略几乎不会改变。“commit”标记只是一个指向一批消息的指针。事务数据和普通数据的分离将确保:
事务性用例不会影响正常的用例
它在事务用例和正常用例之间建立了隔离。尤其是在缓存方面。正常用例的缓存行为将保持不变。我们可以创建一个增强的条目缓存来优化事务消息访问。

图3展示了3个事务,“txn1”、“txn2”和“txn3”。“txn1”和“txn2”被提交,而“txn3”被中止。

挑战

在这个方法中有几个挑战
物化

与标记方法类似,我们使用提交标记“-commit”将事务标记为“COMMITTED”,以将消息物化后给消费者。

清理

<txn>-abort是用于将事务标记为'ABORTED'的提交标记。一旦事务被标记为“ABORTED”,该事务的消息就可以安全地删除。但是,由于事务日志是仅追加的,因此无法从分区中删除单个消息。需要在后台运行一个额外的“压缩”过程来压缩事务日志,以删除中止事务的消息。

保留
与标记方法相比,保留变得容易得多。当确认发生在提交标记上时,它会将提交标记加载到内存中,并找到要确认的事务的消息id。然后,它会将这些消息标记为事务日志中已确认的消息。

改变

总之,这种方法需要更改以下组件: 1. 引入一个新的元消息——元消息有一个指向其他消息的指针列表。调度程序将通过跟踪指针来解析元消息以读取实际消息。 2. 更改MessageDeduplication以维护事务id及其消息id列表之间的映射,并处理事务日志和分区的游标。 3. 引入新的压缩逻辑来压缩一个段,以丢弃中止事务的消息

讨论

大事务

如果我们将支持无限大小的消息建模为一系列消息块的事务,我们可以引入一个设置来告诉broker使用单独的ledger在分区上存储给定事务的消息。在这种方法中,我们可以让提交标记直接指向ledger。删除ledger类似于提交标记被删除时的删除消息。

Sidecar Approach的详细实现在“Broker - Transaction Buffer”一节中描述。

完整的设计草案

接下来是一份更详细完整的的设计草案,阅读起来要比上面简单,暂时没有翻译

从源码角度看 Apache Pulsar 中生产消费的整个流程

  • Apache Pulsar 一个分布式的发布订阅的消息系统
  • Producer 生产者
  • Consumer 消费者
  • Subscribe 订阅

摘要

本篇内容主要是从源码的角度完整的分析一遍 Apache Pulsar 中生产消费的整个流程,从而加深对 Pulsar 整体流程的学习。内容中虽然会涉及 netty 服务的相关内容,但是不是本文的重点。

环境准备

下载安装代码

git clone https://github.com/apache/pulsar
cd pulsar
mvn install -DskipTests

启动服务

./bin/pulsar standalone

开启消费

# 指定 topic ,指定订阅名称,阻塞住
./bin/pulsar-client consume persistent://public/default/my-topic --subscription-name my-sub2 --num-messages 0

生产消息

./bin/pulsar-client produce my-topic --messages "hello-pulsar"

以上,我们就完成了一次完整的生产消费,下面我们从源代码的角度看看 Pulsar 为我们做了什么。

本文涉及到的几个文件

服务端文件

在 Pulsar 服务端主要涉及到了三个模块,pulsar-broker、pulsar-common、managed-ledger

# Pulsar Broker 服务启动的入口,初始化各种组件
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
# 具体的各种逻辑处理的地方,里面有各种 handle 开头的函数,用来路由客户端发送过来的请求
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
# ServerCnx 的父类代码,与消费者保持心跳在这里处理
pulsar/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java
# PulsarHandler 的父类代码,实现了 netty 下类 ChannelInboundHandlerAdapter 的 channelRead 方法,处理客户端发过来的各种命令
pulsar/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
# 服务端的生产者代码
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
# 服务端的消费中代码
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
# Topic 处理代码
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
# Subscription 相关代码
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
# 我们用到的独享模式下的相关代码
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
# 管理 ledger 的代码
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
# ManagedLedgerFactoryImpl 对 ManagedLedgerImpl 进行操作
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
# 操作 entry 的代码
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java

客户端文件

# ./bin/pulsar-client produce 时,调用的代码
pulsar/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
./bin/pulsar-client consume 时,需要调用的代码
pulsar/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
# 发送前进行相应的命令拼装
pulsar/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java

流程

Pulsar Broker 初始化

这时候会进入 BrokerService 类,当前我们用到的,比较重要的是下面的两个调度器

// ledger 管理初始化
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
// 操作 Topic 相关
this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder()
                .numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
                .name("broker-topic-workers").build();
// 读写操作相关
final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-io");

接着进入 Start 方法,netty 服务初始化后的 handler 注册是在这里完成

bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));

ServerCnx 类注册是在 PulsarChannelInitializer 的 initChannel 方法中完成的

这样,当我们调用 ./bin/pulsar standalone 命令后,我们当前需要的一些组件就初始化好了

Consumer 开启订阅

Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(this.subscriptionName)
                    .subscriptionType(subscriptionType).subscribe();

在 CmdConsume 中主要通过上面的这句开启订阅
类与方法的调用流程
ConsumerBuilderImpl(subscribe) -> ConsumerBuilderImpl(subscribeAsync) -> PulsarClientImpl(subscribeAsync)
-> PulsarClientImpl(singleTopicSubscribeAsync) -> PulsarClientImpl(singleTopicSubscribeAsync)
-> ConsumerImpl(newConsumerImpl) -> ConsumerImpl(ConsumerImpl) -> ConsumerImpl(grabCnx) -> ConnectionHandler(grabCnx)
-> ConsumerImpl(connectionOpened)
经过层层调用,最后会到达 connectionOpened 中:

ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
    consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), si);

在 newSubscribe 中会添加一个如下的类型信息,服务端会根据该类型信息进行相应的处理

ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SUBSCRIBE).setSubscribe(subscribe));

接下来 Consumer 会把该拼装好的请求发送给 Broker ,然后阻塞住,等待消费确认消息:

PulsarClient client = clientBuilder.build();
Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(this.subscriptionName)
        .subscriptionType(subscriptionType).subscribe();

RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
    if (limiter != null) {
        limiter.acquire();
    }

    Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
    if (msg == null) {
        LOG.debug("No message to consume after waiting for 5 seconds.");
    } else {
        numMessagesConsumed += 1;
        System.out.println(MESSAGE_BOUNDARY);
        String output = this.interpretMessage(msg, displayHex);
        System.out.println(output);
        consumer.acknowledge(msg);
    }
}
client.close();

Broker 对 subscribe 的处理

当有消费者订阅到来的时候,Broker 会进入 PulsarDecoder 的 channelRead 中:

case SUBSCRIBE:

  checkArgument(cmd.hasSubscribe());
  handleSubscribe(cmd.getSubscribe());
  cmd.getSubscribe().recycle();
  break;

在 PulsarDecoder 的子类 ServerCnx 中,实现了该函数

@Override
protected void handleSubscribe(final CommandSubscribe subscribe) {
  ...
  topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                  subType, priorityLevel, consumerName, isDurable,
                  startMessageId, metadata, readCompacted, initialPosition);
  ...
}

在 PersistentTopic 类的 subscribe 中的主要逻辑如下:

// 生成 Consumer 对象,并添加到 subscription 中
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
                                                 maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition);
subscription.addConsumer(consumer);

下面我看看下在 Subscription 类中,addConsumer 的逻辑:

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
  ...
  // 三种订阅类型,独享、共享、Failover,这里我们用的是独享模式
  switch (consumer.subType()) {
    case Exclusive:
        if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
            dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0, topic);
        }
        break;
    case Shared:
        if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
            dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor);
        }
        break;
    case Failover:
        int partitionIndex = TopicName.getPartitionIndex(topicName);
        if (partitionIndex < 0) {
            // For non partition topics, assume index 0 to pick a predictable consumer
            partitionIndex = 0;
        }

        if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
            dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, partitionIndex,
                    topic);
        }
        break;
    default:
        throw new ServerMetadataException("Unsupported subscription type");
    }
  dispatcher.addConsumer(consumer);
  ...
}

在 PersistentDispatcherSingleActiveConsumer 中没有 addConsumer 这个方法,它会调用父类的 addConsumer 方法,主要是做一些判断,并且添加当前
处于活跃状态的 Consumer, 与今天的内容关系不大,先暂时放一下

以上 Consumer 就在该 Topic 上成功的以独享模式进行了订阅

生产者发布消息

Producer<byte[]> producer = client.newProducer().topic(topic).create();

List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;
for (int i = 0; i < this.numTimesProduce; i++) {
    for (byte[] content : messageBodies) {
        if (limiter != null) {
            limiter.acquire();
        }

        // 主要关注该方法
        producer.send(content);
        numMessagesSent++;
    }
}
client.close();

经过各种调用,进入 ProducerImpl 的 internalSendAsync 方法,然后再 sendAsync 中调用了 sendMessage ,然后再调用 Commands.newSend 方法,配置好要发送的类型:

ByteBufPair res = serializeCommandSendWithSize(BaseCommand.newBuilder().setType(Type.SEND).setSend(send),
                checksumType, messageData, payload);

下面就可以把消息成功地发送到 Broker 了

Broker 处理消息

当生产者发送过来的消息到达 Broker 后,Broker 会将其写入到 Bookie (这里我们主要关注 Persistent 类型的 Topic),当写入成功后,又会将其转发给先前订阅好的消费者,下面我们看下具体的处理过程

还是在 PulsarDecoder 的 channelRead 中,不过这次是 handleSend :

case SEND: {

  checkArgument(cmd.hasSend());

  // Store a buffer marking the content + headers
  ByteBuf headersAndPayload = buffer.markReaderIndex();
  handleSend(cmd.getSend(), headersAndPayload);
  cmd.getSend().recycle();
  break;
}

在其子类 ServerCnx 中实现了该 handleSend 方法:

// ServerCnx.java
@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
    ...
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages());
    ...
}

// Producer.java
public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
  ...
  topic.publishMessage(headersAndPayload,
                MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize));
  ...
}

// PersistentTopic.java
@Override
public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
    if (messageDeduplication.shouldPublishNextMessage(publishContext, headersAndPayload)) {
        // ledger 类型 org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
        // 第二个参数是回调类,入下面的 asyncAddEntry,如果完成,就会回调 this.addComplete 方法
        ledger.asyncAddEntry(headersAndPayload, this, publishContext);
    } else {
      ...
    }
}

// 将来会被回调
// PersistentTopic.java
@Override
public void addComplete(Position pos, Object ctx) {
    PublishContext publishContext = (PublishContext) ctx;
    PositionImpl position = (PositionImpl) pos;

    // Message has been successfully persisted
    messageDeduplication.recordMessagePersisted(publishContext, position);
    publishContext.completed(null, position.getLedgerId(), position.getEntryId());
}

// ManagedLedgerImpl.java
@Override
public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) {
    ...
    OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);

    // Jump to specific thread to avoid contention from writers writing from different threads
    executor.executeOrdered(name, safeRun(() -> {
        pendingAddEntries.add(addOperation);
        internalAsyncAddEntry(addOperation);
    }));
}

// ManagedLedgerImpl.java
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
  ...
  addOperation.initiate();
}

// OpAddEntry.java
public void initiate() {
    ...
    ledger.asyncAddEntry(duplicateBuffer, this, ctx);
}

// OpAddEntry.java
@Override
public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) {

    if (rc != BKException.Code.OK) {
      ...
    } else {
        if(!checkAndCompleteTimeoutTask()) {
            return;
        }
        // Trigger addComplete callback in a thread hashed on the managed ledger name
        ml.getExecutor().executeOrdered(ml.getName(), this);
    }
}

// OpAddEntry.java
@Override
public void safeRun() {
  ...
  if (closeWhenDone) {
    ...
  } else {
      AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
      if (cb != null) {
          cb.addComplete(lastEntry, ctx);
          ml.notifyCursors();
          this.recycle();
      }
  }
}

现在我们来详细看下上面的调用:

  • OpAddEntry 中的 initiate 里的 ledger.asyncAddEntry 是调用的 bookkeeper 的接口,可以参考 Api ,看一下 org.apache.bookkeeper.client.LedgerHandle 类
  • ledger.asyncAddEntry 的第二个参数是回调函数,类型是 org.apache.bookkeeper.client.AsyncCallback.AddCallback
  • AddCallback 是个 interface,OpAddEntry 刚好实现了这个 interface,AddCallback 中的方法 addComplete 在 OpAddEntry 中被实现
  • asyncAddEntry 可以理解成操作完成,就会进行回调,从这上面的几个调用关系可以看到,这里的回调类是 this,就是上面的 addOperation
  • addOperation 的类型是 OpAddEntry,因此我们可以理解成当在 bookkeeper 中增加 entry 完成后,就会回调 OpAddEntry 的 addComplete 方法
  • ml 的类型是 ManagedLedgerImpl,ledger 的管理器
  • ml.getExecutor().executeOrdered 可以认为启动一个线程,因为该类继承了 SafeRunnable 类
  • 进入 safeRun 回调 PersistentTopic 类的 addComplete 方法
  • ml.notifyCursors() 进入转发消息给 Consumer 的逻辑
  • notifyCursors 中的 waitingCursor 第一次进来不会为空,会用一个线程进行处理,知道处理完毕,就会退出
  • 接着会调用 asyncReadEntries 读取 bookkeeper 上的 entry
  • 从 entryCache 上读取
  • 最后调用 Consumer 的 sendMessage 将消息发送给消费者
// ManagedLedgerImpl.java
void notifyCursors() {
  while (true) {
      final ManagedCursorImpl waitingCursor = waitingCursors.poll();
      log.info("waitingCursor {}", waitingCursor);
      if (waitingCursor == null) {
          break;
      }
      executor.execute(safeRun(() -> waitingCursor.notifyEntriesAvailable()));
  }
}

// ManagedCursorImpl.java
void notifyEntriesAvailable() {
  ...
  if (opReadEntry != null) {
      opReadEntry.readPosition = (PositionImpl) getReadPosition();
      ledger.asyncReadEntries(opReadEntry);
  } else {
      ...
  }
}

// ManagedCursorImpl.java
void asyncReadEntries(OpReadEntry opReadEntry) {
  ...
  if (currentLedger != null && ledgerId == currentLedger.getId()) {
      // Current writing ledger is not in the cache (since we don't want
      // it to be automatically evicted), and we cannot use 2 different
      // ledger handles (read & write)for the same ledger.
      internalReadFromLedger(currentLedger, opReadEntry);
  } else {
    
      // Get a ledger handle to read from
      getLedgerHandle(ledgerId).thenAccept(ledger -> {
          internalReadFromLedger(ledger, opReadEntry);
      }).exceptionally(ex -> {
        ...
      });
  }
}

// ManagedCursorImpl.java
private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {
  asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
}

protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
        OpReadEntry opReadEntry, Object ctx) {
    if (checkTimeout) {
        entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
    } else {
        entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
    }
}

// EntryCacheImpl.java
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
        final ReadEntriesCallback callback, Object ctx) {
  try {
    asyncReadEntry0(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx);
  } catch (Throwable t) {
    ...
  }
}

// EntryCacheImpl.java
private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
            final ReadEntriesCallback callback, Object ctx) {
              ...
              callback.readEntriesComplete((List) entriesToReturn, ctx);
              ...
}

// PersistentDispatcherSingleActiveConsumer.java
@Override
public void readEntriesComplete(final List<Entry> entries, Object obj) {
  topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
      internalReadEntriesComplete(entries, obj);
  }));
}

// PersistentDispatcherSingleActiveConsumer.java
public synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) {
  if (currentConsumer == null || readConsumer != currentConsumer) {
  } else {
      currentConsumer.sendMessages(entries, (future, sentMsgInfo) -> {});
  }
}

// Consumer.java
public SendMessageInfo sendMessages(final List<Entry> entries, SendListener listener) {
    ...
    ctx.channel().eventLoop().execute(() -> {
        for (int i = 0; i < entries.size(); i++) {
            ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload), promise);
        }
    });
    ...
}

以上基本上就完成了整个的消费者订阅,生产者发送消息,Broker 转发消息给消费者的整个流程,因为中间技术问题颇多,限于篇幅,有些问题没有时间展开叙述。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.