android– 使用retrofit2和rx java2发送高容量POSTS时出现OutOfMemoryException

android– 使用retrofit2和rx java2发送高容量POSTS时出现OutOfMemoryException,第1张

概述我有一个带有本地数据库(房间)的应用程序和一个服务,该服务使用改造2和rxjava对来自数据库的所有“事件”进行POST.当我发送大量的POST(即1500)时,应用程序抛出OutOfMemoryException.我认为这是因为每次客户端发送新POST时它都会启动一个新线程.有没有办法阻止改造/rxJava创建这么

我有一个带有本地数据库(房间)的应用程序和一个服务,该服务使用改造2和rxjava对来自数据库的所有“事件”进行POST.当我发送大量的POST(即1500)时,应用程序抛出OutOfMemoryException.我认为这是因为每次客户端发送新POST时它都会启动一个新线程.有没有办法阻止改造/ rxJava创建这么多线程?或者等待服务器响应是否更好?这是我的代码:

从本地db检索所有事件的类

public class RetreiveDbContent {private final EventDatabase eventDatabase;public RetreiveDbContent(EventDatabase eventDatabase) {    this.eventDatabase = eventDatabase;}@OverrIDepublic Maybe<List<Event>> eventsList() { return eventDatabase.eventDao().getAllEvents()            .subscribeOn(Schedulers.io())            .observeOn(AndroIDSchedulers.mainThread());}}

接下来,我有一个服务,它通过db事件列表迭代并发布所有这些服务.如果后端发回成功,则从本地数据库中删除该事件.

    private voID sendDbContent() {    mRetreiveDbContent.eventsList()            .subscribe(new MaybeObserver<List<Event>>() {        @OverrIDe        public voID onSubscribe(disposable d) {        }        @OverrIDe        public voID onSuccess(final List<Event> events) {            Timber.e("Size of List from db " + events.size());            final Compositedisposable disposable = new Compositedisposable();            Observable<Event> eventObservable = Observable.fromIterable(events);            eventObservable.subscribe(new Observer<Event>() {                @OverrIDe                public voID onSubscribe(disposable d) {                    disposable.add(d);                }                @OverrIDe                public voID onNext(Event event) {                    Timber.d("sending event from db " + event.getAction());                    mPresenter.postEvent(Event);                }                @OverrIDe                public voID one rror(Throwable e) {                    Timber.e("error while emitting db content " + e.getMessage());                }                @OverrIDe                public voID onComplete() {                    Timber.d("Finished looPing through db List");                    disposable.dispose();                }            });        }        @OverrIDe        public voID one rror(Throwable e) {            Timber.e("Error occurred while attempting to get db content " + e.getMessage());        }        @OverrIDe        public voID onComplete() {            Timber.d("Finished getting the db content");        }    });}

这是我的postEvent()&存在于演示者中的deleteEvent()方法

    public voID postEvent(final Event event) {    mSendtEvent.sendEvent(event)          .subscribeOn(Schedulers.io())            .observeOn(AndroIDSchedulers.mainThread())            .subscribe(new disposableObserver<Response<ResponseBody>>() {                @OverrIDe                public voID onNext(Response<ResponseBody> responseBodyResponse) {                    switch (responseBodyResponse.code()) {                        case CREATED_RESPONSE:                            Timber.d("Event posted successfully " + responseBodyResponse.code());                            deleteEventFromroom(event);                            break;                        case BAD_REQUEST:                            Timber.e("ClIEnt sent a bad request! We need to discard it!");                            break;                    }                }                @OverrIDe                public voID one rror(Throwable e) {                    Timber.e("Error " + e.getMessage());                    mVIEw.onErrorOccurred();                }                @OverrIDe                public voID onComplete() {                }            });}    public voID deleteEventFromroom(final Event event) {    final Compositedisposable disposable = new Compositedisposable();    mRemoveEvent.removeEvent(event)            .subscribeOn(Schedulers.io())            .observeOn(AndroIDSchedulers.mainThread())            .subscribe(new Observer() {                @OverrIDe                public voID onSubscribe(disposable d) {                    disposable.add(d);                }                @OverrIDe                public voID onNext(Object o) {                    Timber.d("Successfully deleted event from database " + event.getAction());                }                @OverrIDe                public voID one rror(Throwable e) {                }                @OverrIDe                public voID onComplete() {                    disposable.dispose();                }            });}

最后是mRemoveEvent交互器

public class RemoveEvent {private final EventDatabase eventDatabase;public RemoveEvent(EventDatabase eventDatabase) {    this.eventDatabase = eventDatabase;}@OverrIDepublic Observable removeEvent(final Event event) {    return Observable.fromCallable(new Callable<Object>() {        @OverrIDe        public Object call() throws Exception {            return eventDatabase.eventDao().delete(event);        }    });}}

注意:我是RXJava世界的新手.
先感谢您

解决方法:

您正在使用不支持背压的Observable.

Fom RxJava github页面:

Backpressure

When the dataflow runs through asynchronous steps, each step may
perform different things with different speed. To avoID overwhelming
such steps, which usually would manifest itself as increased memory
usage due to temporary buffering or the need for skipPing/dropPing
data, a so-called backpressure is applIEd, which is a form of flow
control where the steps can express how many items are they ready to
process. This allows constraining the memory usage of the dataflows in
situations where there is generally no way for a step to kNow how many
items the upstream will send to it.

In RxJava, the dedicated Flowable class is designated to support
backpressure and Observable is dedicated for the non-backpressured
operations (short sequences, GUI interactions, etc.). The other types,
Single, Maybe and Completable don’t support backpressure nor should
they; there is always room to store one item temporarily.

您应该使用Flowable,您将所有事件发送到下游以使用所有可用资源进行处理.

这是一个简单的例子:

Flowable.range(1, 1000)        .buffer(10)//Optional you can process single event        .flatMap(buf -> {            System.out.println(String.format("100ms for sending events to server: %s ", buf));            Thread.sleep(100);            return Flowable.fromIterable(buf);        }, 1)// <-- How many concurrent task should be executed        .map(x -> x + 1)        .doOnNext(i -> System.out.println(String.format("doOnNext: %d", i)))        .subscribeOn(Schedulers.io())        .observeOn(Schedulers.single(), false, 1)//OverrIDes the 128 default buffer size        .subscribe(new DefaultSubscriber<Integer>() {    @OverrIDe    public voID onStart() {        request(1);    }    @OverrIDe    public voID onNext(Integer t) {        System.out.println(String.format("Received response from server for event : %d", t));        System.out.println("Processing value would take some time");        try {            Thread.sleep(2000);        } catch (InterruptedException e) {            e.printstacktrace();        }        //You can request for more data here        request(1);    }    @OverrIDe    public voID one rror(Throwable t) {        t.printstacktrace();    }    @OverrIDe    public voID onComplete() {        System.out.println("ExampleUnitTest.onComplete");    }});

最后一个提示:您不应该立即将整个事件提取到内存中,基本上您在内存中保存所有“数据库事件”,考虑分页或类似Cursor,每个 *** 作获取100行并在处理它们后请求下一个100,我希望你用JobScheduler或WorkManager API做到这一点

总结

以上是内存溢出为你收集整理的android – 使用retrofit2和rx java2发送高容量POSTS时出现OutOfMemoryException全部内容,希望文章能够帮你解决android – 使用retrofit2和rx java2发送高容量POSTS时出现OutOfMemoryException所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/web/1108896.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-05-29
下一篇2022-05-29

发表评论

登录后才能评论

评论列表(0条)

    保存