EventPress: Topic 기반 EventBus 라이브러리
EventPress 는 Android/Kotlin 을 지원하는, Rx 친화적인 event bus 라이브러리입니다. RxJava 를 이용한 EventBus 라이브러리 중 유명했던 RxBus2 에서 많은 영감을 받았습니다. (MFlisar’s RxBus2)
Event bus 의 핵심은 이벤트 stream을 얼마나 효율적으로 관리하느냐이며, EventPress 는 이를 위해 MQTT protocol 에서 사용하는 것과 거의 유사한 토픽(Topic) 개념을 사용하고 있습니다.
토픽은 트리 형태의 계층 구조를 가지는 Event stream으로, 사용자가 자유롭게 토픽을 생성하고 해당 토픽에 이벤트를 publish 하거나 observe(subscribe) 할 수 있습니다.
.
EventPress 특징
- MQTT protocol 과 유사한 토픽(topic) 계층구조로 Event stream 들을 관리
- 각 토픽 (event stream)은 path 를 포함한 String으로 표현
예) /viewmodel/logic/stream1 - 토픽을 만들고 이벤트 “발행(publish)” 을 하면 해당 토픽을 “구독(observe/subscribe)”한 모든 옵저버(observer)에게 이벤트가 전달
- 또한 토픽 경로상 하위의 토픽들을 구독한 옵저버에게도 이벤트가 전달됨. (토픽을 삭제하면 하위 토픽도 삭제됨)
- RxKotlin/RxAndroid 를 사용하여 구현, EventStream 은 PublishProcessor 를 이용
- backpressure 정책과 valve 기능, observer 가 실행될 thread 지정을 위한 API 를 지원
- EventPress 는 작고 가벼운 라이브러리를 지향
.
라이브러리 import 방법 (JitPack 배포)
1. 프로젝트 레벨 build.gradle 에 아래 설정을 추가
allprojects { repositories { ... maven { url "https://jitpack.io" } } }
2. 모듈 레벨 build.gradle 에 dependency 추가
dependencies { implementation 'com.github.godstale:EventPress:<LATEST-VERSION>' }
최신 버전은 GitHub 에서 확인이 가능합니다.
.
사용법
목차
- 토픽(Topic)
- 테스트 코드
- Simple test
- 기본 사용법
- 이벤트 발행
- 토픽 삭제
- Builder 테스트
- Flow control
- EventHolder
- 주의사항
.
Topic
토픽은 이벤트 스트림을 표현하는 계층 구조로 표현됩니다. 각각의 토픽은 Linux 파일 시스템의 파일 경로처럼 ‘/’ 구분자를 이용해 표시합니다.
- /api
- /api/member
- /api/member/login
- /api/member/info
만약 위처럼 토픽 계층 구조를 만든다면, ‘/api/member’ 토픽에 이벤트를 발행했을 때 ‘/api/member’, ‘/api/member/login’, ‘/api/member/info’ 를 구독한 모든 옵저버에게 이벤트가 전달됩니다. (선택적으로 하위 토픽으로의 이벤트 발행을 막을수도 있습니다.)
EventPress 라이브러리는 Event stream 을 생성할 때 토픽의 이름을 자유롭게 사용할 수 있지만 지켜야할 규칙들이 몇 가지 있습니다.
- 토픽 경로(토픽 이름을 포함한 full path)는 반드시 ‘/’ 로 시작
- 토픽 경로는 ‘/’ 로 끝나서는 안됨
- 루트 토픽 ‘/’ 을 EventPress API 에 사용할 수 없음
- 공백 문자 사용할 수 없음
- 다음의 문자들만 사용 가능 [ . _ 0-9 a-z A-Z / – ]
- 토픽 경로는 최대 256 자
- 토픽 경로상의 각 토픽 이름은 비어있을 수 없음. (예: /api//test)
- /sys, /sys/class, /sys/ui, /sys/common 토픽은 시스템 토픽으로 예약되어 있으므로 사용불가 (하위 토픽은 가능)
- 와일드 카드 문자, 기능은 지원하지 않음
.
테스트 코드
아래 링크에서 MainActivity 코드를 보시면 EventPress 의 사용법을 코드로 확인하실 수 있습니다.
.
Simple test
EventPress API 를 사용하기 전 초기화가 필요합니다. (Application 클래스에서 호출하기를 권장)
EventPress.initialize()
단순히 observe(), publish() 를 호출하기만 해도 이벤트를 전달할 수 있습니다.
// Register observer EventPress.observe<String> { Log.d("###", " -->[/sys/common] event received = $it") }?.addTo(compositeDisposable) // Send message EventPress.publish("Hello world!!")
로그로 표시하기는 하지만 observe(), publish() 를 호출할 때 토픽을 지정하지는 않았습니다. observe() 를 호출할 때 토픽이 없는 경우 /sys/common 시스템 토픽으로 자동 바인딩됩니다. publish() 의 경우도 토픽이 지정되지 않으면 /sys/common 을 사용합니다.
단, 이 경우는 /sys/common 을 사용하는 모든 observer/publisher 가 동일한 object 타입을 사용한다는 암묵적인 원칙이 있습니다. observe() 가 호출될 때 타입 캐스팅에 사용할 클래스 타입(String) 을 지정했음을 주의하세요.
또한 observe() 가 리턴하는 disposable 도 필수적으로!! 사용자가 받아서 처리해줘야 합니다. (예제에서는 activity 가 종료될 때 처리하는 compositeDisposable에 등록하도록 addTo() 를 사용했습니다.)
.
기본 사용법
사용자가 직접 토픽을 만들고 이벤트를 발행하는 예제입니다.
val TOPIC_TEST_BASIC = "/test/basic" // make topic EventPress.builder() .setTopic(TOPIC_TEST_BASIC) .build() // Observer 1 EventPress.observe<String>(TOPIC_TEST_BASIC) { Log.d("###", " -->[$TOPIC_TEST_BASIC, #1] $it") }?.addTo(compositeDisposable) // Observer 2 EventPress.getTopicFlowable<String>(TOPIC_TEST_BASIC) .subscribe { Log.d("###", " -->[$TOPIC_TEST_BASIC, #2] $it") }?.addTo(compositeDisposable) // Send an event EventPress.publish(TOPIC_TEST_BASIC, "Hello world!!", false)
EventPress.builder() 를 호출해서 EventPressBuilder로 토픽(Event stream)을 생성할 수 있습니다.
하지만 위 예제에서 EventPressBuilder 로 토픽 생성하는 코드를 지워도 똑같이 동작합니다. EventPress는 observe() 가 호출될 때 해당 토픽이 없으면 자동으로 (기본 옵션으로) 생성하기 때문입니다.
예제에서는 2가지 방법으로 토픽을 생성합니다.
observe<>(topic) {} : EventPress 내부에서 관리중인 Flowable 객체에 subscribe() 하면서 지정된 lambda 함수를 호출하도록 해줍니다. Flowable 인스턴스가 노출되지 않으므로 Rx operator 를 사용할 수는 없지만 예상치 못한 에러에서 안전한 방법입니다.
getTopicFlowable<>() : EventPress 내부에서 관리중인 Flowable 객체를 직접 리턴합니다. Rx operator 를 적용할 수 있으면 다른 Rx stream 과 연동이 가능합니다. 하지만 여기서 적용된 Rx operator 가 다른 observer 에게도 영향을 미칠 수 있습니다. 이 경우는 직접 subscribe() 를 호출해줘야 합니다.
.
이벤트 발행
이벤트 발행 예제
val TOPIC_TEST_PUBLISH = "/test/pub" val TOPIC_TEST_PUBLISH1 = "/test/pub/depth1" val TOPIC_TEST_PUBLISH2A = "/test/pub/depth1/depth2a" val TOPIC_TEST_PUBLISH2B = "/test/pub/depth1/depth2b" // create topic [/test/pub] and observe EventPress.observe<String>(TOPIC_TEST_PUBLISH) { Log.d("###", " -->[$TOPIC_TEST_PUBLISH] $it") }?.addTo(compositeDisposable) // create topic [/test/pub/depth1] and observe EventPress.observe<String>("$TOPIC_TEST_PUBLISH1") { Log.d("###", " -->[$TOPIC_TEST_PUBLISH1] $it") }?.addTo(compositeDisposable) // create topic [/test/pub/depth1/depth2a] and observe EventPress.observe<String>("$TOPIC_TEST_PUBLISH2A") { Log.d("###", " -->[$TOPIC_TEST_PUBLISH2A] $it") }?.addTo(compositeDisposable) // create topic [/test/pub/depth1/depth2b] and observe EventPress.observe<String>("$TOPIC_TEST_PUBLISH2B") { Log.d("###", " -->[$TOPIC_TEST_PUBLISH2B] $it") }?.addTo(compositeDisposable) // send an event to a targeted topic only EventPress.publish(TOPIC_TEST_PUBLISH, "Hello world!! (Single)", false) // send and event to a topic and descendants EventPress.publish(TOPIC_TEST_PUBLISH, "Hello world!!")
EventPress.publish() 를 이용해서 이벤트를 발행합니다. recursive = true/false 파라미터를 이용하면 하위 토픽으로의 전파를 컨트롤 할 수 있습니다.
토픽 삭제
val TOPIC_TEST_REMOVE = "/test/pub" val TOPIC_TEST_REMOVE1 = "/test/pub/depth1" val TOPIC_TEST_REMOVE2A = "/test/pub/depth1/depth2a" val TOPIC_TEST_REMOVE2B = "/test/pub/depth1/depth2b" // create topic [/test/pub] and observe EventPress.observe<String>(TOPIC_TEST_REMOVE) { Log.d("###", " -->[$TOPIC_TEST_REMOVE] $it") }?.addTo(compositeDisposable) // create topic [/test/pub/depth1] and observe EventPress.observe<String>("$TOPIC_TEST_REMOVE1") { Log.d("###", " -->[$TOPIC_TEST_REMOVE1] $it") }?.addTo(compositeDisposable) // create topic [/test/pub/depth1/depth2a] and observe EventPress.observe<String>("$TOPIC_TEST_REMOVE2A") { Log.d("###", " -->[$TOPIC_TEST_REMOVE2A] $it") }?.addTo(compositeDisposable) // create topic [/test/pub/depth1/depth2b] and observe val disposable2b = EventPress.observe<String>("$TOPIC_TEST_REMOVE2B") { Log.d("###", " -->[$TOPIC_TEST_REMOVE2B] $it") } // publish test EventPress.publish(TOPIC_TEST_REMOVE, "Hello world!!") // check topic is alive Log.d("###", "[$TOPIC_TEST_REMOVE2B] Disposed = ${disposable2b?.isDisposed}") // remove a topic EventPress.remove(TOPIC_TEST_REMOVE) // send an event and check removed or not EventPress.publish(TOPIC_TEST_REMOVE, "Hello world!!")
EventPress.remove() 를 호출하면 해당 토픽과 하위 토픽을 모두 삭제합니다. 토픽을 삭제하면 event stream 이 종료되며 해당 토픽에서 리턴한 disposable 을 모두 dispose() 해줍니다. (단, Flowable 을 직접 subscribe() 한 경우는 제외)
Builder 테스트
EventPressBuilder 를 이용해 토픽을 직접 생성하면, observer 루틴이 실행될 thread 를 지정할 수 있습니다.
val TOPIC_TEST_BUILDER = "/test/builder/default" // EventPressBuilder uses computation thread as default. EventPress.builder() .setTopic(TOPIC_TEST_BUILDER) .build() // WARNING: this observer has a critical problem (touch UI in backgroun thread) EventPress.observe<String>(TOPIC_TEST_BUILDER) { Log.d("###", " -->[$TOPIC_TEST_BUILDER] $it") findViewById<TextView>(R.id.textMain).text = it }?.addTo(compositeDisposable) // If you want to receive event in UI thread, // use withScheduler(EventScheduler.Type.UI) EventPress.builder() .setTopic(MainActivity::class.java) .withScheduler(EventScheduler.Type.UI) .build() // tests each observer runs in UI thread EventPress.observe<String>(MainActivity::class.java) { Log.d("###", " -->[UI #1] $it") findViewById<TextView>(R.id.textMain).text = it }?.addTo(compositeDisposable) EventPress.observe<String>(MainActivity::class.java) { Log.d("###", " -->[UI #2] $it") findViewById<TextView>(R.id.textSub).text = it }?.addTo(compositeDisposable) EventPress.getTopicFlowable<String>(MainActivity::class.java) .subscribe { Log.d("###", " -->[UI #3] $it") findViewById<TextView>(R.id.textSub2).text = it }?.addTo(compositeDisposable) // Send messages in background thread Observable.intervalRange(1, 10, 0, 1000, TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) .subscribe({ EventPress.publish(MainActivity::class.java, "Hello world. count = $it") }, {}).addTo(compositeDisposable)
EventPressBuilder 체인에서 .withScheduler(EventScheduler.Type.UI) 를 추가하면 이벤트를 받았을 때 UI thread 에서 observer 가 실행됩니다. Scheduler를 지정하지 않으면 RxJava 의 computation thread 를 기본으로 사용합니다.
Flow control
Backpressure 정책과 valve 기능도 EventPressBuilder를 이용해서 적용할 수 있습니다.
// Make topic with backpressure strategy and valve control EventPress.builder() .setTopic(MainActivity::class.java) .withScheduler(EventScheduler.Type.UI) .withBackpressure(EventFlowControl.BpType.BUFFER) .withValve() .build() EventPress.observe<String>(MainActivity::class.java) { Log.d("###", " -->[UI #1] $it") findViewById<TextView>(R.id.textMain).text = it }?.addTo(compositeDisposable) EventPress.observe<String>(MainActivity::class.java) { Log.d("###", " -->[UI #2] $it") findViewById<TextView>(R.id.textSub).text = it }?.addTo(compositeDisposable) EventPress.getTopicFlowable<String>(MainActivity::class.java) .subscribe { Log.d("###", " -->[UI #3] $it") findViewById<TextView>(R.id.textSub2).text = it }?.addTo(compositeDisposable) // Send messages in background thread at every 1ms for 100000 times Observable.intervalRange(1, 100000, 0, 1, TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) .subscribe({ EventPress.publish(MainActivity::class.java, "Hello world. count = $it") }, {}).addTo(compositeDisposable) // switch valve at every 5sec for 100 times var valve = false Observable.intervalRange(1, 100, 1000, 5000, TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) .subscribe { EventPress.switchTopicValve(MainActivity::class.java, valve) valve = !valve }.addTo(compositeDisposable)
.withBackpressure() 와 .withValve() 를 이용해서 해당 기능을 설정할 수 있습니다. Backpressure 정책은 설정하면 해당 토픽이 종료될 때까지 자동 적용됩니다. Valve 기능을 설정하면 EventPress.switchTopicValve() 를 이용해서 observer 로의 이벤트 발행을 중지시키고 이벤트를 caching 할 수 있습니다.
예제에서는 1ms 마다 이벤트를 발행하고 3개의 observer 가 이벤트를 수신합니다. 그리고 밸브를 매 5초마다 on/off 전환합니다.
EventHolder
observe<object_type>(), getTopicFlowable<object_type>() 을 호출할 때 전달받을 데이터의 클래스 타입을 설정해야합니다. 전달받을 수 있는 object 는 하나이므로 토픽에 사용할 object 타입을 신중히 고려해야 합니다. 사용할 데이터 타입을 결정할 때 편리하도록 라이브러리에서는 EventHolder 클래스를 제공합니다. val TOPIC_TEST_EVENTHOLDER = "/test/eventholder" val eventType = 1 EventPress.observe<EventHolder<String>>(TOPIC_TEST_EVENTHOLDER) { Log.d("###", " -->[$TOPIC_TEST_EVENTHOLDER, #1] event = ${it.getEventType()}, ${it.getContentIfNotHandled()}") }?.addTo(compositeDisposable) EventPress.observe<EventHolder<String>>(TOPIC_TEST_EVENTHOLDER) { Log.d("###", " -->[$TOPIC_TEST_EVENTHOLDER, #2] event = ${it.getEventType()}, ${it.getContentIfNotHandled()}") }?.addTo(compositeDisposable) EventPress.observe<EventHolder<String>>(TOPIC_TEST_EVENTHOLDER) { Log.d("###", " -->[$TOPIC_TEST_EVENTHOLDER, #3] event = ${it.getEventType()}, ${it.getContentIfNotHandled()}") }?.addTo(compositeDisposable) Log.d("###", "[$TOPIC_TEST_EVENTHOLDER] Publish message") val eventHolder = EventHolder<String>(eventType, "Hello world!!") EventPress.publish(TOPIC_TEST_EVENTHOLDER, eventHolder, false)
EventHolder 는 Google iosched 예제에서 사용된 event wrapper 클래스와 거의 동일합니다. Event wrapper 에는 여러 observer에서 동일한 event object 에 접근하는 경우 중복 실행을 막기 위한 장치가 마련되어 있습니다. 자세한 내용은 아래 링크를 참조하세요.
주의사항
토픽을 구독할 때 사용하는 EventPress.observe(), EventPress.getTopicFlowable() 은 disposable 인스턴스를 리턴합니다. 해당 구독 행위에 대한 구독해제는 반드시 disposable.dispose() 를 이용해서 직접 해줘야 합니다.
이를 놓치는 경우 (특히 /sys/xxx 같은 시스템 토픽을 사용한 경우) 앱이 종료될 때 까지 해당 참조가 해제되지 못하는 문제가 발생할 수 있습니다.
EventPress 에 대한 제언이나 에러 리포트가 있으시면 아래에서 부탁드립니다.