package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputException;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: classes2.dex */
public class EditLogTailer {
    static final /* synthetic */ boolean $assertionsDisabled = false;
    public static final Log LOG = LogFactory.getLog(EditLogTailer.class);
    private InetSocketAddress activeAddr;
    private final Configuration conf;
    private FSEditLog editLog;
    private final long logRollPeriodMs;
    private final FSNamesystem namesystem;
    private final long sleepTimeMs;
    private NamenodeProtocol cachedActiveProxy = null;
    private long lastRollTriggerTxId = HdfsConstants.INVALID_TXID;
    private long lastLoadedTxnId = HdfsConstants.INVALID_TXID;
    private final EditLogTailerThread tailerThread = new EditLogTailerThread();
    private long lastLoadTimestamp = Time.now();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes2.dex */
    public class EditLogTailerThread extends Thread {
        private volatile boolean shouldRun;

        private EditLogTailerThread() {
            super("Edit log tailer");
            this.shouldRun = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void doWork() {
            while (this.shouldRun) {
                try {
                    if (EditLogTailer.this.tooLongSinceLastLoad() && EditLogTailer.this.lastRollTriggerTxId < EditLogTailer.this.lastLoadedTxnId) {
                        EditLogTailer.this.triggerActiveLogRoll();
                    }
                } catch (InterruptedException unused) {
                } catch (EditLogInputException e) {
                    EditLogTailer.LOG.warn("Error while reading edits from disk. Will try again.", e);
                } catch (Throwable th) {
                    EditLogTailer.LOG.fatal("Unknown error encountered while tailing edits. Shutting down standby NN.", th);
                    ExitUtil.terminate(1, th);
                }
                if (!this.shouldRun) {
                    return;
                }
                EditLogTailer.this.doTailEdits();
                try {
                    Thread.sleep(EditLogTailer.this.sleepTimeMs);
                } catch (InterruptedException e2) {
                    EditLogTailer.LOG.warn("Edit log tailer interrupted", e2);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setShouldRun(boolean z) {
            this.shouldRun = z;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Object>() { // from class: org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer.EditLogTailerThread.1
                @Override // java.security.PrivilegedAction
                public Object run() {
                    EditLogTailerThread.this.doWork();
                    return null;
                }
            });
        }
    }

    public EditLogTailer(FSNamesystem fSNamesystem, Configuration configuration) {
        this.conf = configuration;
        this.namesystem = fSNamesystem;
        this.editLog = fSNamesystem.getEditLog();
        long j = configuration.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 120) * 1000;
        this.logRollPeriodMs = j;
        if (j >= 0) {
            InetSocketAddress activeNodeAddress = getActiveNodeAddress();
            this.activeAddr = activeNodeAddress;
            Preconditions.checkArgument(activeNodeAddress.getPort() > 0, "Active NameNode must have an IPC port configured. Got address '%s'", new Object[]{this.activeAddr});
            LOG.info("Will roll logs on active node at " + this.activeAddr + " every " + (j / 1000) + " seconds.");
        } else {
            LOG.info("Not going to trigger log rolls on active node because dfs.ha.log-roll.period is negative.");
        }
        long j2 = configuration.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 60) * 1000;
        this.sleepTimeMs = j2;
        LOG.debug("logRollPeriodMs=" + j + " sleepTime=" + j2);
    }

    private InetSocketAddress getActiveNodeAddress() {
        return NameNode.getServiceAddress(HAUtil.getConfForOtherNode(this.conf), true);
    }

    private NamenodeProtocol getActiveNodeProxy() throws IOException {
        if (this.cachedActiveProxy == null) {
            this.cachedActiveProxy = new NamenodeProtocolTranslatorPB((NamenodeProtocolPB) RPC.waitForProxy(NamenodeProtocolPB.class, RPC.getProtocolVersion(NamenodeProtocolPB.class), this.activeAddr, this.conf, this.conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_RPC_TIMEOUT_KEY, 20000), Long.MAX_VALUE));
        }
        return this.cachedActiveProxy;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean tooLongSinceLastLoad() {
        return this.logRollPeriodMs >= 0 && Time.now() - this.lastLoadTimestamp > this.logRollPeriodMs;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void triggerActiveLogRoll() {
        LOG.info("Triggering log roll on remote NameNode " + this.activeAddr);
        try {
            getActiveNodeProxy().rollEditLog();
            this.lastRollTriggerTxId = this.lastLoadedTxnId;
        } catch (IOException e) {
            LOG.warn("Unable to trigger a roll of the active NN", e);
        }
    }

    public void catchupDuringFailover() throws IOException {
        EditLogTailerThread editLogTailerThread = this.tailerThread;
        Preconditions.checkState(editLogTailerThread == null || !editLogTailerThread.isAlive(), "Tailer thread should not be running once failover starts");
        SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() { // from class: org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer.1
            @Override // java.security.PrivilegedExceptionAction
            public Void run() throws Exception {
                try {
                    EditLogTailer.this.doTailEdits();
                    return null;
                } catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }
        });
    }

    void doTailEdits() throws IOException, InterruptedException {
        long j;
        this.namesystem.writeLockInterruptibly();
        try {
            FSImage fSImage = this.namesystem.getFSImage();
            long lastAppliedTxId = fSImage.getLastAppliedTxId();
            Log log = LOG;
            if (log.isDebugEnabled()) {
                log.debug("lastTxnId: " + lastAppliedTxId);
            }
            Collection<EditLogInputStream> selectInputStreams = this.editLog.selectInputStreams(1 + lastAppliedTxId, 0L, null, false);
            if (log.isDebugEnabled()) {
                log.debug("edit streams to load from: " + selectInputStreams.size());
            }
            try {
                try {
                    long loadEdits = fSImage.loadEdits(selectInputStreams, this.namesystem);
                    if (loadEdits > 0 || log.isDebugEnabled()) {
                        log.info(String.format("Loaded %d edits starting from txid %d ", Long.valueOf(loadEdits), Long.valueOf(lastAppliedTxId)));
                    }
                    if (loadEdits > 0) {
                        this.lastLoadTimestamp = Time.now();
                    }
                    this.lastLoadedTxnId = fSImage.getLastAppliedTxId();
                } catch (EditLogInputException e) {
                    j = e.getNumEditsLoaded();
                    try {
                        throw e;
                    } catch (Throwable th) {
                        th = th;
                        if (j <= 0) {
                        }
                        LOG.info(String.format("Loaded %d edits starting from txid %d ", Long.valueOf(j), Long.valueOf(lastAppliedTxId)));
                        throw th;
                    }
                }
            } catch (Throwable th2) {
                th = th2;
                j = 0;
                if (j <= 0 || LOG.isDebugEnabled()) {
                    LOG.info(String.format("Loaded %d edits starting from txid %d ", Long.valueOf(j), Long.valueOf(lastAppliedTxId)));
                }
                throw th;
            }
        } catch (IOException e2) {
            LOG.warn("Edits tailer failed to find any streams. Will try again later.", e2);
        } finally {
            this.namesystem.writeUnlock();
        }
    }

    FSEditLog getEditLog() {
        return this.editLog;
    }

    public long getLastLoadTimestamp() {
        return this.lastLoadTimestamp;
    }

    public void setEditLog(FSEditLog fSEditLog) {
        this.editLog = fSEditLog;
    }

    public void start() {
        this.tailerThread.start();
    }

    public void stop() throws IOException {
        this.tailerThread.setShouldRun(false);
        this.tailerThread.interrupt();
        try {
            this.tailerThread.join();
        } catch (InterruptedException e) {
            LOG.warn("Edit log tailer thread exited with an exception");
            throw new IOException(e);
        }
    }
}
