EventPress: Topic 기반 EventBus 라이브러리

EventPress 는 Android/Kotlin 을 지원하는, Rx 친화적인 event bus 라이브러리입니다. RxJava 를 이용한 EventBus 라이브러리 중 유명했던 RxBus2 에서 많은 영감을 받았습니다. (MFlisar’s RxBus2)

Event bus 의 핵심은 이벤트 stream을 얼마나 효율적으로 관리하느냐이며, EventPress 는 이를 위해 MQTT protocol 에서 사용하는 것과 거의 유사한 토픽(Topic) 개념을 사용하고 있습니다.

토픽은 트리 형태의 계층 구조를 가지는 Event stream으로, 사용자가 자유롭게 토픽을 생성하고 해당 토픽에 이벤트를 publish 하거나 observe(subscribe) 할 수 있습니다.

.

EventPress 특징

  • MQTT protocol 과 유사한 토픽(topic) 계층구조로 Event stream 들을 관리
  • 각 토픽 (event stream)은 path 를 포함한 String으로 표현
    예) /viewmodel/logic/stream1
  • 토픽을 만들고 이벤트 “발행(publish)” 을 하면 해당 토픽을 “구독(observe/subscribe)”한 모든 옵저버(observer)에게 이벤트가 전달
  • 또한 토픽 경로상 하위의 토픽들을 구독한 옵저버에게도 이벤트가 전달됨. (토픽을 삭제하면 하위 토픽도 삭제됨)
  • RxKotlin/RxAndroid 를 사용하여 구현, EventStream 은 PublishProcessor 를 이용
  • backpressure 정책과 valve 기능, observer 가 실행될 thread 지정을 위한 API 를 지원
  • EventPress 는 작고 가벼운 라이브러리를 지향

.

라이브러리 import 방법 (JitPack 배포)

1. 프로젝트 레벨 build.gradle 에 아래 설정을 추가

allprojects {
    repositories {
        ...
        maven { url "https://jitpack.io" }
    }
}

2. 모듈 레벨 build.gradle 에 dependency 추가

dependencies {
	implementation 'com.github.godstale:EventPress:<LATEST-VERSION>'
}


최신 버전은 GitHub 에서 확인이 가능합니다.

.

사용법

목차

  • 토픽(Topic)
  • 테스트 코드
  • Simple test
  • 기본 사용법
  • 이벤트 발행
  • 토픽 삭제
  • Builder 테스트
  • Flow control
  • EventHolder
  • 주의사항

.

Topic

토픽은 이벤트 스트림을 표현하는 계층 구조로 표현됩니다. 각각의 토픽은 Linux 파일 시스템의 파일 경로처럼 ‘/’ 구분자를 이용해 표시합니다.

  • /api
  • /api/member
  • /api/member/login
  • /api/member/info

만약 위처럼 토픽 계층 구조를 만든다면, ‘/api/member’ 토픽에 이벤트를 발행했을 때 ‘/api/member’, ‘/api/member/login’, ‘/api/member/info’ 를 구독한 모든 옵저버에게 이벤트가 전달됩니다. (선택적으로 하위 토픽으로의 이벤트 발행을 막을수도 있습니다.)

EventPress 라이브러리는 Event stream 을 생성할 때 토픽의 이름을 자유롭게 사용할 수 있지만 지켜야할 규칙들이 몇 가지 있습니다.

  • 토픽 경로(토픽 이름을 포함한 full path)는 반드시 ‘/’ 로 시작
  • 토픽 경로는 ‘/’ 로 끝나서는 안됨
  • 루트 토픽 ‘/’ 을 EventPress API 에 사용할 수 없음
  • 공백 문자 사용할 수 없음
  • 다음의 문자들만 사용 가능 [ . _ 0-9 a-z A-Z / – ]
  • 토픽 경로는 최대 256 자
  • 토픽 경로상의 각 토픽 이름은 비어있을 수 없음. (예: /api//test)
  • /sys, /sys/class, /sys/ui, /sys/common 토픽은 시스템 토픽으로 예약되어 있으므로 사용불가 (하위 토픽은 가능)
  • 와일드 카드 문자, 기능은 지원하지 않음

.

테스트 코드

아래 링크에서 MainActivity 코드를 보시면 EventPress 의 사용법을 코드로 확인하실 수 있습니다.

.

Simple test

EventPress API 를 사용하기 전 초기화가 필요합니다. (Application 클래스에서 호출하기를 권장)

EventPress.initialize()

단순히 observe(), publish() 를 호출하기만 해도 이벤트를 전달할 수 있습니다.

// Register observer
EventPress.observe<String> {
	Log.d("###", "  -->[/sys/common] event received = $it")
}?.addTo(compositeDisposable)

// Send message
EventPress.publish("Hello world!!")

로그로 표시하기는 하지만 observe(), publish() 를 호출할 때 토픽을 지정하지는 않았습니다. observe() 를 호출할 때 토픽이 없는 경우 /sys/common 시스템 토픽으로 자동 바인딩됩니다. publish() 의 경우도 토픽이 지정되지 않으면 /sys/common 을 사용합니다.

단, 이 경우는 /sys/common 을 사용하는 모든 observer/publisher 가 동일한 object 타입을 사용한다는 암묵적인 원칙이 있습니다. observe() 가 호출될 때 타입 캐스팅에 사용할 클래스 타입(String) 을 지정했음을 주의하세요.

또한 observe() 가 리턴하는 disposable 도 필수적으로!! 사용자가 받아서 처리해줘야 합니다. (예제에서는 activity 가 종료될 때 처리하는 compositeDisposable에 등록하도록 addTo() 를 사용했습니다.)

.

기본 사용법

사용자가 직접 토픽을 만들고 이벤트를 발행하는 예제입니다.

val TOPIC_TEST_BASIC = "/test/basic"

// make topic
EventPress.builder()
    .setTopic(TOPIC_TEST_BASIC)
    .build()

// Observer 1
EventPress.observe<String>(TOPIC_TEST_BASIC) {
    Log.d("###", "  -->[$TOPIC_TEST_BASIC, #1] $it")
}?.addTo(compositeDisposable)

// Observer 2
EventPress.getTopicFlowable<String>(TOPIC_TEST_BASIC)
    .subscribe {
        Log.d("###", "  -->[$TOPIC_TEST_BASIC, #2] $it")
    }?.addTo(compositeDisposable)

// Send an event
EventPress.publish(TOPIC_TEST_BASIC, "Hello world!!", false)

EventPress.builder() 를 호출해서 EventPressBuilder로 토픽(Event stream)을 생성할 수 있습니다.

하지만 위 예제에서 EventPressBuilder 로 토픽 생성하는 코드를 지워도 똑같이 동작합니다. EventPress는 observe() 가 호출될 때 해당 토픽이 없으면 자동으로 (기본 옵션으로) 생성하기 때문입니다.

예제에서는 2가지 방법으로 토픽을 생성합니다.

observe<>(topic) {} : EventPress 내부에서 관리중인 Flowable 객체에 subscribe() 하면서 지정된 lambda 함수를 호출하도록 해줍니다. Flowable 인스턴스가 노출되지 않으므로 Rx operator 를 사용할 수는 없지만 예상치 못한 에러에서 안전한 방법입니다.

getTopicFlowable<>() : EventPress 내부에서 관리중인 Flowable 객체를 직접 리턴합니다. Rx operator 를 적용할 수 있으면 다른 Rx stream 과 연동이 가능합니다. 하지만 여기서 적용된 Rx operator 가 다른 observer 에게도 영향을 미칠 수 있습니다. 이 경우는 직접 subscribe() 를 호출해줘야 합니다.
.

이벤트 발행


이벤트 발행 예제

val TOPIC_TEST_PUBLISH = "/test/pub"
val TOPIC_TEST_PUBLISH1 = "/test/pub/depth1"
val TOPIC_TEST_PUBLISH2A = "/test/pub/depth1/depth2a"
val TOPIC_TEST_PUBLISH2B = "/test/pub/depth1/depth2b"

// create topic [/test/pub] and observe
EventPress.observe<String>(TOPIC_TEST_PUBLISH) {
    Log.d("###", "  -->[$TOPIC_TEST_PUBLISH] $it")
}?.addTo(compositeDisposable)

// create topic [/test/pub/depth1] and observe
EventPress.observe<String>("$TOPIC_TEST_PUBLISH1") {
    Log.d("###", "  -->[$TOPIC_TEST_PUBLISH1] $it")
}?.addTo(compositeDisposable)

// create topic [/test/pub/depth1/depth2a] and observe
EventPress.observe<String>("$TOPIC_TEST_PUBLISH2A") {
    Log.d("###", "  -->[$TOPIC_TEST_PUBLISH2A] $it")
}?.addTo(compositeDisposable)

// create topic [/test/pub/depth1/depth2b] and observe
EventPress.observe<String>("$TOPIC_TEST_PUBLISH2B") {
    Log.d("###", "  -->[$TOPIC_TEST_PUBLISH2B] $it")
}?.addTo(compositeDisposable)

// send an event to a targeted topic only
EventPress.publish(TOPIC_TEST_PUBLISH, "Hello world!! (Single)", false)

// send and event to a topic and descendants
EventPress.publish(TOPIC_TEST_PUBLISH, "Hello world!!")

EventPress.publish() 를 이용해서 이벤트를 발행합니다. recursive = true/false 파라미터를 이용하면 하위 토픽으로의 전파를 컨트롤 할 수 있습니다.

토픽 삭제

val TOPIC_TEST_REMOVE = "/test/pub"
val TOPIC_TEST_REMOVE1 = "/test/pub/depth1"
val TOPIC_TEST_REMOVE2A = "/test/pub/depth1/depth2a"
val TOPIC_TEST_REMOVE2B = "/test/pub/depth1/depth2b"

// create topic [/test/pub] and observe
EventPress.observe<String>(TOPIC_TEST_REMOVE) {
    Log.d("###", "  -->[$TOPIC_TEST_REMOVE] $it")
}?.addTo(compositeDisposable)

// create topic [/test/pub/depth1] and observe
EventPress.observe<String>("$TOPIC_TEST_REMOVE1") {
    Log.d("###", "  -->[$TOPIC_TEST_REMOVE1] $it")
}?.addTo(compositeDisposable)

// create topic [/test/pub/depth1/depth2a] and observe
EventPress.observe<String>("$TOPIC_TEST_REMOVE2A") {
    Log.d("###", "  -->[$TOPIC_TEST_REMOVE2A] $it")
}?.addTo(compositeDisposable)

// create topic [/test/pub/depth1/depth2b] and observe
val disposable2b = EventPress.observe<String>("$TOPIC_TEST_REMOVE2B") {
    Log.d("###", "  -->[$TOPIC_TEST_REMOVE2B] $it")
}

// publish test
EventPress.publish(TOPIC_TEST_REMOVE, "Hello world!!")

// check topic is alive
Log.d("###", "[$TOPIC_TEST_REMOVE2B] Disposed = ${disposable2b?.isDisposed}")

// remove a topic
EventPress.remove(TOPIC_TEST_REMOVE)

// send an event and check removed or not
EventPress.publish(TOPIC_TEST_REMOVE, "Hello world!!")

EventPress.remove() 를 호출하면 해당 토픽과 하위 토픽을 모두 삭제합니다. 토픽을 삭제하면 event stream 이 종료되며 해당 토픽에서 리턴한 disposable 을 모두 dispose() 해줍니다. (단, Flowable 을 직접 subscribe() 한 경우는 제외)


Builder 테스트

EventPressBuilder 를 이용해 토픽을 직접 생성하면, observer 루틴이 실행될 thread 를 지정할 수 있습니다.

val TOPIC_TEST_BUILDER = "/test/builder/default"

// EventPressBuilder uses computation thread as default.
EventPress.builder()
    .setTopic(TOPIC_TEST_BUILDER)
    .build()

// WARNING: this observer has a critical problem (touch UI in backgroun thread)
EventPress.observe<String>(TOPIC_TEST_BUILDER) {
    Log.d("###", "  -->[$TOPIC_TEST_BUILDER] $it")
    findViewById<TextView>(R.id.textMain).text = it
}?.addTo(compositeDisposable)

// If you want to receive event in UI thread,
// use withScheduler(EventScheduler.Type.UI)
EventPress.builder()
    .setTopic(MainActivity::class.java)
    .withScheduler(EventScheduler.Type.UI)
    .build()

// tests each observer runs in UI thread
EventPress.observe<String>(MainActivity::class.java) {
    Log.d("###", "  -->[UI #1] $it")
    findViewById<TextView>(R.id.textMain).text = it
}?.addTo(compositeDisposable)

EventPress.observe<String>(MainActivity::class.java) {
    Log.d("###", "  -->[UI #2] $it")
    findViewById<TextView>(R.id.textSub).text = it
}?.addTo(compositeDisposable)

EventPress.getTopicFlowable<String>(MainActivity::class.java)
    .subscribe {
        Log.d("###", "  -->[UI #3] $it")
        findViewById<TextView>(R.id.textSub2).text = it
    }?.addTo(compositeDisposable)

// Send messages in background thread
Observable.intervalRange(1, 10, 0, 1000, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .subscribe({
        EventPress.publish(MainActivity::class.java, "Hello world. count = $it")
    }, {}).addTo(compositeDisposable)

EventPressBuilder 체인에서 .withScheduler(EventScheduler.Type.UI) 를 추가하면 이벤트를 받았을 때 UI thread 에서 observer 가 실행됩니다. Scheduler를 지정하지 않으면 RxJava 의 computation thread 를 기본으로 사용합니다.

Flow control

Backpressure 정책과 valve 기능도 EventPressBuilder를 이용해서 적용할 수 있습니다.

// Make topic with backpressure strategy and valve control
EventPress.builder()
    .setTopic(MainActivity::class.java)
    .withScheduler(EventScheduler.Type.UI)
    .withBackpressure(EventFlowControl.BpType.BUFFER)
    .withValve()
    .build()

EventPress.observe<String>(MainActivity::class.java) {
    Log.d("###", "  -->[UI #1] $it")
    findViewById<TextView>(R.id.textMain).text = it
}?.addTo(compositeDisposable)

EventPress.observe<String>(MainActivity::class.java) {
    Log.d("###", "  -->[UI #2] $it")
    findViewById<TextView>(R.id.textSub).text = it
}?.addTo(compositeDisposable)

EventPress.getTopicFlowable<String>(MainActivity::class.java)
    .subscribe {
        Log.d("###", "  -->[UI #3] $it")
        findViewById<TextView>(R.id.textSub2).text = it
    }?.addTo(compositeDisposable)

// Send messages in background thread at every 1ms for 100000 times
Observable.intervalRange(1, 100000, 0, 1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .subscribe({
        EventPress.publish(MainActivity::class.java,
            "Hello world. count = $it")
    }, {}).addTo(compositeDisposable)

// switch valve at every 5sec for 100 times
var valve = false
Observable.intervalRange(1, 100, 1000, 5000, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .subscribe {
        EventPress.switchTopicValve(MainActivity::class.java, valve)
        valve = !valve
    }.addTo(compositeDisposable)

.withBackpressure() 와 .withValve() 를 이용해서 해당 기능을 설정할 수 있습니다. Backpressure 정책은 설정하면 해당 토픽이 종료될 때까지 자동 적용됩니다. Valve 기능을 설정하면 EventPress.switchTopicValve() 를 이용해서 observer 로의 이벤트 발행을 중지시키고 이벤트를 caching 할 수 있습니다.

예제에서는 1ms 마다 이벤트를 발행하고 3개의 observer 가 이벤트를 수신합니다. 그리고 밸브를 매 5초마다 on/off 전환합니다.

EventHolder

observe<object_type>(), getTopicFlowable<object_type>() 을 호출할 때 전달받을 데이터의 클래스 타입을 설정해야합니다. 전달받을 수 있는 object 는 하나이므로 토픽에 사용할 object 타입을 신중히 고려해야 합니다. 사용할 데이터 타입을 결정할 때 편리하도록 라이브러리에서는 EventHolder 클래스를 제공합니다.

val TOPIC_TEST_EVENTHOLDER = "/test/eventholder"
val eventType = 1

EventPress.observe<EventHolder<String>>(TOPIC_TEST_EVENTHOLDER) {
    Log.d("###", "  -->[$TOPIC_TEST_EVENTHOLDER, #1] event = ${it.getEventType()}, ${it.getContentIfNotHandled()}")
}?.addTo(compositeDisposable)

EventPress.observe<EventHolder<String>>(TOPIC_TEST_EVENTHOLDER) {
    Log.d("###", "  -->[$TOPIC_TEST_EVENTHOLDER, #2] event = ${it.getEventType()}, ${it.getContentIfNotHandled()}")
}?.addTo(compositeDisposable)

EventPress.observe<EventHolder<String>>(TOPIC_TEST_EVENTHOLDER) {
    Log.d("###", "  -->[$TOPIC_TEST_EVENTHOLDER, #3] event = ${it.getEventType()}, ${it.getContentIfNotHandled()}")
}?.addTo(compositeDisposable)

Log.d("###", "[$TOPIC_TEST_EVENTHOLDER] Publish message")
val eventHolder = EventHolder<String>(eventType, "Hello world!!")
EventPress.publish(TOPIC_TEST_EVENTHOLDER, eventHolder, false)

EventHolder 는 Google iosched 예제에서 사용된 event wrapper 클래스와 거의 동일합니다. Event wrapper 에는 여러 observer에서 동일한 event object 에 접근하는 경우 중복 실행을 막기 위한 장치가 마련되어 있습니다. 자세한 내용은 아래 링크를 참조하세요.


주의사항

토픽을 구독할 때 사용하는 EventPress.observe(), EventPress.getTopicFlowable() 은 disposable 인스턴스를 리턴합니다. 해당 구독 행위에 대한 구독해제는 반드시 disposable.dispose() 를 이용해서 직접 해줘야 합니다.

이를 놓치는 경우 (특히 /sys/xxx 같은 시스템 토픽을 사용한 경우) 앱이 종료될 때 까지 해당 참조가 해제되지 못하는 문제가 발생할 수 있습니다.

EventPress 에 대한 제언이나 에러 리포트가 있으시면 아래에서 부탁드립니다.



You may also like...