package com.changba.module.ktv.liveroom.component.websocket.newws;

import android.util.Log;
import com.changba.library.commonUtils.ObjUtil;
import com.changba.library.commonUtils.stats.DataStats;
import com.changba.module.ktv.liveroom.component.websocket.IWebSocketDataListener;
import com.google.gson.Gson;
import com.xiaomi.mipush.sdk.Constants;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.json.JSONArray;
import org.json.JSONObject;
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func1;
import rx.internal.util.RxRingBuffer;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

/* loaded from: classes2.dex */
/**
 * Process-wide dispatcher that turns raw WebSocket text frames into typed Rx streams.
 *
 * <p>Incoming frames are parsed on a single worker thread; every element of the frame's
 * {@code "result"} array is deserialized with Gson into the class registered for its
 * {@code "type"} in {@link KtvRxWebSocketMessageType} and pushed into a shared subject.
 * Consumers obtain a per-type stream through {@link #b(String)}, {@link #a(String, int)}
 * or {@link #c(String)}.
 *
 * <p>NOTE(review): the Rx operator names in this class are obfuscated ({@code r()},
 * {@code c()}, {@code d()}, {@code f()}, {@code h()}); the descriptions below are inferred
 * from their argument types and should be confirmed against the bundled RxJava build.
 *
 * @param <T> payload type delivered to subscribers
 */
public class KtvRxWebSocketManager<T> implements IWebSocketDataListener {
    private static final String TAG = "-----RxWebSocket-----";

    /** Capacity of the parse queue; further frames are dropped by {@code DiscardPolicy} once it is full. */
    private static final int QUEUE_CAPACITY = 100000;

    /** Shared Gson instance, reused instead of allocating a new one per message. */
    private static final Gson GSON = new Gson();

    /** Lazily created singleton; {@code volatile} so the double-checked initialization in {@link #a()} publishes it safely. */
    private static volatile KtvRxWebSocketManager<?> b;

    /** Set once the "queue full" statistic has been reported, so it is sent at most once per call path. */
    private boolean a;

    /** Bus carrying every decoded message, tagged with its type (presumably a serialized PublishSubject -- confirm). */
    private Subject<KtvRxWebSocketTypeModel<T>, KtvRxWebSocketTypeModel<T>> c = PublishSubject.a().r();

    /** Single worker thread that parses frames in submission order; tasks beyond the queue capacity are silently discarded. */
    private ThreadPoolExecutor d = new ThreadPoolExecutor(
            1,
            1,
            0,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.DiscardPolicy());

    private KtvRxWebSocketManager() {
    }

    /**
     * Returns the shared manager instance, creating it on first use.
     *
     * <p>The same instance is returned regardless of the requested {@code T}; the cast is
     * unchecked, exactly as in the original raw-typed implementation.
     *
     * @param <T> payload type the caller expects
     * @return the singleton manager
     */
    @SuppressWarnings("unchecked")
    public static <T> KtvRxWebSocketManager<T> a() {
        KtvRxWebSocketManager<?> instance = b;
        if (instance == null) {
            synchronized (KtvRxWebSocketManager.class) {
                instance = b;
                if (instance == null) {
                    instance = new KtvRxWebSocketManager<Object>();
                    b = instance;
                }
            }
        }
        return (KtvRxWebSocketManager<T>) instance;
    }

    /** Returns whether {@code str} is the "toprunway" message type, which is logged at verbose level only. */
    private boolean d(String str) {
        return "toprunway".equals(str);
    }

    /**
     * Verifies that a payload class is registered for the given message type.
     *
     * @param str message type name
     * @throws RuntimeException if {@link KtvRxWebSocketMessageType} has no mapping for {@code str}
     */
    private void e(String str) {
        if (KtvRxWebSocketMessageType.a(str) == null) {
            throw new RuntimeException(KtvRxWebSocketMessageType.class.getName() + Constants.COLON_SEPARATOR + str + " 没有定义对应的类型");
        }
    }

    /**
     * Returns the stream for a message type, routed through the Android scheduler.
     *
     * @param str message type name; must be registered in {@link KtvRxWebSocketMessageType}
     * @param i   value forwarded to the scheduler operator (presumably the observeOn buffer size -- confirm)
     * @return observable of payloads for {@code str}
     * @throws RuntimeException if the type is not registered
     */
    public Observable<T> a(String str, int i) {
        return b(str).a(AndroidSchedulers.a(), i);
    }

    /**
     * Receives a raw WebSocket frame and schedules it for parsing on the worker thread.
     *
     * <p>The frame is expected to be a JSON object with a {@code "result"} array whose
     * elements each carry a {@code "type"} string. Malformed frames are logged and dropped.
     * The first time the parse queue is observed at capacity, a statistic is reported.
     *
     * @param str raw frame text; blank input is ignored (per {@code ObjUtil.a})
     */
    @Override // com.changba.module.ktv.liveroom.component.websocket.IWebSocketDataListener
    public void a(final String str) {
        this.d.execute(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                if (ObjUtil.a(str)) {
                    return;
                }
                try {
                    JSONArray results = new JSONObject(str).getJSONArray("result");
                    int count = results.length();
                    for (int i = 0; i < count; i++) {
                        JSONObject item = results.getJSONObject(i);
                        KtvRxWebSocketManager.this.a(item.getString("type"), item.toString());
                    }
                } catch (Exception e) {
                    // Boundary catch: one bad frame must not kill the single worker thread.
                    // The throwable is passed to Log.e so the stack trace stays attached to the message.
                    Log.e(TAG, "webSocket消息解析出错：message=" + str, e);
                }
            }
        });
        if (!this.a && this.d.getQueue().size() >= QUEUE_CAPACITY) {
            this.a = true;
            DataStats.a("KtvRxWebSocketManager_线程池队列已满--队列大小=" + QUEUE_CAPACITY);
        }
    }

    /**
     * Deserializes one message and publishes it to subscribers of its type.
     *
     * <p>Messages whose type has no registered class are logged and otherwise ignored.
     *
     * @param str  message type name
     * @param str2 JSON text of the message
     */
    public void a(String str, String str2) {
        if (d(str)) {
            Log.v(TAG, "Rx--onReceiveMessage  type = " + str + " , message=" + str2);
        } else {
            Log.d(TAG, "Rx--onReceiveMessage  type = " + str + " , message=" + str2);
        }
        Class<T> payloadClass = KtvRxWebSocketMessageType.a(str);
        if (payloadClass != null) {
            T payload = GSON.fromJson(str2, payloadClass);
            this.c.onNext(new KtvRxWebSocketTypeModel<T>(str, payload));
        }
    }

    /**
     * Returns the stream of payloads for a single message type.
     *
     * <p>The shared bus is filtered to models whose type equals {@code str} and mapped to
     * their data. NOTE(review): the leading {@code c()} and trailing {@code h()} operators
     * are obfuscated; their exact semantics are not visible here.
     *
     * @param str message type name; must be registered in {@link KtvRxWebSocketMessageType}
     * @return observable of payloads for {@code str}
     * @throws RuntimeException if the type is not registered
     */
    public Observable<T> b(final String str) {
        e(str);
        return this.c.c().d(new Func1<KtvRxWebSocketTypeModel<T>, Boolean>() {
            @Override // rx.functions.Func1
            public Boolean a(KtvRxWebSocketTypeModel<T> model) {
                return Boolean.valueOf(str.equals(model.getType()));
            }
        }).f(new Func1<KtvRxWebSocketTypeModel<T>, T>() {
            @Override // rx.functions.Func1
            public T a(KtvRxWebSocketTypeModel<T> model) {
                return model.getData();
            }
        }).h();
    }

    /**
     * Same as {@link #a(String, int)} using {@code RxRingBuffer.b} as the second argument
     * (presumably RxJava's default buffer size -- confirm).
     *
     * @param str message type name; must be registered in {@link KtvRxWebSocketMessageType}
     * @return observable of payloads for {@code str}
     * @throws RuntimeException if the type is not registered
     */
    public Observable<T> c(String str) {
        return a(str, RxRingBuffer.b);
    }
}
