Goでfan-inとfan-outを実装する

Goのchannelとgoroutineを活用したfan-in/fan-outパターンを実装し、複数入力を集約・分散する並行処理設計を解説。

Read in: en
Goでfan-inとfan-outを実装する

概要

並行処理のパターンであるfan-in、fan-outをGoで実装する。

fan-in/fan-outとは

fan-inは、複数の入力を1つにまとめる処理で、fan-outは、1つの入力を複数に分ける処理である。

fan-inはデータを集約させ、fan-outはデータを分散させる。

Goではchannelとgoroutineを使って実現することができる。

実装

ソースコードはgithubにも置いてある。

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func producer(id int, out chan<- int) {
	for i := 0; i < 5; i++ {
		value := rand.Intn(100)
		fmt.Printf("Producer %d: Sending %d\n", id, value)
		out <- value
		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	}
	close(out)
}

func fanIn(inputs []<-chan int, out chan<- int) {
	var wg sync.WaitGroup
	wg.Add(len(inputs))

	for _, input := range inputs {
		go func(ch <-chan int) {
			for value := range ch {
				out <- value
			}
			wg.Done()
		}(input)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
}

func main() {
	rand.Seed(time.Now().UnixNano())

	// Fan-Out
	numProducers := 3
	inputs := make([]chan int, numProducers)
	for i := 0; i < numProducers; i++ {
		inputs[i] = make(chan int)
		go producer(i+1, inputs[i])
	}

	// Convert channels to <-chan int
	inputChans := make([]<-chan int, numProducers)
	for i := 0; i < numProducers; i++ {
		inputChans[i] = inputs[i]
	}

	// Fan-In
	result := make(chan int)
	go fanIn(inputChans, result)

	// Consume the merged values
	for value := range result {
		fmt.Printf("Consumer: Received %d\n", value)
	}

	fmt.Println("All done!")
}

fan-outの処理でデータを分散して、fan-inの処理でデータを集約している。

所感

並行処理は自身がないので勉強しないといけない。。。

参考

Tags: fan-in fan-out
Share: 𝕏 Post Facebook Hatena
✏️ View source / Discuss on GitHub
☕ サポート

このブログを応援していただける方は、以下からサポートをお願いします。いただいたサポートはブログ運営・技術研鑽に活用します。


関連記事