obo obo - 3 months ago 24
Swift Question

How to combine two observables in Reactive Extensions in order to paginate results?

I am trying to develop a pagination system in an iOS app using RxSwift. The use case is simple: the user can enter text in a search field and the app performs a paginated request. When the text changes, a new request is performed for the first page (that is, the value of the page observable must be reset to 1). If the user clears the search field (or enters fewer than 2 characters), the list of results is cleared and the current page is reset. The next page is fetched when the user scrolls to the bottom of the list.

This is not a Swift- or iOS-specific case, and I suppose it could be written in an identical way using RxKotlin, RxJS, or any other reactive extension.

My current attempt is to set up an observable for the text and an observable for the current page, and to combine them in order to perform the request with both parameters.
I have already managed to do exactly what I want, but only by using global attributes to store the current query and the current page. I would like to find a way to use only the values emitted by the observables without having to maintain them myself (I expect the code would be more concise and easier to read and understand).

Here is my current code:

// self.nextPage is a Variable<Int>
let moreObs: Observable<Int> = self.nextPage.asObservable()
    .distinctUntilChanged() // Emit only if page has changed.

// self.searchTextObservable is a PublishSubject<String> that receives the values from the text field
let searchObs: Observable<String> = self.searchTextObservable
    .throttle(0.4, scheduler: MainScheduler.instance) // Wait 400ms when the user stops writing.
    .distinctUntilChanged() // Emit only if query has changed.

self.resultsObservable = Observable
    .combineLatest(searchObs, moreObs) { query, page in
        return ["q": query, "p": "\(page)"]
    }
    .subscribeOn(MainScheduler.instance) // Emit on main thread.
    .observeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background)) // Perform on background thread.
    .map { params in
        if params["q"]!.characters.count > 2 {
            return params
        }
        return [:]
    }
    .flatMap { params in
        return params.isEmpty ?
            Observable.of([]) :
            self.search(params)
    }
    .map { results in
        if results.count > 0 {
            self.results.appendContentsOf(results)
        } else {
            self.results = []
        }
        return self.results
    }


So far, the only feature that does not work is the reset of nextPage's value. If I force it to 1 when searchObs emits:

let searchObs: Observable<String> = self.searchTextObservable
    .throttle(0.4, scheduler: MainScheduler.instance) // Wait 400ms when the user stops writing.
    .distinctUntilChanged() // Emit only if query has changed.
    .map { query in
        self.nextPage.value = 1
        return query
    }


Then two requests are performed instead of one.

Am I misusing Rx?

Answer

I wouldn't use combineLatest. It emits whenever either source emits, so resetting the page and changing the query can each trigger a request. Your page number depends on your current search text, so you should chain it with flatMapLatest. That way you aren't responsible for maintaining its state yourself; operator chaining resets it for you.

import RxSwift

let disposeBag = DisposeBag()

let searchText = PublishSubject<String>()  // search field text
let newPageNeeded = PublishSubject<Void>() // fires when a new page is needed

struct RequestPage {
    let query: String
    let page: Int
}

let requestNeeded = searchText.asObservable()
    .flatMapLatest { text in
        newPageNeeded.asObservable()
            .startWith(())
            .scan(RequestPage(query: text, page: 0)) { request, _ in
                return RequestPage(query: text, page: request.page + 1)
            }
    }

requestNeeded
    .subscribeNext { print($0) }
    .addDisposableTo(disposeBag)

searchText.onNext("A")

searchText.onNext("B")
newPageNeeded.onNext(())

searchText.onNext("C")
newPageNeeded.onNext(())
newPageNeeded.onNext(())

This will output:

(RequestPage #1)(query: "A", page: 1)
(RequestPage #1)(query: "B", page: 1)
(RequestPage #1)(query: "B", page: 2)
(RequestPage #1)(query: "C", page: 1)
(RequestPage #1)(query: "C", page: 2)
(RequestPage #1)(query: "C", page: 3)
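
The same chaining can also carry the request itself and the accumulated list, which would remove the need for the stored `self.results` in the question. The following is only a minimal sketch under a few assumptions: it is written against a recent RxSwift release (where `subscribeNext` and `addDisposableTo` became `subscribe(onNext:)` and `disposed(by:)`), the `search(query:page:)` function is a hypothetical stand-in for the question's `self.search(params)`, and the `> 2` length check simply mirrors the question's code.

```swift
import RxSwift

let disposeBag = DisposeBag()
let searchText = PublishSubject<String>()    // search field text
let newPageNeeded = PublishSubject<Void>()   // fires when a new page is needed

// Hypothetical stand-in for the question's `self.search(params)`:
// returns one fake result per (query, page) pair instead of calling a server.
func search(query: String, page: Int) -> Observable<[String]> {
    return Observable.just(["\(query)#\(page)"])
}

let results: Observable<[String]> = searchText
    .flatMapLatest { text -> Observable<[String]> in
        // A short query clears the list and starts no page counter.
        guard text.count > 2 else { return Observable<[String]>.just([]) }

        return newPageNeeded
            .startWith(())                       // request page 1 right away
            .scan(0) { page, _ in page + 1 }     // 1, 2, 3, ... restarted per query
            .concatMap { page in search(query: text, page: page) }
            .scan([String]()) { $0 + $1 }        // append each page to the list
    }

// Collect every emitted list so the behaviour can be inspected.
var emitted: [[String]] = []
results
    .subscribe(onNext: { emitted.append($0) })
    .disposed(by: disposeBag)

searchText.onNext("swift")     // first page for "swift"
newPageNeeded.onNext(())       // second page, appended to the first
searchText.onNext("sw")        // too short: list is cleared
searchText.onNext("rxswift")   // new query: page counter and list start over
```

Because both `scan` operators live inside the `flatMapLatest` closure, a new search text discards the page counter and the accumulated list together. `concatMap` is used here so that pages are appended in request order; a real network call might also need error handling, which this sketch leaves out.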