事件流与事件溯源

事件流是持续捕获和存储系统中发生的事件的过程。这些事件可以实时处理和分析,也可以存储以供后续分析。

事件流是持续捕获和存储系统中发生的事件的过程。这些事件可以实时处理和分析,也可以存储以供后续分析。

事件流与事件溯源

事件流和事件溯源是事件驱动架构中两个相关但不同的概念。

事件流是持续捕获和存储系统中发生的事件的过程。这些事件可以实时处理和分析,也可以存储以供后续分析。事件流通常用于需要实时处理大量数据的系统,如金融交易系统或社交媒体平台。

以下是使用流行的Kafka消息系统在Go中进行事件流处理的简单示例:

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
)

func main() {
    // 设置Kafka生产者以将事件发送到主题
    writer := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "my-topic",
    })

    // 发送一些事件到主题
    writer.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("key1"),
            Value: []byte("value1"),
        },
        kafka.Message{
            Key:   []byte("key2"),
            Value: []byte("value2"),
        },
    )

    // 设置Kafka消费者以从主题读取事件
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "my-topic",
    })

    // 从主题读取事件
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            break
        }
        fmt.Printf("Received message: key=%s, value=%s\n", string(msg.Key), string(msg.Value))
    }
}

而事件溯源是一种构建系统的模式,将应用程序状态的所有变化存储为事件序列。这些事件然后可以用于在任何时间点重建应用程序的状态。事件溯源通常用于需要可审计性、可追溯性或合规性的系统,如金融系统或医疗系统。

以下是在Go中使用内存事件存储进行事件溯源的简单示例:


package main

import (
    "fmt"
)

type Event struct {
    Type string
    Data interface{}
}

type EventStore struct {
    events []Event
}

func (store *EventStore) Append(event Event) {
    store.events = append(store.events, event)
}

func (store *EventStore) GetEvents() []Event {
    return store.events
}

type Account struct {
    id      string
    balance int
    store   *EventStore
}

func NewAccount(id string, store *EventStore) *Account {
    return &Account{
        id:      id,
        balance: 0,
        store:   store,
    }
}

func (account *Account) Deposit(amount int) {
    event := Event{
        Type: "deposit",
        Data: amount,
    }
    account.store.Append(event)
    account.balance += amount
}

func (account *Account) Withdraw(amount int) {
    if account.balance >= amount {
        event := Event{
            Type: "withdraw",
            Data: amount,
        }
        account.store.Append(event)
        account.balance -= amount
    }
}

func (account *Account) GetBalance() int {
    return account.balance
}

func main() {
    store := &EventStore{}
    account := NewAccount("123", store)

    account.Deposit(100)
    account.Withdraw(50)
    account.Deposit(25)

    events := store.GetEvents()
    for _, event := range events {
        switch event.Type {
        case "deposit":
            amount := event.Data.(int)
            fmt.Printf("Deposited %d\n", amount)
        case "withdraw":
            amount := event.Data.(int)
            fmt.Printf("Withdrew %d\n", amount)
        }
    }

    fmt.Printf("Final balance: %d\n", account.GetBalance())
}

事件溯源是通过将每个对聚合的修改记录为事件并将其追加到连续流中的一种方法。要重建聚合的最终状态,需要按顺序读取这些事件,然后将其应用于聚合。这与在创建、读取、更新和删除(CRUD)系统中执行的即时修改形成对比。在CRUD系统中,对记录状态的任何更改都存储在数据库中,实质上覆盖了同

一聚合的先前版本。

事件流与事件溯源

一旦价格变化已保存到Products表中,只更新了价格本身,而行的其余部分保持不变。然而,如图5.1所示,这种方法导致了先前价格和更改背后的上下文的丢失。

为了保留不仅新价格还包括关键元数据(如调整原因)的信息,将更改记录为Events表中的事件。先前的价格在先前事件中保持不变,以便在需要时检索。

为了实现有效的事件溯源,建议使用提供强大一致性保证并使用乐观并发控制的事件存储。在实践中,这意味着当多个修改同时发生时,只有初始修改才能附加到流中。随后的修改可能需要重试或可能会失败。

©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经

(0)
打赏 微信扫码打赏 微信扫码打赏 支付宝扫码打赏 支付宝扫码打赏
清一色的头像清一色管理团队
上一篇 2024年2月1日 23:02
下一篇 2024年2月1日 23:03

相关推荐

发表评论

登录后才能评论

联系我们

在线咨询:1643011589-QQbutton

手机:13798586780

QQ/微信:1074760229

QQ群:551893940

工作时间:工作日9:00-18:00,节假日休息

关注微信