package org.apache.spark.sql.loghub;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.SerializedOffset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SetLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: LoghubSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001de!B\u0001\u0003\u0001\ta!\u0001\u0004'pO\",(mU8ve\u000e,'BA\u0002\u0005\u0003\u0019awn\u001a5vE*\u0011QAB\u0001\u0004gFd'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0014\t\u0001iQ#\b\t\u0003\u001dMi\u0011a\u0004\u0006\u0003!E\tA\u0001\\1oO*\t!#\u0001\u0003kCZ\f\u0017B\u0001\u000b\u0010\u0005\u0019y%M[3diB\u0011acG\u0007\u0002/)\u0011\u0001$G\u0001\ngR\u0014X-Y7j]\u001eT!A\u0007\u0003\u0002\u0013\u0015DXmY;uS>t\u0017B\u0001\u000f\u0018\u0005\u0019\u0019v.\u001e:dKB\u0011a$I\u0007\u0002?)\u0011\u0001EB\u0001\tS:$XM\u001d8bY&\u0011!e\b\u0002\b\u0019><w-\u001b8h\u0011!!\u0003A!A!\u0002\u00131\u0013AC:rY\u000e{g\u000e^3yi\u000e\u0001\u0001CA\u0014)\u001b\u0005!\u0011BA\u0015\u0005\u0005)\u0019\u0016\u000bT\"p]R,\u0007\u0010\u001e\u0005\tW\u0001\u0011\t\u0011)A\u0005Y\u0005\u0011Bn\\4ik\n|eMZ:fiJ+\u0017\rZ3s!\tic&D\u0001\u0003\u0013\ty#A\u0001\nM_\u001eDWOY(gMN,GOU3bI\u0016\u0014\b\u0002C\u0019\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001a\u0002)\u0015DXmY;u_Jdun\u001a5vEB\u000b'/Y7t!\u0011\u0019\u0014\bP\u0007\u000f\u0005Q:T\"A\u001b\u000b\u0003Y\nQa]2bY\u0006L!\u0001O\u001b\u0002\rA\u0013X\rZ3g\u0013\tQ4HA\u0002NCBT!\u0001O\u001b\u0011\u0005Mj\u0014B\u0001 <\u0005\u0019\u0019FO]5oO\"A\u0001\t\u0001B\u0001B\u0003%A(\u0001\u0007nKR\fG-\u0019;b!\u0006$\b\u000e\u0003\u0005C\u0001\t\u0005\t\u0015!\u0003D\u0003=\u0019H/\u0019:uS:<wJ\u001a4tKR\u001c\bCA\u0017E\u0013\t)%A\u0001\fM_\u001eDWOY(gMN,GOU1oO\u0016d\u0015.\\5u\u0011!9\u0005A!A!\u0002\u0013A\u0015A\u00044bS2|e\u000eR1uC2{7o\u001d\t\u0003i%K!AS\u001b\u0003\u000f\t{w\u000e\\3b]\")A\n\u0001C\u0001\u001b\u00061A(\u001b8jiz\"rAT(Q#J\u001bF\u000b\u0005\u0002.\u0001!)Ae\u0013a\u0001M!)1f\u0013a\u0001Y!)\u0011g\u0013a\u0001e!)\u0001i\u0013a\u0001y!)!i\u0013a\u0001\u0007\")qi\u0013a\u0001\u0011\"9a\u000b\u0001b\u0001\n\u00139\u0016AA:d+\u0005A\u0006CA-[\u001b\u00051\u0011BA.\u0007\u00051\u0019\u0006/\u0019:l\u0007>tG/\u001a=u\u0011\u0019i\u0006\u0001)A\u00051\u0006\u00191o\u0019\u0011\t\u0011}\u0003\u0001R1A\u0005\n\u0001\f!#\u001b8ji&\fGn\u00155be\u0012|eMZ:fiV\t\u0011\r\u0005\u00034s\td\u0004CA\u0017d\u0013\t!'A\u0001\u0006Ti>\u0014Xm\u00155be\u0012D\u0001B\u001a\u0001\t\u0002\u0003\u0006K!Y\u0001\u0014S:LG/[1m'\"\f'\u000fZ(gMN,G\u000f\t\u0005\u0006Q\u0002!I![\u0001\u000fM\u0016$8\r[!oIZ+'/\u001b4z)\tQW\u000e\u0005\u0002.W&\u0011AN\u0001\u0002\u0013\u0019><\u0007.\u001e2T_V\u00148-Z(gMN,G\u000fC\u0003oO\u0002\u0007\u0011-A\bta\u0016\u001c\u0017NZ5d\u001f\u001a47/\u001a;t\u0011\u0015\u0001\b\u0001\"\u0011r\u0003\u0019\u00198\r[3nCV\t!\u000f\u0005\u0002tm6\tAO\u0003\u0002v\t\u0005)A/\u001f9fg&\u0011q\u000f\u001e\u0002\u000b'R\u0014Xo\u0019;UsB,\u0007\"B=\u0001\t\u0003R\u0018!C4fi>3gm]3u+\u0005Y\bc\u0001\u001b}}&\u0011Q0\u000e\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005Yy\u0018bAA\u0001/\t1qJ\u001a4tKRDq!!\u0002\u0001\t\u0003\n9!\u0001\u0005hKR\u0014\u0015\r^2i)\u0019\tI!!\f\u00022A!\u00111BA\u0014\u001d\u0011\ti!a\t\u000f\t\u0005=\u0011\u0011\u0005\b\u0005\u0003#\tyB\u0004\u0003\u0002\u0014\u0005ua\u0002BA\u000b\u00037i!!a\u0006\u000b\u0007\u0005eQ%\u0001\u0004=e>|GOP\u0005\u0002\u0017%\u0011\u0011BC\u0005\u0003\u000f!I!!\u0002\u0004\n\u0007\u0005\u0015B!A\u0004qC\u000e\\\u0017mZ3\n\t\u0005%\u00121\u0006\u0002\n\t\u0006$\u0018M\u0012:b[\u0016T1!!\n\u0005\u0011\u001d\ty#a\u0001A\u0002m\fQa\u001d;beRDq!a\r\u0002\u0004\u0001\u0007a0A\u0002f]\u0012Dq!a\u000e\u0001\t\u0003\nI$\u0001\u0003ti>\u0004HCAA\u001e!\r!\u0014QH\u0005\u0004\u0003\u007f)$\u0001B+oSRDq!a\u0011\u0001\t\u0003\n)%\u0001\u0005u_N#(/\u001b8h)\u0005a\u0004bBA%\u0001\u0011%\u00111J\u0001\u000fe\u0016\u0004xN\u001d;ECR\fGj\\:t)\u0011\tY$!\u0014\t\u000f\u0005=\u0013q\ta\u0001y\u00059Q.Z:tC\u001e,w\u0001CA*\u0005!\u0005!!!\u0016\u0002\u00191{w\r[;c'>,(oY3\u0011\u00075\n9FB\u0004\u0002\u0005!\u0005!!!\u0017\u0014\t\u0005]\u00131\f\t\u0004i\u0005u\u0013bAA0k\t1\u0011I\\=SK\u001aDq\u0001TA,\t\u0003\t\u0019\u0007\u0006\u0002\u0002V!Q\u0011qMA,\u0005\u0004%\t!!\u001b\u0002O%s5\u000b\u0016*V\u0007RKuJT0G\u001fJ{f)Q%M?>su\fR!U\u0003~cujU*`\r\u0006c5+R\u000b\u0002y!A\u0011QNA,A\u0003%A(\u0001\u0015J\u001dN#&+V\"U\u0013>suLR(S?\u001a\u000b\u0015\nT0P\u001d~#\u0015\tV!`\u0019>\u001b6k\u0018$B\u0019N+\u0005\u0005\u0003\u0006\u0002r\u0005]#\u0019!C\u0001\u0003S\na%\u0013(T)J+6\tV%P\u001d~3uJU0G\u0003&cul\u0014(`\t\u0006#\u0016i\u0018'P'N{FKU+F\u0011!\t)(a\u0016!\u0002\u0013a\u0014aJ%O'R\u0013Vk\u0011+J\u001f:{fi\u0014*`\r\u0006KEjX(O?\u0012\u000bE+Q0M\u001fN\u001bv\f\u0016*V\u000b\u0002B1\"!\u001f\u0002X\t\u0007I\u0011\u0001\u0002\u0002|\u00059a+\u0012*T\u0013>sUCAA?!\r!\u0014qP\u0005\u0004\u0003\u0003+$aA%oi\"I\u0011QQA,A\u0003%\u0011QP\u0001\t-\u0016\u00136+S(OA\u0001")
/* loaded from: input_file:org/apache/spark/sql/loghub/LoghubSource.class */
public class LoghubSource implements Source, Logging {
    public final SQLContext org$apache$spark$sql$loghub$LoghubSource$$sqlContext;
    public final LoghubOffsetReader org$apache$spark$sql$loghub$LoghubSource$$loghubOffsetReader;
    private final Map<String, Object> executorLoghubParams;
    public final String org$apache$spark$sql$loghub$LoghubSource$$metadataPath;
    public final LoghubOffsetRangeLimit org$apache$spark$sql$loghub$LoghubSource$$startingOffsets;
    private final boolean failOnDataLoss;
    private final SparkContext sc;
    private Map<StoreShard, String> initialShardOffset;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private volatile boolean bitmap$0;

    public static String INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE() {
        return LoghubSource$.MODULE$.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE();
    }

    public static String INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE() {
        return LoghubSource$.MODULE$.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private Map initialShardOffset$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                HDFSMetadataLog<LoghubSourceOffset> hDFSMetadataLog = new HDFSMetadataLog<LoghubSourceOffset>(this) { // from class: org.apache.spark.sql.loghub.LoghubSource$$anon$1
                    public void serialize(LoghubSourceOffset loghubSourceOffset, OutputStream outputStream) {
                        outputStream.write(0);
                        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
                        bufferedWriter.write(new StringBuilder().append("v").append(BoxesRunTime.boxToInteger(LoghubSource$.MODULE$.VERSION())).append("\n").toString());
                        bufferedWriter.write(loghubSourceOffset.json());
                        bufferedWriter.flush();
                    }

                    /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
                    public LoghubSourceOffset m96deserialize(InputStream inputStream) {
                        inputStream.read();
                        String iOUtils = IOUtils.toString(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
                        logDebug(new LoghubSource$$anon$1$$anonfun$deserialize$1(this, iOUtils));
                        Predef$.MODULE$.assert(iOUtils.length() != 0);
                        if (StringOps$.MODULE$.apply$extension(Predef$.MODULE$.augmentString(iOUtils), 0) != 'v') {
                            return LoghubSourceOffset$.MODULE$.apply(new SerializedOffset(iOUtils));
                        }
                        int indexOf = iOUtils.indexOf("\n");
                        if (indexOf <= 0) {
                            throw new IllegalStateException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Log file was malformed: failed to detect the log file version line."})).s(Nil$.MODULE$));
                        }
                        parseVersion(iOUtils.substring(0, indexOf), LoghubSource$.MODULE$.VERSION());
                        return LoghubSourceOffset$.MODULE$.apply(new SerializedOffset(iOUtils.substring(indexOf + 1)));
                    }

                    {
                        super(this.org$apache$spark$sql$loghub$LoghubSource$$sqlContext.sparkSession(), this.org$apache$spark$sql$loghub$LoghubSource$$metadataPath, ClassTag$.MODULE$.apply(LoghubSourceOffset.class));
                    }
                };
                this.initialShardOffset = ((LoghubSourceOffset) hDFSMetadataLog.get(0L).getOrElse(new LoghubSource$$anonfun$initialShardOffset$1(this, hDFSMetadataLog))).shardToOffsets();
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.initialShardOffset;
        }
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.class.initializeLogIfNecessary(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2(this);
    }

    public void commit(Offset offset) {
        Source.class.commit(this, offset);
    }

    private SparkContext sc() {
        return this.sc;
    }

    private Map<StoreShard, String> initialShardOffset() {
        return this.bitmap$0 ? this.initialShardOffset : initialShardOffset$lzycompute();
    }

    public LoghubSourceOffset org$apache$spark$sql$loghub$LoghubSource$$fetchAndVerify(Map<StoreShard, String> map) {
        return new LoghubSourceOffset(this.org$apache$spark$sql$loghub$LoghubSource$$loghubOffsetReader.fetchSpecificOffsets(map));
    }

    public StructType schema() {
        return LoghubOffsetReader$.MODULE$.loghubSchema();
    }

    public Option<Offset> getOffset() {
        initialShardOffset();
        Map<StoreShard, String> fetchLatestOffsets = this.org$apache$spark$sql$loghub$LoghubSource$$loghubOffsetReader.fetchLatestOffsets();
        logDebug(new LoghubSource$$anonfun$getOffset$1(this, fetchLatestOffsets));
        return new Some(new LoghubSourceOffset(fetchLatestOffsets));
    }

    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        Map<StoreShard, String> initialShardOffset;
        initialShardOffset();
        logInfo(new LoghubSource$$anonfun$getBatch$1(this, option, offset));
        if (option.isDefined()) {
            Object obj = option.get();
            if (obj != null ? obj.equals(offset) : offset == null) {
                return this.org$apache$spark$sql$loghub$LoghubSource$$sqlContext.internalCreateDataFrame(this.org$apache$spark$sql$loghub$LoghubSource$$sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)), schema(), true);
            }
        }
        Map<StoreShard, String> shardOffsets = LoghubSourceOffset$.MODULE$.getShardOffsets(offset);
        if (option instanceof Some) {
            initialShardOffset = LoghubSourceOffset$.MODULE$.getShardOffsets((Offset) ((Some) option).x());
        } else {
            if (!None$.MODULE$.equals(option)) {
                throw new MatchError(option);
            }
            logDebug(new LoghubSource$$anonfun$1(this));
            initialShardOffset = initialShardOffset();
        }
        Map<StoreShard, String> map = initialShardOffset;
        Set diff = shardOffsets.keySet().diff(map.keySet());
        Map<StoreShard, String> fetchEarliestOffsets = this.org$apache$spark$sql$loghub$LoghubSource$$loghubOffsetReader.fetchEarliestOffsets(diff.toSeq());
        Set keySet = fetchEarliestOffsets.keySet();
        if (keySet != null ? !keySet.equals(diff) : diff != null) {
            reportDataLoss(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Cannot find earliest offsets of ", ". Some data may have been missed"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{diff.diff(fetchEarliestOffsets.keySet())})));
        }
        logInfo(new LoghubSource$$anonfun$getBatch$2(this, fetchEarliestOffsets));
        Set diff2 = map.keySet().diff(shardOffsets.keySet());
        if (diff2.nonEmpty()) {
            reportDataLoss(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", " are gone. Some data may have been missed"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{diff2})));
        }
        Seq seq = ((SetLike) shardOffsets.keySet().filter(new LoghubSource$$anonfun$2(this, map, fetchEarliestOffsets))).toSeq();
        logDebug(new LoghubSource$$anonfun$getBatch$3(this, seq));
        LoghubSourceRDDOffsetRange[] loghubSourceRDDOffsetRangeArr = (LoghubSourceRDDOffsetRange[]) ((TraversableOnce) ((TraversableLike) seq.map(new LoghubSource$$anonfun$3(this, shardOffsets, map, fetchEarliestOffsets), Seq$.MODULE$.canBuildFrom())).filter(new LoghubSource$$anonfun$5(this))).toArray(ClassTag$.MODULE$.apply(LoghubSourceRDDOffsetRange.class));
        RDD map2 = new LoghubSourceRDD(sc(), this.executorLoghubParams, Predef$.MODULE$.wrapRefArray(loghubSourceRDDOffsetRangeArr), this.failOnDataLoss).map(new LoghubSource$$anonfun$6(this), ClassTag$.MODULE$.apply(InternalRow.class));
        logInfo(new LoghubSource$$anonfun$getBatch$4(this, loghubSourceRDDOffsetRangeArr));
        return this.org$apache$spark$sql$loghub$LoghubSource$$sqlContext.internalCreateDataFrame(map2, schema(), true);
    }

    public synchronized void stop() {
    }

    public String toString() {
        return new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"LoghubSource[", "]"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.org$apache$spark$sql$loghub$LoghubSource$$loghubOffsetReader}));
    }

    private void reportDataLoss(String str) {
        if (this.failOnDataLoss) {
            throw new IllegalStateException(new StringBuilder().append(str).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{". ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{LoghubSource$.MODULE$.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE()}))).toString());
        }
        logWarning(new LoghubSource$$anonfun$reportDataLoss$1(this, str));
    }

    public LoghubSource(SQLContext sQLContext, LoghubOffsetReader loghubOffsetReader, Map<String, Object> map, String str, LoghubOffsetRangeLimit loghubOffsetRangeLimit, boolean z) {
        this.org$apache$spark$sql$loghub$LoghubSource$$sqlContext = sQLContext;
        this.org$apache$spark$sql$loghub$LoghubSource$$loghubOffsetReader = loghubOffsetReader;
        this.executorLoghubParams = map;
        this.org$apache$spark$sql$loghub$LoghubSource$$metadataPath = str;
        this.org$apache$spark$sql$loghub$LoghubSource$$startingOffsets = loghubOffsetRangeLimit;
        this.failOnDataLoss = z;
        Source.class.$init$(this);
        Logging.class.$init$(this);
        this.sc = sQLContext.sparkContext();
    }
}
