询问本地 flink 应用写入到阿里云主机时遇到写入不了的问题

查看 26|回复 0
作者:summerlv   
问题是这样的,本来是在本地开发 flink 应用,然后想连接云主机 Doris 把内容写入到 Doris 中,但是一开始报错是返回了远程 Doris 运行时的一个内网地址。
于是改了以下源码的这个类:BackendV2 在其中增加了 convertToHostname() 方法将内网地址映射为公网地址
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.rest.models;
import com.car.common.Constant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
/**
* Be response model
**/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {
    @JsonProperty(value = "backends")
    private List[B] backends;
    public void setBackends(List[B] backends) { this.backends = backends; }
    public static class BackendRowV2 {
        @JsonProperty("ip")
        public String ip;
        @JsonProperty(value="http_port")
        public int httpPort;
        @JsonProperty("is_alive")
        public boolean isAlive;
        
        public String getIp() {
            return ip;
        }
        public void setIp(String ip) {
            this.ip = ip;
        }
        public int getHttpPort() {
            return httpPort;
        }
        public void setHttpPort(int httpPort) {
            this.httpPort = httpPort;
        }
        public boolean isAlive() {
            return isAlive;
        }
        public void setAlive(boolean alive) {
            isAlive = alive;
        }
        
        public String convertToHostname(String ip) {
            System.out.println("================"+ip);
            this.ip = ip;
            if(Objects.equals(ip,"192.168.0.107")) {
                return ip= Constant.HADOOP102;
            } else if(Objects.equals(ip,"192.168.0.108")) {
                return ip = Constant.HADOOP103;
            } else if (Objects.equals(ip,"192.168.0.109")) {
                return ip = Constant.HADOOP104;
            } else {
                return null;
            }
        }
        public String toBackendString(){
                        return convertToHostname(ip) + ":" + httpPort;
            //return ip + ":" + httpPort;
        }
    }
}
本来的源码是这样的:
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.rest.models;
import com.car.common.Constant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
/**
* Be response model
**/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {
    @JsonProperty(value = "backends")
    private List[B] backends;
    public void setBackends(List[B] backends) { this.backends = backends; }
    public static class BackendRowV2 {
        @JsonProperty("ip")
        public String ip;
        @JsonProperty(value="http_port")
        public int httpPort;
        @JsonProperty("is_alive")
        public boolean isAlive;
        
        public String getIp() {
            return ip;
        }
        public void setIp(String ip) {
            this.ip = ip;
        }
        public int getHttpPort() {
            return httpPort;
        }
        public void setHttpPort(int httpPort) {
            this.httpPort = httpPort;
        }
        public boolean isAlive() {
            return isAlive;
        }
        public void setAlive(boolean alive) {
            isAlive = alive;
        }
        
      
        public String toBackendString(){
            return ip + ":" + httpPort;
        }
    }
}
但是替换完以后报错如下:
Caused by: org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "http_port" (class org.apache.doris.flink.rest.models.BackendV2$BackendRowV2), not marked as ignorable (4 known properties: "httpPort", "isAlive", "alive", "ip"])
at [Source: (String)"{"backends":[{"ip":"192.168.0.109","http_port":7040,"is_alive":true}]}"; line: 1, column: 52] (through reference chain: org.apache.doris.flink.rest.models.BackendV2["backends"]->java.util.ArrayList[0]->org.apache.doris.flink.rest.models.BackendV2$BackendRowV2["http_port"])
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1127)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2023)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1700)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1678)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:319)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:355)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:313)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
        at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
        at org.apache.doris.flink.rest.RestService.parseBackendV2(RestService.java:380)
替换前报错如下:
2023-07-25 20:54:43 WARN (org.apache.doris.flink.sink.writer.DorisWriter:tryHttpConnection) - Failed to connect to backend:http://192.168.0.109:7040
java.net.ConnectException: Connection timed out: connect
        at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
        at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:607)
        at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
        at sun.net.www.http.HttpClient.[i](HttpClient.java:242)
        at sun.net.www.http.HttpClient.New(HttpClient.java:339)
        at sun.net.www.http.HttpClient.New(HttpClient.java:357)
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1228)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
        at org.apache.doris.flink.sink.writer.DorisWriter.tryHttpConnection(DorisWriter.java:259)
        at org.apache.doris.flink.sink.writer.DorisWriter.getAvailableBackend(DorisWriter.java:245)
        at org.apache.doris.flink.sink.writer.DorisWriter.initializeLoad(DorisWriter.java:108)
        at org.apache.doris.flink.sink.DorisSink.createWriter(DorisSink.java:64)
        at org.apache.flink.streaming.api.transformations.SinkV1Adapter.createWriter(SinkV1Adapter.java:77)
        at org.apache.flink.streaming.api.transformations.SinkV1Adapter$PlainSinkAdapter.createWriter(SinkV1Adapter.java:306)
        at org.apache.flink.streaming.api.transformations.SinkV1Adapter$StatefulSinkAdapter.createWriter(SinkV1Adapter.java:315)
        at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
        at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:750)
想问问大家有没遇到过本地 flink 应用远程连接 Doris 并写入数据的问题啊?
感谢大家!!!
您需要登录后才可以回帖 登录 | 立即注册

返回顶部