diff --git a/Cargo.lock b/Cargo.lock index a8aa8881984..b2e5b6de2ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,7 +241,7 @@ dependencies = [ "Inflector", "async-graphql-parser", "darling", - "proc-macro-crate 3.1.0", + "proc-macro-crate", "proc-macro2", "quote", "strum", @@ -699,7 +699,7 @@ checksum = "3147d8272e8fa0ccd29ce51194dd98f79ddfb8191ba9e3409884e751798acf3a" dependencies = [ "core2", "multibase", - "multihash 0.19.1", + "multihash", "unsigned-varint 0.8.0", ] @@ -762,22 +762,6 @@ dependencies = [ "unreachable", ] -[[package]] -name = "common-multipart-rfc7578" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5baee326bc603965b0f26583e1ecd7c111c41b49bd92a344897476a352798869" -dependencies = [ - "bytes", - "futures-core", - "futures-util", - "http 0.2.12", - "mime", - "mime_guess", - "rand", - "thiserror", -] - [[package]] name = "console" version = "0.13.0" @@ -1137,6 +1121,24 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "deadpool" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "debugid" version = "0.8.0" @@ -1162,6 +1164,17 @@ dependencies = [ "serde", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "0.99.18" @@ -1292,22 +1305,13 @@ dependencies = [ "dirs-sys-next", ] -[[package]] -name = "dirs" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" -dependencies = [ - "dirs-sys 0.3.7", -] - [[package]] name = "dirs" version = "5.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" dependencies = [ - "dirs-sys 0.4.1", + "dirs-sys", ] [[package]] @@ -1320,17 +1324,6 @@ dependencies = [ "dirs-sys-next", ] -[[package]] -name = "dirs-sys" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" -dependencies = [ - "libc", - "redox_users", - "winapi", -] - [[package]] name = "dirs-sys" version = "0.4.1" @@ -1815,6 +1808,7 @@ dependencies = [ "clap", "csv", "defer", + "derivative", "diesel", "diesel_derives", "envconfig", @@ -1875,6 +1869,7 @@ dependencies = [ "url", "wasmparser 0.118.2", "web3", + "wiremock", ] [[package]] @@ -2007,8 +2002,6 @@ dependencies = [ "graph-chain-starknet", "graph-chain-substreams", "graph-runtime-wasm", - "ipfs-api", - "ipfs-api-backend-hyper", "serde_yaml", "tower 0.4.13 (git+https://github.com/tower-rs/tower.git)", "tower-test", @@ -2592,34 +2585,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-multipart-rfc7578" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0eb2cf73e96e9925f4bed948e763aa2901c2f1a3a5f713ee41917433ced6671" -dependencies = [ - "bytes", - "common-multipart-rfc7578", - "futures-core", - "http 0.2.12", - "hyper 0.14.29", -] - -[[package]] -name = "hyper-rustls" -version = "0.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" -dependencies = [ - "http 0.2.12", - "hyper 0.14.29", - "log", - "rustls 0.20.9", - "rustls-native-certs 0.6.3", - "tokio", - "tokio-rustls 0.23.4", -] - [[package]] name = "hyper-rustls" version = "0.27.2" @@ -2631,7 +2596,7 @@ dependencies = [ "hyper 1.4.0", "hyper-util", "rustls 0.23.10", - "rustls-native-certs 0.7.1", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -2814,59 +2779,6 @@ dependencies = [ "serde", ] -[[package]] -name = "ipfs-api" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d8cc57cf12ae4af611e53dd04053e1cfb815917c51c410aa30399bf377046ab" -dependencies = [ - "ipfs-api-backend-hyper", -] - -[[package]] -name = "ipfs-api-backend-hyper" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9d131b408b4caafe1e7c00d410a09ad3eb7e3ab68690cf668e86904b2176b4" -dependencies = [ - "async-trait", - "base64 0.13.1", - "bytes", - "futures 0.3.30", - "http 0.2.12", - "hyper 0.14.29", - "hyper-multipart-rfc7578", - "hyper-rustls 0.23.2", - "ipfs-api-prelude", - "thiserror", -] - -[[package]] -name = "ipfs-api-prelude" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b74065805db266ba2c6edbd670b23c4714824a955628472b2e46cc9f3a869cb" -dependencies = [ - "async-trait", - "bytes", - "cfg-if 1.0.0", - "common-multipart-rfc7578", - "dirs 4.0.0", - "futures 0.3.30", - "http 0.2.12", - "multiaddr", - "multibase", - "serde", - "serde_json", - "serde_urlencoded", - "thiserror", - "tokio", - "tokio-util 0.7.11", - "tracing", - "typed-builder", - "walkdir", -] - [[package]] name = "ipnet" version = "2.9.0" @@ -3279,29 +3191,10 @@ dependencies = [ "httparse", "memchr", "mime", - "spin 0.9.8", + "spin", "version_check", ] -[[package]] -name = "multiaddr" -version = "0.17.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b36f567c7099511fa8612bbbb52dda2419ce0bdbacf31714e3a5ffdb766d3bd" -dependencies = [ - "arrayref", - "byteorder", - "data-encoding", - "log", - "multibase", - "multihash 0.17.0", - "percent-encoding", - "serde", - "static_assertions", - "unsigned-varint 0.7.2", - "url", -] - [[package]] name = "multibase" version = "0.9.1" @@ -3313,17 +3206,6 @@ dependencies = [ "data-encoding-macro", ] -[[package]] -name = "multihash" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "835d6ff01d610179fbce3de1694d007e500bf33a7f29689838941d6bf783ae40" -dependencies = [ - "core2", - "multihash-derive", - "unsigned-varint 0.7.2", -] - [[package]] name = "multihash" version = "0.19.1" @@ -3334,20 +3216,6 @@ dependencies = [ "unsigned-varint 0.7.2", ] -[[package]] -name = "multihash-derive" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d6d4752e6230d8ef7adf7bd5d8c4b1f6561c1014c5ba9a37445ccefe18aa1db" -dependencies = [ - "proc-macro-crate 1.1.3", - "proc-macro-error", - "proc-macro2", - "quote", - "syn 1.0.109", - "synstructure", -] - [[package]] name = "multimap" version = "0.8.3" @@ -3479,8 +3347,8 @@ dependencies = [ "quick-xml", "rand", "reqwest", - "ring 0.17.8", - "rustls-pemfile 2.1.2", + "ring", + "rustls-pemfile", "serde", "serde_json", "snafu", @@ -3581,7 +3449,7 @@ version = "3.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d830939c76d294956402033aee57a6da7b438f2294eb94864c37b0569053a42c" dependencies = [ - "proc-macro-crate 3.1.0", + "proc-macro-crate", "proc-macro2", "quote", "syn 1.0.109", @@ -3870,16 +3738,6 @@ dependencies = [ "indexmap 2.2.6", ] -[[package]] -name = "proc-macro-crate" -version = "1.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e17d47ce914bf4de440332250b0edd23ce48c005f59fab39d3335866b114f11a" -dependencies = [ - "thiserror", - "toml 0.5.11", -] - [[package]] name = "proc-macro-crate" version = "3.1.0" @@ -3889,30 +3747,6 @@ dependencies = [ "toml_edit 0.21.1", ] -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] - [[package]] name = "proc-macro-utils" version = "0.10.0" @@ -4143,7 +3977,7 @@ checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" dependencies = [ "bytes", "rand", - "ring 0.17.8", + "ring", "rustc-hash", "rustls 0.23.10", "slab", @@ -4335,7 +4169,7 @@ dependencies = [ "http-body 1.0.0", "http-body-util", "hyper 1.4.0", - "hyper-rustls 0.27.2", + "hyper-rustls", "hyper-tls", "hyper-util", "ipnet", @@ -4349,8 +4183,8 @@ dependencies = [ "pin-project-lite", "quinn", "rustls 0.23.10", - "rustls-native-certs 0.7.1", - "rustls-pemfile 2.1.2", + "rustls-native-certs", + "rustls-pemfile", "rustls-pki-types", "serde", "serde_json", @@ -4370,21 +4204,6 @@ dependencies = [ "winreg", ] -[[package]] -name = "ring" -version = "0.16.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" -dependencies = [ - "cc", - "libc", - "once_cell", - "spin 0.5.2", - "untrusted 0.7.1", - "web-sys", - "winapi", -] - [[package]] name = "ring" version = "0.17.8" @@ -4395,8 +4214,8 @@ dependencies = [ "cfg-if 1.0.0", "getrandom", "libc", - "spin 0.9.8", - "untrusted 0.9.0", + "spin", + "untrusted", "windows-sys 0.52.0", ] @@ -4450,18 +4269,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.20.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" -dependencies = [ - "log", - "ring 0.16.20", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.22.4" @@ -4469,7 +4276,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log", - "ring 0.17.8", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", @@ -4483,25 +4290,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" dependencies = [ "once_cell", - "ring 0.17.8", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" -dependencies = [ - "openssl-probe", - "rustls-pemfile 1.0.4", - "schannel", - "security-framework", -] - [[package]] name = "rustls-native-certs" version = "0.7.1" @@ -4509,21 +4304,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" dependencies = [ "openssl-probe", - "rustls-pemfile 2.1.2", + "rustls-pemfile", "rustls-pki-types", "schannel", "security-framework", ] -[[package]] -name = "rustls-pemfile" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" -dependencies = [ - "base64 0.21.7", -] - [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -4546,9 +4332,9 @@ version = "0.102.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78" dependencies = [ - "ring 0.17.8", + "ring", "rustls-pki-types", - "untrusted 0.9.0", + "untrusted", ] [[package]] @@ -4596,16 +4382,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring 0.17.8", - "untrusted 0.9.0", -] - [[package]] name = "secp256k1" version = "0.21.3" @@ -4829,7 +4605,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da03fa3b94cc19e3ebfc88c4229c49d8f08cdbd1228870a45f0ffdf84988e14b" dependencies = [ - "dirs 5.0.1", + "dirs", ] [[package]] @@ -4983,12 +4759,6 @@ dependencies = [ "sha-1", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "spin" version = "0.9.8" @@ -5211,18 +4981,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" -[[package]] -name = "synstructure" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", - "unicode-xid", -] - [[package]] name = "system-configuration" version = "0.5.1" @@ -5514,17 +5272,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls 0.20.9", - "tokio", - "webpki", -] - [[package]] name = "tokio-rustls" version = "0.25.0" @@ -5699,8 +5446,8 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.6", - "rustls-native-certs 0.7.1", - "rustls-pemfile 2.1.2", + "rustls-native-certs", + "rustls-pemfile", "rustls-pki-types", "tokio", "tokio-rustls 0.25.0", @@ -5906,17 +5653,6 @@ dependencies = [ "utf-8", ] -[[package]] -name = "typed-builder" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89851716b67b937e393b3daa8423e67ddfc4bbbf1654bcf05488e95e0828db0c" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "typenum" version = "1.17.0" @@ -6028,12 +5764,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" -[[package]] -name = "untrusted" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" - [[package]] name = "untrusted" version = "0.9.0" @@ -6611,16 +6341,6 @@ dependencies = [ "url", ] -[[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring 0.17.8", - "untrusted 0.9.0", -] - [[package]] name = "which" version = "4.4.2" @@ -6851,6 +6571,30 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wiremock" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fff469918e7ca034884c7fd8f93fe27bacb7fcb599fd879df6c7b429a29b646" +dependencies = [ + "assert-json-diff", + "async-trait", + "base64 0.22.1", + "deadpool", + "futures 0.3.30", + "http 1.1.0", + "http-body-util", + "hyper 1.4.0", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-parser" version = "0.13.2" diff --git a/Cargo.toml b/Cargo.toml index 2a11e9b5d49..29f5d8ac0a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ async-graphql-axum = "7.0.6" axum = "0.7.5" chrono = "0.4.38" clap = { version = "4.5.4", features = ["derive", "env"] } +derivative = "2.2.0" diesel = { version = "2.1.3", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid"] } diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } diesel-dynamic-schema = "0.2.1" diff --git a/core/Cargo.toml b/core/Cargo.toml index fb546d8a29d..e73834333b1 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -25,8 +25,4 @@ anyhow = "1.0" [dev-dependencies] tower-test = { git = "https://github.com/tower-rs/tower.git" } -ipfs-api-backend-hyper = "0.6" -ipfs-api = { version = "0.17.0", features = [ - "with-hyper-rustls", -], default-features = false } uuid = { version = "1.9.1", features = ["v4"] } diff --git a/core/src/polling_monitor/ipfs_service.rs b/core/src/polling_monitor/ipfs_service.rs index 89ab217fa71..24a9a6b9c6a 100644 --- a/core/src/polling_monitor/ipfs_service.rs +++ b/core/src/polling_monitor/ipfs_service.rs @@ -1,29 +1,27 @@ +use std::sync::Arc; +use std::time::Duration; + use anyhow::{anyhow, Error}; use bytes::Bytes; use graph::futures03::future::BoxFuture; -use graph::{ - derive::CheapClone, - ipfs_client::{CidFile, IpfsClient}, - prelude::CheapClone, -}; -use std::time::Duration; +use graph::ipfs::ContentPath; +use graph::ipfs::IpfsClient; +use graph::ipfs::IpfsError; +use graph::{derive::CheapClone, prelude::CheapClone}; use tower::{buffer::Buffer, ServiceBuilder, ServiceExt}; -const CLOUDFLARE_TIMEOUT: u16 = 524; -const GATEWAY_TIMEOUT: u16 = 504; - -pub type IpfsService = Buffer, Error>>>; +pub type IpfsService = Buffer, Error>>>; pub fn ipfs_service( - client: IpfsClient, + client: Arc, max_file_size: usize, timeout: Duration, rate_limit: u16, ) -> IpfsService { let ipfs = IpfsServiceInner { client, - max_file_size, timeout, + max_file_size, }; let svc = ServiceBuilder::new() @@ -38,37 +36,30 @@ pub fn ipfs_service( #[derive(Clone, CheapClone)] struct IpfsServiceInner { - client: IpfsClient, - max_file_size: usize, + client: Arc, timeout: Duration, + max_file_size: usize, } impl IpfsServiceInner { - async fn call_inner(self, req: CidFile) -> Result, Error> { - let CidFile { cid, path } = req; - let multihash = cid.hash().code(); + async fn call_inner(self, path: ContentPath) -> Result, Error> { + let multihash = path.cid().hash().code(); if !SAFE_MULTIHASHES.contains(&multihash) { return Err(anyhow!("CID multihash {} is not allowed", multihash)); } - let cid_str = match path { - Some(path) => format!("{}/{}", cid, path), - None => cid.to_string(), - }; - let res = self .client - .cat_all(&cid_str, Some(self.timeout), self.max_file_size) + .cat(&path, self.max_file_size, Some(self.timeout)) .await; match res { Ok(file_bytes) => Ok(Some(file_bytes)), - Err(e) => match e.status().map(|e| e.as_u16()) { - // Timeouts in IPFS mean the file is not available, so we return `None` - Some(GATEWAY_TIMEOUT) | Some(CLOUDFLARE_TIMEOUT) => return Ok(None), - _ if e.is_timeout() => return Ok(None), - _ => return Err(e.into()), - }, + Err(IpfsError::RequestFailed(err)) if err.is_timeout() => { + // Timeouts in IPFS mean that the content is not available, so we return `None`. + Ok(None) + } + Err(err) => Err(err.into()), } } } @@ -96,48 +87,42 @@ const SAFE_MULTIHASHES: [u64; 15] = [ #[cfg(test)] mod test { - use ipfs::IpfsApi; - use ipfs_api as ipfs; - use std::{fs, str::FromStr, time::Duration}; + use std::time::Duration; + + use graph::components::link_resolver::ArweaveClient; + use graph::components::link_resolver::ArweaveResolver; + use graph::data::value::Word; + use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing; + use graph::ipfs::IpfsRpcClient; + use graph::ipfs::ServerAddress; + use graph::tokio; use tower::ServiceExt; - - use cid::Cid; - use graph::{ - components::link_resolver::{ArweaveClient, ArweaveResolver}, - data::value::Word, - ipfs_client::IpfsClient, - tokio, - }; - use uuid::Uuid; + use super::*; + #[tokio::test] async fn cat_file_in_folder() { - let path = "./tests/fixtures/ipfs_folder"; - let uid = Uuid::new_v4().to_string(); - fs::write(format!("{}/random.txt", path), &uid).unwrap(); + let random_bytes = Uuid::new_v4().as_bytes().to_vec(); + let ipfs_file = ("dir/file.txt", random_bytes.clone()); - let cl: ipfs::IpfsClient = ipfs::IpfsClient::default(); + let add_resp = add_files_to_local_ipfs_node_for_testing([ipfs_file]) + .await + .unwrap(); - let rsp = cl.add_path(path).await.unwrap(); + let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash; - let ipfs_folder = rsp.iter().find(|rsp| rsp.name == "ipfs_folder").unwrap(); + let client = + IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard()) + .unwrap() + .into_boxed(); - let local = IpfsClient::localhost(); - let cid = Cid::from_str(&ipfs_folder.hash).unwrap(); - let file = "random.txt".to_string(); + let svc = ipfs_service(client.into(), 100000, Duration::from_secs(30), 10); - let svc = super::ipfs_service(local, 100000, Duration::from_secs(5), 10); + let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap(); + let content = svc.oneshot(path).await.unwrap().unwrap(); - let content = svc - .oneshot(super::CidFile { - cid, - path: Some(file), - }) - .await - .unwrap() - .unwrap(); - assert_eq!(content.to_vec(), uid.as_bytes().to_vec()); + assert_eq!(content.to_vec(), random_bytes); } #[tokio::test] diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs index 6ffc5a5aa12..df0666a4f6a 100644 --- a/core/src/subgraph/context/mod.rs +++ b/core/src/subgraph/context/mod.rs @@ -18,7 +18,7 @@ use graph::{ CausalityRegion, DataSource, DataSourceTemplate, }, derive::CheapClone, - ipfs_client::CidFile, + ipfs::ContentPath, prelude::{ BlockNumber, BlockPtr, BlockState, CancelGuard, CheapClone, DeploymentHash, MetricsRegistry, RuntimeHostBuilder, SubgraphCountMetric, SubgraphInstanceMetrics, @@ -228,8 +228,8 @@ impl> IndexingContext { } pub struct OffchainMonitor { - ipfs_monitor: PollingMonitor, - ipfs_monitor_rx: mpsc::UnboundedReceiver<(CidFile, Bytes)>, + ipfs_monitor: PollingMonitor, + ipfs_monitor_rx: mpsc::UnboundedReceiver<(ContentPath, Bytes)>, arweave_monitor: PollingMonitor, arweave_monitor_rx: mpsc::UnboundedReceiver<(Base64, Bytes)>, } diff --git a/graph/Cargo.toml b/graph/Cargo.toml index 733746a259b..0687667763a 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -15,6 +15,7 @@ atomic_refcell = "0.1.13" old_bigdecimal = { version = "=0.1.2", features = ["serde"], package = "bigdecimal" } bytes = "1.0.1" cid = "0.11.1" +derivative = { workspace = true } graph_derive = { path = "./derive" } diesel = { workspace = true } diesel_derives = { workspace = true } @@ -90,7 +91,7 @@ defer = "0.2" # Our fork contains patches to make some fields optional for Celo and Fantom compatibility. # Without the "arbitrary_precision" feature, we get the error `data did not match any variant of untagged enum Response`. web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-patches-onto-0.18", features = [ - "arbitrary_precision","test" + "arbitrary_precision", "test" ] } serde_plain = "1.0.2" csv = "1.3.0" @@ -100,6 +101,7 @@ object_store = { version = "0.10.1", features = ["gcp"] } clap.workspace = true maplit = "1.0.2" hex-literal = "0.4" +wiremock = "0.6.1" [build-dependencies] tonic-build = { workspace = true } diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs index 627c9a95412..5064ab4b030 100644 --- a/graph/src/components/link_resolver/ipfs.rs +++ b/graph/src/components/link_resolver/ipfs.rs @@ -1,137 +1,60 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; -use crate::env::EnvVars; -use crate::futures01::{stream::poll_fn, try_ready}; -use crate::futures01::{Async, Poll}; -use crate::ipfs_client::IpfsError; -use crate::util::futures::RetryConfigNoTimeout; use anyhow::anyhow; use async_trait::async_trait; use bytes::BytesMut; +use derivative::Derivative; use futures03::compat::Stream01CompatExt; -use futures03::future::TryFutureExt; -use futures03::stream::{FuturesUnordered, StreamExt, TryStreamExt}; +use futures03::stream::StreamExt; +use futures03::stream::TryStreamExt; use lru_time_cache::LruCache; use serde_json::Value; -use crate::{ - cheap_clone::CheapClone, - derive::CheapClone, - futures01::stream::Stream, - ipfs_client::IpfsClient, - prelude::{LinkResolver as LinkResolverTrait, *}, -}; - -fn retry_policy( - always_retry: bool, - op: &'static str, - logger: &Logger, -) -> RetryConfigNoTimeout { - // Even if retries were not requested, networking errors are still retried until we either get - // a valid HTTP response or a timeout. - if always_retry { - retry(op, logger).no_limit() - } else { - retry(op, logger) - .no_limit() - .when(|res: &Result<_, IpfsError>| match res { - Ok(_) => false, - Err(IpfsError::FileTooLarge(..)) => false, - Err(e) => !(e.is_status() || e.is_timeout()), - }) - } - .no_timeout() // The timeout should be set in the internal future. -} - -/// The IPFS APIs don't have a quick "do you have the file" function. Instead, we -/// just rely on whether an API times out. That makes sense for IPFS, but not for -/// our application. We want to be able to quickly select from a potential list -/// of clients where hopefully one already has the file, and just get the file -/// from that. -/// -/// The strategy here then is to cat a single byte as a proxy for "do you have the -/// file". Whichever client has or gets the file first wins. This API is a good -/// choice, because it doesn't involve us actually starting to download the file -/// from each client, which would be wasteful of bandwidth and memory in the -/// case multiple clients respond in a timely manner. -async fn select_fastest_client( - clients: Arc>, - logger: Logger, - path: String, - timeout: Duration, - do_retry: bool, -) -> Result { - if clients.len() == 1 { - return Ok(clients[0].cheap_clone()); - } - - let mut err: Option = None; - - let mut exists: FuturesUnordered<_> = clients - .iter() - .enumerate() - .map(|(i, c)| { - let c = c.cheap_clone(); - let path = path.clone(); - retry_policy(do_retry, "IPFS exists", &logger).run(move || { - let path = path.clone(); - let c = c.cheap_clone(); - async move { c.exists(&path, Some(timeout)).map_ok(|()| i).await } - }) - }) - .collect(); - - while let Some(result) = exists.next().await { - match result { - Ok(index) => { - return Ok(clients[index].cheap_clone()); - } - Err(e) => err = Some(e.into()), - } - } +use crate::derive::CheapClone; +use crate::env::EnvVars; +use crate::futures01::stream::poll_fn; +use crate::futures01::stream::Stream; +use crate::futures01::try_ready; +use crate::futures01::Async; +use crate::futures01::Poll; +use crate::ipfs::ContentPath; +use crate::ipfs::IpfsClient; +use crate::prelude::{LinkResolver as LinkResolverTrait, *}; + +#[derive(Clone, CheapClone, Derivative)] +#[derivative(Debug)] +pub struct IpfsResolver { + #[derivative(Debug = "ignore")] + client: Arc, - Err(err.unwrap_or_else(|| { - anyhow!( - "No IPFS clients were supplied to handle the call. File: {}", - path - ) - })) -} + #[derivative(Debug = "ignore")] + cache: Arc>>>, -#[derive(Clone, CheapClone)] -pub struct IpfsResolver { - clients: Arc>, - cache: Arc>>>, timeout: Duration, - retry: bool, - env_vars: Arc, + max_file_size: usize, + max_map_file_size: usize, + max_cache_file_size: usize, } impl IpfsResolver { - pub fn new(clients: Vec, env_vars: Arc) -> Self { + pub fn new(client: Arc, env_vars: Arc) -> Self { + let env = &env_vars.mappings; + Self { - clients: Arc::new(clients.into_iter().collect()), + client, cache: Arc::new(Mutex::new(LruCache::with_capacity( - env_vars.mappings.max_ipfs_cache_size as usize, + env.max_ipfs_cache_size as usize, ))), - timeout: env_vars.mappings.ipfs_timeout, - retry: false, - env_vars, + timeout: env.ipfs_timeout, + max_file_size: env.max_ipfs_file_bytes, + max_map_file_size: env.max_ipfs_map_file_size, + max_cache_file_size: env.max_ipfs_cache_file_size, } } } -impl Debug for IpfsResolver { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LinkResolver") - .field("timeout", &self.timeout) - .field("retry", &self.retry) - .field("env_vars", &self.env_vars) - .finish() - } -} - #[async_trait] impl LinkResolverTrait for IpfsResolver { fn with_timeout(&self, timeout: Duration) -> Box { @@ -141,59 +64,41 @@ impl LinkResolverTrait for IpfsResolver { } fn with_retries(&self) -> Box { - let mut s = self.cheap_clone(); - s.retry = true; - Box::new(s) + // IPFS clients have internal retries enabled by default. + Box::new(self.cheap_clone()) } - /// Supports links of the form `/ipfs/ipfs_hash` or just `ipfs_hash`. async fn cat(&self, logger: &Logger, link: &Link) -> Result, Error> { - // Discard the `/ipfs/` prefix (if present) to get the hash. - let path = link.link.trim_start_matches("/ipfs/").to_owned(); + let path = ContentPath::new(&link.link)?; + let timeout = self.timeout; + let max_file_size = self.max_file_size; + let max_cache_file_size = self.max_cache_file_size; if let Some(data) = self.cache.lock().unwrap().get(&path) { - trace!(logger, "IPFS cache hit"; "hash" => &path); - return Ok(data.clone()); + trace!(logger, "IPFS cat cache hit"; "hash" => path.to_string()); + return Ok(data.to_owned()); } - trace!(logger, "IPFS cache miss"; "hash" => &path); - - let client = select_fastest_client( - self.clients.cheap_clone(), - logger.cheap_clone(), - path.clone(), - self.timeout, - self.retry, - ) - .await?; - let max_cache_file_size = self.env_vars.mappings.max_ipfs_cache_file_size; - let max_file_size = self.env_vars.mappings.max_ipfs_file_bytes; + trace!(logger, "IPFS cat cache miss"; "hash" => path.to_string()); - let req_path = path.clone(); - let timeout = self.timeout; - let data = retry_policy(self.retry, "ipfs.cat", logger) - .run(move || { - let path = req_path.clone(); - let client = client.clone(); - async move { - Ok(client - .cat_all(&path, Some(timeout), max_file_size) - .await? - .to_vec()) - } - }) - .await?; + let data = self + .client + .cat(&path, max_file_size, Some(timeout)) + .await? + .to_vec(); - // Only cache files if they are not too large if data.len() <= max_cache_file_size { let mut cache = self.cache.lock().unwrap(); + if !cache.contains_key(&path) { cache.insert(path.clone(), data.clone()); } } else { - debug!(logger, "File too large for cache"; - "path" => path, - "size" => data.len() + debug!( + logger, + "IPFS file too large for cache"; + "path" => path.to_string(), + "size" => data.len(), ); } @@ -201,50 +106,24 @@ impl LinkResolverTrait for IpfsResolver { } async fn get_block(&self, logger: &Logger, link: &Link) -> Result, Error> { - trace!(logger, "IPFS block get"; "hash" => &link.link); - let client = select_fastest_client( - self.clients.cheap_clone(), - logger.cheap_clone(), - link.link.clone(), - self.timeout, - self.retry, - ) - .await?; - - // Note: The IPFS protocol limits the size of blocks to 1MB, so we don't need to enforce size - // limits here. - let link = link.link.clone(); - let data = retry_policy(self.retry, "ipfs.getBlock", logger) - .run(move || { - let link = link.clone(); - let client = client.clone(); - async move { - let data = client.get_block(link.clone()).await?.to_vec(); - Result::, _>::Ok(data) - } - }) - .await?; + let path = ContentPath::new(&link.link)?; + let timeout = self.timeout; + + trace!(logger, "IPFS block get"; "hash" => path.to_string()); + + let data = self.client.get_block(&path, Some(timeout)).await?.to_vec(); Ok(data) } async fn json_stream(&self, logger: &Logger, link: &Link) -> Result { - // Discard the `/ipfs/` prefix (if present) to get the hash. - let path = link.link.trim_start_matches("/ipfs/").to_string(); - - let client = select_fastest_client( - self.clients.cheap_clone(), - logger.cheap_clone(), - path.to_string(), - self.timeout, - self.retry, - ) - .await?; + let path = ContentPath::new(&link.link)?; + let max_map_file_size = self.max_map_file_size; - let max_file_size = self.env_vars.mappings.max_ipfs_map_file_size; - let mut cummulative_file_size = 0; + trace!(logger, "IPFS JSON stream"; "hash" => path.to_string()); - let mut stream = client + let mut stream = self + .client .cat_stream(&path, None) .await? .fuse() @@ -259,16 +138,18 @@ impl LinkResolverTrait for IpfsResolver { // to the line number in the overall file let mut count = 0; + let mut cumulative_file_size = 0; + let stream: JsonValueStream = Box::pin( poll_fn(move || -> Poll, Error> { loop { - cummulative_file_size += buf.len(); + cumulative_file_size += buf.len(); - if cummulative_file_size > max_file_size { + if cumulative_file_size > max_map_file_size { return Err(anyhow!( "IPFS file {} is too large. It can be at most {} bytes", path, - max_file_size, + max_map_file_size, )); } @@ -324,9 +205,13 @@ impl LinkResolverTrait for IpfsResolver { #[cfg(test)] mod tests { + use serde_json::json; + use super::*; use crate::env::EnvVars; - use serde_json::json; + use crate::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing; + use crate::ipfs::IpfsRpcClient; + use crate::ipfs::ServerAddress; #[tokio::test] async fn max_file_size() { @@ -334,32 +219,42 @@ mod tests { env_vars.mappings.max_ipfs_file_bytes = 200; let file: &[u8] = &[0u8; 201]; - let client = IpfsClient::localhost(); - let resolver = super::IpfsResolver::new(vec![client.clone()], Arc::new(env_vars)); - let logger = Logger::root(slog::Discard, o!()); + let cid = add_files_to_local_ipfs_node_for_testing([file.to_vec()]) + .await + .unwrap()[0] + .hash + .to_owned(); + + let logger = crate::log::discard(); + + let client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger) + .unwrap() + .into_boxed(); - let link = client.add(file.into()).await.unwrap().hash; - let err = IpfsResolver::cat(&resolver, &logger, &Link { link: link.clone() }) + let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars)); + + let err = IpfsResolver::cat(&resolver, &logger, &Link { link: cid.clone() }) .await .unwrap_err(); + assert_eq!( err.to_string(), - format!( - "IPFS file {} is too large. It can be at most 200 bytes", - link - ) + format!("IPFS content from '{cid}' exceeds the 200 bytes limit") ); } async fn json_round_trip(text: &'static str, env_vars: EnvVars) -> Result, Error> { - let client = IpfsClient::localhost(); - let resolver = super::IpfsResolver::new(vec![client.clone()], Arc::new(env_vars)); + let cid = add_files_to_local_ipfs_node_for_testing([text.as_bytes().to_vec()]).await?[0] + .hash + .to_owned(); - let logger = Logger::root(slog::Discard, o!()); - let link = client.add(text.as_bytes().into()).await.unwrap().hash; + let logger = crate::log::discard(); + let client = + IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &logger)?.into_boxed(); + let resolver = IpfsResolver::new(client.into(), Arc::new(env_vars)); - let stream = IpfsResolver::json_stream(&resolver, &logger, &Link { link }).await?; + let stream = IpfsResolver::json_stream(&resolver, &logger, &Link { link: cid }).await?; stream.map_ok(|sv| sv.value).try_collect().await } diff --git a/graph/src/data_source/offchain.rs b/graph/src/data_source/offchain.rs index 34826e31625..46a77e8ba32 100644 --- a/graph/src/data_source/offchain.rs +++ b/graph/src/data_source/offchain.rs @@ -8,7 +8,7 @@ use crate::{ }, data::{store::scalar::Bytes, subgraph::SPEC_VERSION_0_0_7, value::Word}, data_source, - ipfs_client::CidFile, + ipfs::ContentPath, prelude::{DataSourceContext, Link}, schema::{EntityType, InputSchema}, }; @@ -47,8 +47,8 @@ impl OffchainDataSourceKind { pub fn try_parse_source(&self, bs: Bytes) -> Result { let source = match self { OffchainDataSourceKind::Ipfs => { - let cid_file = CidFile::try_from(bs)?; - Source::Ipfs(cid_file) + let path = ContentPath::try_from(bs)?; + Source::Ipfs(path) } OffchainDataSourceKind::Arweave => { let base64 = Word::from(String::from_utf8(bs.to_vec())?); @@ -187,7 +187,7 @@ impl DataSource { OffchainDataSourceKind::Ipfs => match source.parse() { Ok(source) => Source::Ipfs(source), // Ignore data sources created with an invalid CID. - Err(e) => return Err(DataSourceCreationError::Ignore(source, e)), + Err(e) => return Err(DataSourceCreationError::Ignore(source, e.into())), }, OffchainDataSourceKind::Arweave => Source::Arweave(Word::from(source)), }; @@ -313,7 +313,7 @@ pub type Base64 = Word; #[derive(Clone, Debug, Eq, PartialEq)] pub enum Source { - Ipfs(CidFile), + Ipfs(ContentPath), Arweave(Base64), } @@ -326,7 +326,7 @@ impl Source { /// the `source` of the data source is equal the `source` of the `TriggerData`. pub fn address(&self) -> Option> { match self { - Source::Ipfs(ref cid) => Some(cid.to_bytes()), + Source::Ipfs(ref path) => Some(path.to_string().as_bytes().to_vec()), Source::Arweave(ref base64) => Some(base64.as_bytes().to_vec()), } } @@ -335,7 +335,7 @@ impl Source { impl Into for Source { fn into(self) -> Bytes { match self { - Source::Ipfs(ref link) => Bytes::from(link.to_bytes()), + Source::Ipfs(ref path) => Bytes::from(path.to_string().as_bytes().to_vec()), Source::Arweave(ref base64) => Bytes::from(base64.as_bytes()), } } @@ -526,11 +526,9 @@ impl fmt::Debug for TriggerData { #[cfg(test)] mod test { - use std::str::FromStr; - use crate::{ data::{store::scalar::Bytes, value::Word}, - ipfs_client::CidFile, + ipfs::ContentPath, }; use super::{OffchainDataSourceKind, Source}; @@ -538,13 +536,13 @@ mod test { #[test] fn test_source_bytes_round_trip() { let base64 = "8APeQ5lW0-csTcBaGdPBDLAL2ci2AT9pTn2tppGPU_8"; - let cid = CidFile::from_str("QmVkvoPGi9jvvuxsHDVJDgzPEzagBaWSZRYoRDzU244HjZ").unwrap(); + let path = ContentPath::new("QmVkvoPGi9jvvuxsHDVJDgzPEzagBaWSZRYoRDzU244HjZ").unwrap(); - let ipfs_source: Bytes = Source::Ipfs(cid.clone()).into(); + let ipfs_source: Bytes = Source::Ipfs(path.clone()).into(); let s = OffchainDataSourceKind::Ipfs .try_parse_source(ipfs_source) .unwrap(); - assert! { matches!(s, Source::Ipfs(ipfs) if ipfs.eq(&cid))}; + assert! { matches!(s, Source::Ipfs(ipfs) if ipfs.eq(&path))}; let arweave_source = Source::Arweave(Word::from(base64)); let s = OffchainDataSourceKind::Arweave diff --git a/graph/src/data_source/tests.rs b/graph/src/data_source/tests.rs index 7a8750748d5..500c8cdb403 100644 --- a/graph/src/data_source/tests.rs +++ b/graph/src/data_source/tests.rs @@ -2,7 +2,7 @@ use cid::Cid; use crate::{ blockchain::mock::{MockBlockchain, MockDataSource}, - ipfs_client::CidFile, + ipfs::ContentPath, prelude::Link, }; @@ -31,10 +31,7 @@ fn offchain_duplicate() { assert!(!a.is_duplicate_of(&c)); let mut c = a.clone(); - c.source = Source::Ipfs(CidFile { - cid: Cid::default(), - path: Some("/foo".into()), - }); + c.source = Source::Ipfs(ContentPath::new(format!("{}/foo", Cid::default())).unwrap()); assert!(!a.is_duplicate_of(&c)); let mut c = a.clone(); @@ -73,10 +70,7 @@ fn new_datasource() -> offchain::DataSource { offchain::OffchainDataSourceKind::Ipfs, "theName".into(), 0, - Source::Ipfs(CidFile { - cid: Cid::default(), - path: None, - }), + Source::Ipfs(ContentPath::new(Cid::default().to_string()).unwrap()), Mapping { language: String::new(), api_version: Version::new(0, 0, 0), diff --git a/graph/src/ipfs/content_path.rs b/graph/src/ipfs/content_path.rs new file mode 100644 index 00000000000..3106d202d5e --- /dev/null +++ b/graph/src/ipfs/content_path.rs @@ -0,0 +1,236 @@ +use anyhow::anyhow; +use cid::Cid; + +use crate::ipfs::IpfsError; +use crate::ipfs::IpfsResult; + +#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +/// Represents a path to some data on IPFS. +pub struct ContentPath { + cid: Cid, + path: Option, +} + +impl ContentPath { + /// Creates a new [ContentPath] from the specified input. + pub fn new(input: impl AsRef) -> IpfsResult { + let input = input.as_ref(); + + if input.is_empty() { + return Err(IpfsError::InvalidContentPath { + input: "".to_owned(), + source: anyhow!("path is empty"), + }); + } + + let (cid, path) = input + .strip_prefix("/ipfs/") + .unwrap_or(input) + .split_once('/') + .unwrap_or((input, "")); + + let cid = cid + .parse::() + .map_err(|err| IpfsError::InvalidContentPath { + input: input.to_owned(), + source: anyhow::Error::from(err).context("invalid CID"), + })?; + + if path.contains('?') { + return Err(IpfsError::InvalidContentPath { + input: input.to_owned(), + source: anyhow!("query parameters not allowed"), + }); + } + + Ok(Self { + cid, + path: (!path.is_empty()).then_some(path.to_owned()), + }) + } + + pub fn cid(&self) -> &Cid { + &self.cid + } + + pub fn path(&self) -> Option<&str> { + self.path.as_deref() + } +} + +impl std::str::FromStr for ContentPath { + type Err = IpfsError; + + fn from_str(s: &str) -> Result { + Self::new(s) + } +} + +impl TryFrom for ContentPath { + type Error = IpfsError; + + fn try_from(bytes: crate::data::store::scalar::Bytes) -> Result { + let s = String::from_utf8(bytes.to_vec()).map_err(|err| IpfsError::InvalidContentPath { + input: bytes.to_string(), + source: err.into(), + })?; + + Self::new(s) + } +} + +impl std::fmt::Display for ContentPath { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let cid = &self.cid; + + match self.path { + Some(ref path) => write!(f, "{cid}/{path}"), + None => write!(f, "{cid}"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const CID_V0: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; + const CID_V1: &str = "bafybeiczsscdsbs7ffqz55asqdf3smv6klcw3gofszvwlyarci47bgf354"; + + #[test] + fn fails_on_empty_input() { + let err = ContentPath::new("").unwrap_err(); + + assert_eq!( + err.to_string(), + "'' is not a valid IPFS content path: path is empty", + ); + } + + #[test] + fn fails_on_an_invalid_cid() { + let err = ContentPath::new("not_a_cid").unwrap_err(); + + assert!(err + .to_string() + .starts_with("'not_a_cid' is not a valid IPFS content path: invalid CID: ")); + } + + #[test] + fn accepts_a_valid_cid_v0() { + let path = ContentPath::new(CID_V0).unwrap(); + + assert_eq!( + path, + ContentPath { + cid: CID_V0.parse().unwrap(), + path: None, + } + ); + } + + #[test] + fn accepts_a_valid_cid_v1() { + let path = ContentPath::new(CID_V1).unwrap(); + + assert_eq!( + path, + ContentPath { + cid: CID_V1.parse().unwrap(), + path: None, + } + ); + } + + #[test] + fn fails_on_a_leading_slash_followed_by_a_valid_cid() { + let err = ContentPath::new(format!("/{CID_V0}")).unwrap_err(); + + assert!(err.to_string().starts_with(&format!( + "'/{CID_V0}' is not a valid IPFS content path: invalid CID: " + ))); + } + + #[test] + fn ignores_the_first_slash_after_the_cid() { + let path = ContentPath::new(format!("{CID_V0}/")).unwrap(); + + assert_eq!( + path, + ContentPath { + cid: CID_V0.parse().unwrap(), + path: None, + } + ); + } + + #[test] + fn accepts_a_path_after_the_cid() { + let path = ContentPath::new(format!("{CID_V0}/readme.md")).unwrap(); + + assert_eq!( + path, + ContentPath { + cid: CID_V0.parse().unwrap(), + path: Some("readme.md".to_owned()), + } + ); + } + + #[test] + fn accepts_multiple_consecutive_slashes_after_the_cid() { + let path = ContentPath::new(format!("{CID_V0}//")).unwrap(); + + assert_eq!( + path, + ContentPath { + cid: CID_V0.parse().unwrap(), + path: Some("/".to_owned()), + } + ); + } + + #[test] + fn fails_on_an_invalid_cid_followed_by_a_path() { + let err = ContentPath::new("not_a_cid/readme.md").unwrap_err(); + + assert!(err + .to_string() + .starts_with("'not_a_cid/readme.md' is not a valid IPFS content path: invalid CID: ")); + } + + #[test] + fn fails_on_attempts_to_pass_query_parameters() { + let err = ContentPath::new(format!("{CID_V0}/readme.md?offline=true")).unwrap_err(); + + assert_eq!( + err.to_string(), + format!( + "'{CID_V0}/readme.md?offline=true' is not a valid IPFS content path: query parameters not allowed" + ) + ); + } + + #[test] + fn accepts_and_removes_the_ipfs_prefix() { + let path = ContentPath::new(format!("/ipfs/{CID_V0}")).unwrap(); + + assert_eq!( + path, + ContentPath { + cid: CID_V0.parse().unwrap(), + path: None, + } + ); + + let path = ContentPath::new(format!("/ipfs/{CID_V0}/readme.md")).unwrap(); + + assert_eq!( + path, + ContentPath { + cid: CID_V0.parse().unwrap(), + path: Some("readme.md".to_owned()), + } + ); + } +} diff --git a/graph/src/ipfs/error.rs b/graph/src/ipfs/error.rs new file mode 100644 index 00000000000..2ec3e25764f --- /dev/null +++ b/graph/src/ipfs/error.rs @@ -0,0 +1,99 @@ +use reqwest::StatusCode; +use thiserror::Error; + +use crate::ipfs::ContentPath; +use crate::ipfs::ServerAddress; + +#[derive(Debug, Error)] +pub enum IpfsError { + #[error("'{input}' is not a valid IPFS server address: {source:#}")] + InvalidServerAddress { + input: String, + source: anyhow::Error, + }, + + #[error("'{server_address}' is not a valid IPFS server: {reason:#}")] + InvalidServer { + server_address: ServerAddress, + + #[source] + reason: anyhow::Error, + }, + + #[error("'{input}' is not a valid IPFS content path: {source:#}")] + InvalidContentPath { + input: String, + source: anyhow::Error, + }, + + #[error("IPFS content from '{path}' is not available: {reason:#}")] + ContentNotAvailable { + path: ContentPath, + + #[source] + reason: anyhow::Error, + }, + + #[error("IPFS content from '{path}' exceeds the {max_size} bytes limit")] + ContentTooLarge { path: ContentPath, max_size: usize }, + + #[error(transparent)] + RequestFailed(RequestError), +} + +#[derive(Debug, Error)] +#[error("request to IPFS server failed: {0:#}")] +pub struct RequestError(reqwest::Error); + +impl IpfsError { + pub fn is_invalid_server(&self) -> bool { + matches!(self, Self::InvalidServer { .. }) + } +} + +impl From for IpfsError { + fn from(err: reqwest::Error) -> Self { + Self::RequestFailed(RequestError(err)) + } +} + +impl RequestError { + /// Returns true if the request failed due to a timeout. + pub fn is_timeout(&self) -> bool { + if self.0.is_timeout() { + return true; + } + + let Some(status) = self.0.status() else { + return false; + }; + + const CLOUDFLARE_CONNECTION_TIMEOUT: u16 = 522; + const CLOUDFLARE_REQUEST_TIMEOUT: u16 = 524; + + [ + StatusCode::REQUEST_TIMEOUT, + StatusCode::GATEWAY_TIMEOUT, + StatusCode::from_u16(CLOUDFLARE_CONNECTION_TIMEOUT).unwrap(), + StatusCode::from_u16(CLOUDFLARE_REQUEST_TIMEOUT).unwrap(), + ] + .into_iter() + .any(|x| status == x) + } + + /// Returns true if the request can be retried. + pub fn is_retriable(&self) -> bool { + let Some(status) = self.0.status() else { + return true; + }; + + [ + StatusCode::TOO_MANY_REQUESTS, + StatusCode::INTERNAL_SERVER_ERROR, + StatusCode::BAD_GATEWAY, + StatusCode::SERVICE_UNAVAILABLE, + ] + .into_iter() + .any(|x| status == x) + } +} diff --git a/graph/src/ipfs/gateway_client.rs b/graph/src/ipfs/gateway_client.rs new file mode 100644 index 00000000000..87eb25b48a0 --- /dev/null +++ b/graph/src/ipfs/gateway_client.rs @@ -0,0 +1,702 @@ +use std::time::Duration; + +use anyhow::anyhow; +use async_trait::async_trait; +use bytes::Bytes; +use bytes::BytesMut; +use derivative::Derivative; +use futures03::stream::BoxStream; +use futures03::StreamExt; +use futures03::TryStreamExt; +use http::header::ACCEPT; +use http::header::CACHE_CONTROL; +use reqwest::StatusCode; +use slog::Logger; + +use crate::derive::CheapClone; +use crate::ipfs::retry_policy::retry_policy; +use crate::ipfs::CanProvide; +use crate::ipfs::Cat; +use crate::ipfs::CatStream; +use crate::ipfs::ContentPath; +use crate::ipfs::GetBlock; +use crate::ipfs::IpfsClient; +use crate::ipfs::IpfsError; +use crate::ipfs::IpfsResult; +use crate::ipfs::ServerAddress; + +/// The request that verifies that the IPFS gateway is accessible is generally fast because +/// it does not involve querying the distributed network. +const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(300); + +#[derive(Clone, CheapClone, Derivative)] +#[derivative(Debug)] +/// A client that connects to an IPFS gateway. +/// +/// Reference: +pub struct IpfsGatewayClient { + server_address: ServerAddress, + + #[derivative(Debug = "ignore")] + http_client: reqwest::Client, + + logger: Logger, + test_request_timeout: Duration, +} + +impl IpfsGatewayClient { + /// Creates a new [IpfsGatewayClient] with the specified server address. + /// Verifies that the server is responding to IPFS gateway requests. + pub async fn new(server_address: impl AsRef, logger: &Logger) -> IpfsResult { + let client = Self::new_unchecked(server_address, logger)?; + + client + .send_test_request() + .await + .map_err(|reason| IpfsError::InvalidServer { + server_address: client.server_address.clone(), + reason, + })?; + + Ok(client) + } + + /// Creates a new [IpfsGatewayClient] with the specified server address. + /// Does not verify that the server is responding to IPFS gateway requests. + pub fn new_unchecked(server_address: impl AsRef, logger: &Logger) -> IpfsResult { + Ok(Self { + server_address: ServerAddress::new(server_address)?, + http_client: reqwest::Client::new(), + logger: logger.to_owned(), + test_request_timeout: TEST_REQUEST_TIMEOUT, + }) + } + + pub fn into_boxed(self) -> Box { + Box::new(self) + } + + async fn send_test_request(&self) -> anyhow::Result<()> { + // To successfully perform this test, it does not really matter which CID we use. + const RANDOM_CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; + + // A special request described in the specification that should instruct the gateway + // to perform a very quick local check and return either HTTP status 200, which would + // mean the server has the content locally cached, or a 412 error, which would mean the + // content is not locally cached. This information is sufficient to verify that the + // server behaves like an IPFS gateway. + let req = self + .http_client + .head(self.ipfs_url(RANDOM_CID)) + .header(CACHE_CONTROL, "only-if-cached") + .timeout(self.test_request_timeout); + + let ok = retry_policy("IPFS.Gateway.send_test_request", &self.logger) + .run(move || { + let req = req.try_clone().expect("request can be cloned"); + + async move { + let resp = req.send().await?; + let status = resp.status(); + + if status == StatusCode::OK || status == StatusCode::PRECONDITION_FAILED { + return Ok(true); + } + + resp.error_for_status()?; + + Ok(false) + } + }) + .await?; + + if !ok { + return Err(anyhow!("not a gateway")); + } + + Ok(()) + } + + fn ipfs_url(&self, path_and_query: impl AsRef) -> String { + format!("{}ipfs/{}", self.server_address, path_and_query.as_ref()) + } +} + +#[async_trait] +impl CanProvide for IpfsGatewayClient { + async fn can_provide(&self, path: &ContentPath, timeout: Option) -> IpfsResult { + let url = self.ipfs_url(path.to_string()); + let mut req = self.http_client.head(url); + + if let Some(timeout) = timeout { + req = req.timeout(timeout); + } + + retry_policy("IPFS.Gateway.can_provide", &self.logger) + .run(move || { + let req = req.try_clone().expect("request can be cloned"); + + async move { + let status = req.send().await?.error_for_status()?.status(); + + Ok(status == StatusCode::OK) + } + }) + .await + } +} + +#[async_trait] +impl CatStream for IpfsGatewayClient { + async fn cat_stream( + &self, + path: &ContentPath, + timeout: Option, + ) -> IpfsResult>> { + let url = self.ipfs_url(path.to_string()); + let mut req = self.http_client.get(url); + + if let Some(timeout) = timeout { + req = req.timeout(timeout); + } + + let resp = retry_policy("IPFS.Gateway.cat_stream", &self.logger) + .run(move || { + let req = req.try_clone().expect("request can be cloned"); + + async move { Ok(req.send().await?.error_for_status()?) } + }) + .await?; + + Ok(resp.bytes_stream().err_into().boxed()) + } +} + +#[async_trait] +impl Cat for IpfsGatewayClient { + async fn cat( + &self, + path: &ContentPath, + max_size: usize, + timeout: Option, + ) -> IpfsResult { + let url = self.ipfs_url(path.to_string()); + let mut req = self.http_client.get(url); + + if let Some(timeout) = timeout { + req = req.timeout(timeout); + } + + let path = path.to_owned(); + + retry_policy("IPFS.Gateway.cat", &self.logger) + .run(move || { + let path = path.clone(); + let req = req.try_clone().expect("request can be cloned"); + + async move { + let content = req + .send() + .await? + .error_for_status()? + .bytes_stream() + .err_into() + .try_fold(BytesMut::new(), |mut acc, chunk| async { + acc.extend(chunk); + + if acc.len() > max_size { + return Err(IpfsError::ContentTooLarge { + path: path.clone(), + max_size, + }); + } + + Ok(acc) + }) + .await?; + + Ok(content.into()) + } + }) + .await + } +} + +#[async_trait] +impl GetBlock for IpfsGatewayClient { + async fn get_block(&self, path: &ContentPath, timeout: Option) -> IpfsResult { + let url = self.ipfs_url(format!("{path}?format=raw")); + + let mut req = self + .http_client + .get(url) + .header(ACCEPT, "application/vnd.ipld.raw"); + + if let Some(timeout) = timeout { + req = req.timeout(timeout); + } + + retry_policy("IPFS.Gateway.get_block", &self.logger) + .run(move || { + let req = req.try_clone().expect("request can be cloned"); + + async move { Ok(req.send().await?.error_for_status()?.bytes().await?) } + }) + .await + } +} + +#[cfg(test)] +mod tests { + use wiremock::matchers as m; + use wiremock::Mock; + use wiremock::MockBuilder; + use wiremock::MockServer; + use wiremock::ResponseTemplate; + + use super::*; + use crate::log::discard; + + const PATH: &str = "/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; + + async fn mock_server() -> MockServer { + MockServer::start().await + } + + fn mock_head() -> MockBuilder { + Mock::given(m::method("HEAD")).and(m::path(PATH)) + } + + fn mock_get() -> MockBuilder { + Mock::given(m::method("GET")).and(m::path(PATH)) + } + + fn mock_gateway_check(status: StatusCode) -> Mock { + mock_head() + .and(m::header("Cache-Control", "only-if-cached")) + .respond_with(ResponseTemplate::new(status)) + } + + fn mock_get_block() -> MockBuilder { + mock_get() + .and(m::query_param("format", "raw")) + .and(m::header("Accept", "application/vnd.ipld.raw")) + } + + async fn make_client() -> (MockServer, IpfsGatewayClient) { + let server = mock_server().await; + let client = IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); + + (server, client) + } + + fn make_path() -> ContentPath { + ContentPath::new(PATH).unwrap() + } + + fn ms(millis: u64) -> Duration { + Duration::from_millis(millis) + } + + #[tokio::test] + async fn new_fails_to_create_the_client_if_gateway_is_not_accessible() { + let server = mock_server().await; + + IpfsGatewayClient::new(server.uri(), &discard()) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn new_creates_the_client_if_it_can_check_the_gateway() { + let server = mock_server().await; + + // Test content is cached locally on the gateway. + mock_gateway_check(StatusCode::OK) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + IpfsGatewayClient::new(server.uri(), &discard()) + .await + .unwrap(); + + // Test content is not cached locally on the gateway. + mock_gateway_check(StatusCode::PRECONDITION_FAILED) + .expect(1) + .mount(&server) + .await; + + IpfsGatewayClient::new(server.uri(), &discard()) + .await + .unwrap(); + } + + #[tokio::test] + async fn new_retries_gateway_check_on_retriable_errors() { + let server = mock_server().await; + + mock_gateway_check(StatusCode::INTERNAL_SERVER_ERROR) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_gateway_check(StatusCode::OK) + .expect(1) + .mount(&server) + .await; + + IpfsGatewayClient::new(server.uri(), &discard()) + .await + .unwrap(); + } + + #[tokio::test] + async fn new_does_not_retry_gateway_check_on_non_retriable_errors() { + let server = mock_server().await; + + mock_gateway_check(StatusCode::METHOD_NOT_ALLOWED) + .expect(1) + .mount(&server) + .await; + + IpfsGatewayClient::new(server.uri(), &discard()) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn new_unchecked_creates_the_client_without_checking_the_gateway() { + let server = mock_server().await; + + IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); + } + + #[tokio::test] + async fn can_provide_returns_true_when_content_is_available() { + let (server, client) = make_client().await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .expect(1) + .mount(&server) + .await; + + let ok = client.can_provide(&make_path(), None).await.unwrap(); + + assert!(ok); + } + + #[tokio::test] + async fn can_provide_returns_false_when_content_is_not_completely_available() { + let (server, client) = make_client().await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::PARTIAL_CONTENT)) + .expect(1) + .mount(&server) + .await; + + let ok = client.can_provide(&make_path(), None).await.unwrap(); + + assert!(!ok); + } + + #[tokio::test] + async fn can_provide_fails_on_timeout() { + let (server, client) = make_client().await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) + .expect(1) + .mount(&server) + .await; + + client + .can_provide(&make_path(), Some(ms(300))) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn can_provide_retries_the_request_on_retriable_errors() { + let (server, client) = make_client().await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .expect(1) + .mount(&server) + .await; + + let ok = client.can_provide(&make_path(), None).await.unwrap(); + + assert!(ok); + } + + #[tokio::test] + async fn can_provide_does_not_retry_the_request_on_non_retriable_errors() { + let (server, client) = make_client().await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) + .expect(1) + .mount(&server) + .await; + + client.can_provide(&make_path(), None).await.unwrap_err(); + } + + #[tokio::test] + async fn cat_stream_returns_the_content() { + let (server, client) = make_client().await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let content = client + .cat_stream(&make_path(), None) + .await + .unwrap() + .try_fold(BytesMut::new(), |mut acc, chunk| async { + acc.extend(chunk); + + Ok(acc) + }) + .await + .unwrap(); + + assert_eq!(content.as_ref(), b"some data") + } + + #[tokio::test] + async fn cat_stream_fails_on_timeout() { + let (server, client) = make_client().await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) + .expect(1) + .mount(&server) + .await; + + let result = client.cat_stream(&make_path(), Some(ms(300))).await; + + assert!(matches!(result, Err(_))); + } + + #[tokio::test] + async fn cat_stream_retries_the_request_on_retriable_errors() { + let (server, client) = make_client().await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .expect(1) + .mount(&server) + .await; + + let _stream = client.cat_stream(&make_path(), None).await.unwrap(); + } + + #[tokio::test] + async fn cat_stream_does_not_retry_the_request_on_non_retriable_errors() { + let (server, client) = make_client().await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) + .expect(1) + .mount(&server) + .await; + + let result = client.cat_stream(&make_path(), None).await; + + assert!(matches!(result, Err(_))); + } + + #[tokio::test] + async fn cat_returns_the_content() { + let (server, client) = make_client().await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let content = client.cat(&make_path(), usize::MAX, None).await.unwrap(); + + assert_eq!(content.as_ref(), b"some data"); + } + + #[tokio::test] + async fn cat_returns_the_content_if_max_size_is_equal_to_the_content_size() { + let (server, client) = make_client().await; + + let data = b"some data"; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(data)) + .expect(1) + .mount(&server) + .await; + + let content = client.cat(&make_path(), data.len(), None).await.unwrap(); + + assert_eq!(content.as_ref(), data); + } + + #[tokio::test] + async fn cat_fails_if_content_is_too_large() { + let (server, client) = make_client().await; + + let data = b"some data"; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(data)) + .expect(1) + .mount(&server) + .await; + + client + .cat(&make_path(), data.len() - 1, None) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn cat_fails_on_timeout() { + let (server, client) = make_client().await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) + .expect(1) + .mount(&server) + .await; + + client + .cat(&make_path(), usize::MAX, Some(ms(300))) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn cat_retries_the_request_on_retriable_errors() { + let (server, client) = make_client().await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let content = client.cat(&make_path(), usize::MAX, None).await.unwrap(); + + assert_eq!(content.as_ref(), b"some data"); + } + + #[tokio::test] + async fn cat_does_not_retry_the_request_on_non_retriable_errors() { + let (server, client) = make_client().await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) + .expect(1) + .mount(&server) + .await; + + client + .cat(&make_path(), usize::MAX, None) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn get_block_returns_the_block_content() { + let (server, client) = make_client().await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let block = client.get_block(&make_path(), None).await.unwrap(); + + assert_eq!(block.as_ref(), b"some data"); + } + + #[tokio::test] + async fn get_block_fails_on_timeout() { + let (server, client) = make_client().await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) + .expect(1) + .mount(&server) + .await; + + client + .get_block(&make_path(), Some(ms(300))) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn get_block_retries_the_request_on_retriable_errors() { + let (server, client) = make_client().await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let block = client.get_block(&make_path(), None).await.unwrap(); + + assert_eq!(block.as_ref(), b"some data"); + } + + #[tokio::test] + async fn get_block_does_not_retry_the_request_on_non_retriable_errors() { + let (server, client) = make_client().await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) + .expect(1) + .mount(&server) + .await; + + client.get_block(&make_path(), None).await.unwrap_err(); + } +} diff --git a/graph/src/ipfs/mod.rs b/graph/src/ipfs/mod.rs new file mode 100644 index 00000000000..038897d7dcd --- /dev/null +++ b/graph/src/ipfs/mod.rs @@ -0,0 +1,148 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::anyhow; +use async_trait::async_trait; +use bytes::Bytes; +use futures03::stream::BoxStream; +use slog::info; +use slog::Logger; + +use crate::util::security::SafeDisplay; + +mod content_path; +mod error; +mod gateway_client; +mod pool; +mod retry_policy; +mod rpc_client; +mod server_address; + +pub mod test_utils; + +pub use self::content_path::ContentPath; +pub use self::error::IpfsError; +pub use self::error::RequestError; +pub use self::gateway_client::IpfsGatewayClient; +pub use self::rpc_client::IpfsRpcClient; +pub use self::server_address::ServerAddress; + +pub type IpfsResult = Result; + +/// Describes a read-only connection to an IPFS server. +pub trait IpfsClient: CanProvide + CatStream + Cat + GetBlock + Send + Sync + 'static {} + +#[async_trait] +/// Checks if the server can provide data from the specified content path. +pub trait CanProvide { + /// Checks if the server can provide data from the specified content path. + async fn can_provide(&self, path: &ContentPath, timeout: Option) -> IpfsResult; +} + +#[async_trait] +/// Streams data from the specified content path. +pub trait CatStream { + /// Streams data from the specified content path. + async fn cat_stream( + &self, + path: &ContentPath, + timeout: Option, + ) -> IpfsResult>>; +} + +#[async_trait] +/// Downloads data from the specified content path. +pub trait Cat { + /// Downloads data from the specified content path. + async fn cat( + &self, + path: &ContentPath, + max_size: usize, + timeout: Option, + ) -> IpfsResult; +} + +#[async_trait] +/// Downloads an IPFS block in raw format. +pub trait GetBlock { + /// Downloads an IPFS block in raw format. + async fn get_block(&self, path: &ContentPath, timeout: Option) -> IpfsResult; +} + +impl IpfsClient for T where T: CanProvide + CatStream + Cat + GetBlock + Send + Sync + 'static {} + +/// Creates and returns the most appropriate IPFS client for the given IPFS server addresses. +/// +/// If multiple IPFS server addresses are specified, an IPFS client pool is created internally +/// and for each IPFS read request, the fastest client that can provide the content is +/// automatically selected and the request is forwarded to that client. +pub async fn new_ipfs_client( + server_addresses: I, + logger: &Logger, +) -> IpfsResult> +where + I: IntoIterator, + S: AsRef, +{ + let mut clients = Vec::new(); + + for server_address in server_addresses { + let server_address = server_address.as_ref(); + + info!( + logger, + "Connecting to IPFS server at '{}'", + SafeDisplay(server_address) + ); + + match IpfsGatewayClient::new(server_address, logger).await { + Ok(client) => { + info!( + logger, + "Successfully connected to IPFS gateway at: '{}'", + SafeDisplay(server_address) + ); + + clients.push(client.into_boxed()); + continue; + } + Err(err) if err.is_invalid_server() => {} + Err(err) => return Err(err), + }; + + match IpfsRpcClient::new(server_address, logger).await { + Ok(client) => { + info!( + logger, + "Successfully connected to IPFS RPC API at: '{}'", + SafeDisplay(server_address) + ); + + clients.push(client.into_boxed()); + continue; + } + Err(err) if err.is_invalid_server() => {} + Err(err) => return Err(err), + }; + + return Err(IpfsError::InvalidServer { + server_address: server_address.parse()?, + reason: anyhow!("unknown server kind"), + }); + } + + match clients.len() { + 0 => Err(IpfsError::InvalidServerAddress { + input: "".to_owned(), + source: anyhow!("at least one server address is required"), + }), + 1 => Ok(clients.pop().unwrap().into()), + n => { + info!(logger, "Creating a pool of {} IPFS clients", n); + + let pool = pool::IpfsClientPool::with_clients(clients); + + Ok(pool.into_boxed().into()) + } + } +} diff --git a/graph/src/ipfs/pool.rs b/graph/src/ipfs/pool.rs new file mode 100644 index 00000000000..0fb5ce8dc5b --- /dev/null +++ b/graph/src/ipfs/pool.rs @@ -0,0 +1,271 @@ +use std::time::Duration; + +use anyhow::anyhow; +use async_trait::async_trait; +use bytes::Bytes; +use futures03::stream::BoxStream; +use futures03::stream::FuturesUnordered; +use futures03::stream::StreamExt; + +use crate::ipfs::CanProvide; +use crate::ipfs::Cat; +use crate::ipfs::CatStream; +use crate::ipfs::ContentPath; +use crate::ipfs::GetBlock; +use crate::ipfs::IpfsClient; +use crate::ipfs::IpfsError; +use crate::ipfs::IpfsResult; + +/// Contains a list of IPFS clients and, for each read request, selects the fastest IPFS client +/// that can provide the content and forwards the request to that client. +/// +/// This can significantly improve performance when using multiple IPFS gateways, +/// as some of them may already have the content cached. +/// +/// Note: It should remain an implementation detail and not be used directly. +pub(super) struct IpfsClientPool { + inner: Vec>, +} + +impl IpfsClientPool { + pub(super) fn with_clients(clients: Vec>) -> Self { + Self { inner: clients } + } + + pub(super) fn into_boxed(self) -> Box { + Box::new(self) + } +} + +#[async_trait] +impl CanProvide for IpfsClientPool { + async fn can_provide(&self, path: &ContentPath, timeout: Option) -> IpfsResult { + select_fastest_ipfs_client(&self.inner, path, timeout) + .await + .map(|_client| true) + } +} + +#[async_trait] +impl CatStream for IpfsClientPool { + async fn cat_stream( + &self, + path: &ContentPath, + timeout: Option, + ) -> IpfsResult>> { + let client = select_fastest_ipfs_client(&self.inner, path, timeout).await?; + + client.cat_stream(path, timeout).await + } +} + +#[async_trait] +impl Cat for IpfsClientPool { + async fn cat( + &self, + path: &ContentPath, + max_size: usize, + timeout: Option, + ) -> IpfsResult { + let client = select_fastest_ipfs_client(&self.inner, path, timeout).await?; + + client.cat(path, max_size, timeout).await + } +} + +#[async_trait] +impl GetBlock for IpfsClientPool { + async fn get_block(&self, path: &ContentPath, timeout: Option) -> IpfsResult { + let client = select_fastest_ipfs_client(&self.inner, path, timeout).await?; + + client.get_block(path, timeout).await + } +} + +/// Returns the first IPFS client that can provide the content from the specified path. +async fn select_fastest_ipfs_client<'a>( + clients: &'a [Box], + path: &ContentPath, + timeout: Option, +) -> IpfsResult<&'a dyn IpfsClient> { + let mut futs = clients + .iter() + .enumerate() + .map(|(i, client)| async move { + client + .can_provide(path, timeout) + .await + .map(|ok| ok.then_some(i)) + }) + .collect::>(); + + let mut last_err = None; + + while let Some(result) = futs.next().await { + match result { + Ok(Some(i)) => return Ok(clients[i].as_ref()), + Ok(None) => continue, + Err(err) => last_err = Some(err), + }; + } + + let err = last_err.unwrap_or_else(|| IpfsError::ContentNotAvailable { + path: path.to_owned(), + reason: anyhow!("no clients can provide the content"), + }); + + Err(err) +} + +#[cfg(test)] +mod tests { + use http::StatusCode; + use wiremock::matchers as m; + use wiremock::Mock; + use wiremock::MockBuilder; + use wiremock::MockServer; + use wiremock::ResponseTemplate; + + use super::*; + use crate::ipfs::IpfsGatewayClient; + use crate::log::discard; + + const PATH: &str = "/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; + + fn mock_head() -> MockBuilder { + Mock::given(m::method("HEAD")).and(m::path(PATH)) + } + + fn mock_get() -> MockBuilder { + Mock::given(m::method("GET")).and(m::path(PATH)) + } + + async fn make_client() -> (MockServer, IpfsGatewayClient) { + let server = MockServer::start().await; + let client = IpfsGatewayClient::new_unchecked(server.uri(), &discard()).unwrap(); + + (server, client) + } + + fn make_path() -> ContentPath { + ContentPath::new(PATH).unwrap() + } + + fn ms(millis: u64) -> Duration { + Duration::from_millis(millis) + } + + #[tokio::test] + async fn can_provide_returns_true_if_any_client_can_provide_the_content() { + let (server_1, client_1) = make_client().await; + let (server_2, client_2) = make_client().await; + + mock_head() + .respond_with( + ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR).set_delay(ms(100)), + ) + .expect(1..) + .mount(&server_1) + .await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(200))) + .expect(1) + .mount(&server_2) + .await; + + let clients = vec![client_1.into_boxed(), client_2.into_boxed()]; + let pool = IpfsClientPool::with_clients(clients); + let ok = pool.can_provide(&make_path(), None).await.unwrap(); + + assert!(ok); + } + + #[tokio::test] + async fn cat_stream_forwards_the_request_to_the_fastest_client_that_can_provide_the_content() { + let (server_1, client_1) = make_client().await; + let (server_2, client_2) = make_client().await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(200))) + .expect(1) + .mount(&server_1) + .await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(100))) + .expect(1) + .mount(&server_2) + .await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .expect(1) + .mount(&server_2) + .await; + + let clients = vec![client_1.into_boxed(), client_2.into_boxed()]; + let pool = IpfsClientPool::with_clients(clients); + let _stream = pool.cat_stream(&make_path(), None).await.unwrap(); + } + + #[tokio::test] + async fn cat_forwards_the_request_to_the_fastest_client_that_can_provide_the_content() { + let (server_1, client_1) = make_client().await; + let (server_2, client_2) = make_client().await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(200))) + .expect(1) + .mount(&server_1) + .await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(100))) + .expect(1) + .mount(&server_2) + .await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server_2) + .await; + + let clients = vec![client_1.into_boxed(), client_2.into_boxed()]; + let pool = IpfsClientPool::with_clients(clients); + let content = pool.cat(&make_path(), usize::MAX, None).await.unwrap(); + + assert_eq!(content.as_ref(), b"some data") + } + + #[tokio::test] + async fn get_block_forwards_the_request_to_the_fastest_client_that_can_provide_the_content() { + let (server_1, client_1) = make_client().await; + let (server_2, client_2) = make_client().await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(200))) + .expect(1) + .mount(&server_1) + .await; + + mock_head() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(100))) + .expect(1) + .mount(&server_2) + .await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server_2) + .await; + + let clients = vec![client_1.into_boxed(), client_2.into_boxed()]; + let pool = IpfsClientPool::with_clients(clients); + let block = pool.get_block(&make_path(), None).await.unwrap(); + + assert_eq!(block.as_ref(), b"some data") + } +} diff --git a/graph/src/ipfs/retry_policy.rs b/graph/src/ipfs/retry_policy.rs new file mode 100644 index 00000000000..942b24f4e79 --- /dev/null +++ b/graph/src/ipfs/retry_policy.rs @@ -0,0 +1,24 @@ +use slog::Logger; + +use crate::ipfs::error::IpfsError; +use crate::util::futures::retry; +use crate::util::futures::RetryConfigNoTimeout; + +const DEFAULT_MAX_ATTEMPTS: usize = 100; + +/// Creates a retry policy for each request sent by IPFS clients. +/// +/// Note: It is expected that timeouts will be set on the requests. +pub fn retry_policy( + operation_name: &'static str, + logger: &Logger, +) -> RetryConfigNoTimeout { + retry(operation_name, logger) + .limit(DEFAULT_MAX_ATTEMPTS) + .when(|result: &Result| match result { + Ok(_) => false, + Err(IpfsError::RequestFailed(err)) => !err.is_timeout() && err.is_retriable(), + Err(_) => false, + }) + .no_timeout() +} diff --git a/graph/src/ipfs/rpc_client.rs b/graph/src/ipfs/rpc_client.rs new file mode 100644 index 00000000000..fb0420606a9 --- /dev/null +++ b/graph/src/ipfs/rpc_client.rs @@ -0,0 +1,684 @@ +use std::time::Duration; + +use anyhow::anyhow; +use async_trait::async_trait; +use bytes::Bytes; +use bytes::BytesMut; +use derivative::Derivative; +use futures03::stream::BoxStream; +use futures03::StreamExt; +use futures03::TryStreamExt; +use graph_derive::CheapClone; +use http::header::CONTENT_LENGTH; +use reqwest::Response; +use reqwest::StatusCode; +use slog::Logger; + +use crate::ipfs::retry_policy::retry_policy; +use crate::ipfs::CanProvide; +use crate::ipfs::Cat; +use crate::ipfs::CatStream; +use crate::ipfs::ContentPath; +use crate::ipfs::GetBlock; +use crate::ipfs::IpfsClient; +use crate::ipfs::IpfsError; +use crate::ipfs::IpfsResult; +use crate::ipfs::ServerAddress; + +/// The request that verifies that the IPFS RPC API is accessible is generally fast because +/// it does not involve querying the distributed network. +const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(300); + +#[derive(Clone, CheapClone, Derivative)] +#[derivative(Debug)] +/// A client that connects to an IPFS RPC API. +/// +/// Reference: +pub struct IpfsRpcClient { + server_address: ServerAddress, + + #[derivative(Debug = "ignore")] + http_client: reqwest::Client, + + logger: Logger, + test_request_timeout: Duration, +} + +impl IpfsRpcClient { + /// Creates a new [IpfsRpcClient] with the specified server address. + /// Verifies that the server is responding to IPFS RPC API requests. + pub async fn new(server_address: impl AsRef, logger: &Logger) -> IpfsResult { + let client = Self::new_unchecked(server_address, logger)?; + + client + .send_test_request() + .await + .map_err(|reason| IpfsError::InvalidServer { + server_address: client.server_address.clone(), + reason, + })?; + + Ok(client) + } + + /// Creates a new [IpfsRpcClient] with the specified server address. + /// Does not verify that the server is responding to IPFS RPC API requests. + pub fn new_unchecked(server_address: impl AsRef, logger: &Logger) -> IpfsResult { + Ok(Self { + server_address: ServerAddress::new(server_address)?, + http_client: reqwest::Client::new(), + logger: logger.to_owned(), + test_request_timeout: TEST_REQUEST_TIMEOUT, + }) + } + + pub fn into_boxed(self) -> Box { + Box::new(self) + } + + async fn send_test_request(&self) -> anyhow::Result<()> { + let client = self.to_owned(); + + let ok = retry_policy("IPFS.RPC.send_test_request", &self.logger) + .run(move || { + let client = client.clone(); + + async move { + // While there may be unrelated servers that successfully respond to this + // request, it is good enough to at least filter out unresponsive servers and + // confirm that the server behaves like an IPFS RPC API. + let status = client + .call("version", Some(client.test_request_timeout)) + .await? + .status(); + + Ok(status == StatusCode::OK) + } + }) + .await?; + + if !ok { + return Err(anyhow!("not an RPC API")); + } + + Ok(()) + } + + async fn call( + &self, + path_and_query: impl AsRef, + timeout: Option, + ) -> IpfsResult { + let url = self.url(path_and_query); + let mut req = self.http_client.post(url); + + // Some servers require `content-length` even for an empty body. + req = req.header(CONTENT_LENGTH, 0); + + if let Some(timeout) = timeout { + req = req.timeout(timeout); + } + + Ok(req.send().await?.error_for_status()?) + } + + fn url(&self, path_and_query: impl AsRef) -> String { + format!("{}api/v0/{}", self.server_address, path_and_query.as_ref()) + } +} + +#[async_trait] +impl CanProvide for IpfsRpcClient { + async fn can_provide(&self, path: &ContentPath, timeout: Option) -> IpfsResult { + let client = self.to_owned(); + let path = path.to_owned(); + + retry_policy("IPFS.RPC.can_provide", &self.logger) + .run(move || { + let client = client.clone(); + let path = path.clone(); + + async move { + let status = client + .call(format!("cat?arg={path}&length=1"), timeout) + .await? + .status(); + + Ok(status == StatusCode::OK) + } + }) + .await + } +} + +#[async_trait] +impl CatStream for IpfsRpcClient { + async fn cat_stream( + &self, + path: &ContentPath, + timeout: Option, + ) -> IpfsResult>> { + let client = self.to_owned(); + let path = path.to_owned(); + + let resp = retry_policy("IPFS.RPC.cat_stream", &self.logger) + .run(move || { + let client = client.clone(); + let path = path.clone(); + + async move { Ok(client.call(format!("cat?arg={path}"), timeout).await?) } + }) + .await?; + + Ok(resp.bytes_stream().err_into().boxed()) + } +} + +#[async_trait] +impl Cat for IpfsRpcClient { + async fn cat( + &self, + path: &ContentPath, + max_size: usize, + timeout: Option, + ) -> IpfsResult { + let client = self.to_owned(); + let path = path.to_owned(); + + retry_policy("IPFS.RPC.cat", &self.logger) + .run(move || { + let client = client.clone(); + let path = path.clone(); + + async move { + let content = client + .call(format!("cat?arg={path}"), timeout) + .await? + .bytes_stream() + .err_into() + .try_fold(BytesMut::new(), |mut acc, chunk| async { + acc.extend(chunk); + + if acc.len() > max_size { + return Err(IpfsError::ContentTooLarge { + path: path.clone(), + max_size, + }); + } + + Ok(acc) + }) + .await?; + + Ok(content.into()) + } + }) + .await + } +} + +#[async_trait] +impl GetBlock for IpfsRpcClient { + async fn get_block(&self, path: &ContentPath, timeout: Option) -> IpfsResult { + let client = self.to_owned(); + let path = path.to_owned(); + + retry_policy("IPFS.RPC.get_block", &self.logger) + .run(move || { + let client = client.clone(); + let path = path.clone(); + + async move { + let block = client + .call(format!("block/get?arg={path}"), timeout) + .await? + .bytes() + .await?; + + Ok(block) + } + }) + .await + } +} + +#[cfg(test)] +mod tests { + use wiremock::matchers as m; + use wiremock::Mock; + use wiremock::MockBuilder; + use wiremock::MockServer; + use wiremock::ResponseTemplate; + + use super::*; + use crate::log::discard; + + const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; + + async fn mock_server() -> MockServer { + MockServer::start().await + } + + fn mock_post(path: &str) -> MockBuilder { + Mock::given(m::method("POST")).and(m::path(format!("/api/v0/{path}"))) + } + + fn mock_can_provide() -> MockBuilder { + mock_post("cat") + .and(m::query_param("arg", CID)) + .and(m::query_param("length", "1")) + } + + fn mock_cat() -> MockBuilder { + mock_post("cat").and(m::query_param("arg", CID)) + } + + fn mock_get_block() -> MockBuilder { + mock_post("block/get").and(m::query_param("arg", CID)) + } + + async fn make_client() -> (MockServer, IpfsRpcClient) { + let server = mock_server().await; + let client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap(); + + (server, client) + } + + fn make_path() -> ContentPath { + ContentPath::new(CID).unwrap() + } + + fn ms(millis: u64) -> Duration { + Duration::from_millis(millis) + } + + #[tokio::test] + async fn new_fails_to_create_the_client_if_rpc_api_is_not_accessible() { + let server = mock_server().await; + + IpfsRpcClient::new(server.uri(), &discard()) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn new_creates_the_client_if_it_can_check_the_rpc_api() { + let server = mock_server().await; + + mock_post("version") + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .expect(1) + .mount(&server) + .await; + + IpfsRpcClient::new(server.uri(), &discard()).await.unwrap(); + } + + #[tokio::test] + async fn new_retries_rpc_api_check_on_retriable_errors() { + let server = mock_server().await; + + mock_post("version") + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_post("version") + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .expect(1) + .mount(&server) + .await; + + IpfsRpcClient::new(server.uri(), &discard()).await.unwrap(); + } + + #[tokio::test] + async fn new_does_not_retry_rpc_api_check_on_non_retriable_errors() { + let server = mock_server().await; + + mock_post("version") + .respond_with(ResponseTemplate::new(StatusCode::METHOD_NOT_ALLOWED)) + .expect(1) + .mount(&server) + .await; + + IpfsRpcClient::new(server.uri(), &discard()) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn new_unchecked_creates_the_client_without_checking_the_rpc_api() { + let server = mock_server().await; + + IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap(); + } + + #[tokio::test] + async fn can_provide_returns_true_when_content_is_available() { + let (server, client) = make_client().await; + + mock_can_provide() + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .expect(1) + .mount(&server) + .await; + + let ok = client.can_provide(&make_path(), None).await.unwrap(); + + assert!(ok); + } + + #[tokio::test] + async fn can_provide_returns_false_when_content_is_not_completely_available() { + let (server, client) = make_client().await; + + mock_can_provide() + .respond_with(ResponseTemplate::new(StatusCode::PARTIAL_CONTENT)) + .expect(1) + .mount(&server) + .await; + + let ok = client.can_provide(&make_path(), None).await.unwrap(); + + assert!(!ok); + } + + #[tokio::test] + async fn can_provide_fails_on_timeout() { + let (server, client) = make_client().await; + + mock_can_provide() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) + .expect(1) + .mount(&server) + .await; + + client + .can_provide(&make_path(), Some(ms(300))) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn can_provide_retries_the_request_on_retriable_errors() { + let (server, client) = make_client().await; + + mock_can_provide() + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_can_provide() + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .expect(1) + .mount(&server) + .await; + + let ok = client.can_provide(&make_path(), None).await.unwrap(); + + assert!(ok); + } + + #[tokio::test] + async fn can_provide_does_not_retry_the_request_on_non_retriable_errors() { + let (server, client) = make_client().await; + + mock_can_provide() + .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) + .expect(1) + .mount(&server) + .await; + + client.can_provide(&make_path(), None).await.unwrap_err(); + } + + #[tokio::test] + async fn cat_stream_returns_the_content() { + let (server, client) = make_client().await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let content = client + .cat_stream(&make_path(), None) + .await + .unwrap() + .try_fold(BytesMut::new(), |mut acc, chunk| async { + acc.extend(chunk); + + Ok(acc) + }) + .await + .unwrap(); + + assert_eq!(content.as_ref(), b"some data"); + } + + #[tokio::test] + async fn cat_stream_fails_on_timeout() { + let (server, client) = make_client().await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) + .expect(1) + .mount(&server) + .await; + + let result = client.cat_stream(&make_path(), Some(ms(300))).await; + + assert!(matches!(result, Err(_))); + } + + #[tokio::test] + async fn cat_stream_retries_the_request_on_retriable_errors() { + let (server, client) = make_client().await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .expect(1) + .mount(&server) + .await; + + let _stream = client.cat_stream(&make_path(), None).await.unwrap(); + } + + #[tokio::test] + async fn cat_stream_does_not_retry_the_request_on_non_retriable_errors() { + let (server, client) = make_client().await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) + .expect(1) + .mount(&server) + .await; + + let result = client.cat_stream(&make_path(), None).await; + + assert!(matches!(result, Err(_))); + } + + #[tokio::test] + async fn cat_returns_the_content() { + let (server, client) = make_client().await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let content = client.cat(&make_path(), usize::MAX, None).await.unwrap(); + + assert_eq!(content.as_ref(), b"some data"); + } + + #[tokio::test] + async fn cat_returns_the_content_if_max_size_is_equal_to_the_content_size() { + let (server, client) = make_client().await; + + let data = b"some data"; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(data)) + .expect(1) + .mount(&server) + .await; + + let content = client.cat(&make_path(), data.len(), None).await.unwrap(); + + assert_eq!(content.as_ref(), data); + } + + #[tokio::test] + async fn cat_fails_if_content_is_too_large() { + let (server, client) = make_client().await; + + let data = b"some data"; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(data)) + .expect(1) + .mount(&server) + .await; + + client + .cat(&make_path(), data.len() - 1, None) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn cat_fails_on_timeout() { + let (server, client) = make_client().await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) + .expect(1) + .mount(&server) + .await; + + client + .cat(&make_path(), usize::MAX, Some(ms(300))) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn cat_retries_the_request_on_retriable_errors() { + let (server, client) = make_client().await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let content = client.cat(&make_path(), usize::MAX, None).await.unwrap(); + + assert_eq!(content.as_ref(), b"some data"); + } + + #[tokio::test] + async fn cat_does_not_retry_the_request_on_non_retriable_errors() { + let (server, client) = make_client().await; + + mock_cat() + .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) + .expect(1) + .mount(&server) + .await; + + client + .cat(&make_path(), usize::MAX, None) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn get_block_returns_the_block_content() { + let (server, client) = make_client().await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let block = client.get_block(&make_path(), None).await.unwrap(); + + assert_eq!(block.as_ref(), b"some data"); + } + + #[tokio::test] + async fn get_block_fails_on_timeout() { + let (server, client) = make_client().await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_delay(ms(500))) + .expect(1) + .mount(&server) + .await; + + client + .get_block(&make_path(), Some(ms(300))) + .await + .unwrap_err(); + } + + #[tokio::test] + async fn get_block_retries_the_request_on_retriable_errors() { + let (server, client) = make_client().await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(1) + .expect(1) + .mount(&server) + .await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"some data")) + .expect(1) + .mount(&server) + .await; + + let block = client.get_block(&make_path(), None).await.unwrap(); + + assert_eq!(block.as_ref(), b"some data"); + } + + #[tokio::test] + async fn get_block_does_not_retry_the_request_on_non_retriable_errors() { + let (server, client) = make_client().await; + + mock_get_block() + .respond_with(ResponseTemplate::new(StatusCode::GATEWAY_TIMEOUT)) + .expect(1) + .mount(&server) + .await; + + client.get_block(&make_path(), None).await.unwrap_err(); + } +} diff --git a/graph/src/ipfs/server_address.rs b/graph/src/ipfs/server_address.rs new file mode 100644 index 00000000000..dd0026f054e --- /dev/null +++ b/graph/src/ipfs/server_address.rs @@ -0,0 +1,199 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use http::uri::Scheme; +use http::Uri; + +use crate::derive::CheapClone; +use crate::ipfs::IpfsError; +use crate::ipfs::IpfsResult; + +#[derive(Clone, Debug, CheapClone)] +/// Contains a valid IPFS server address. +pub struct ServerAddress { + inner: Arc, +} + +impl ServerAddress { + /// Creates a new [ServerAddress] from the specified input. + pub fn new(input: impl AsRef) -> IpfsResult { + let input = input.as_ref(); + + if input.is_empty() { + return Err(IpfsError::InvalidServerAddress { + input: input.to_owned(), + source: anyhow!("address is empty"), + }); + } + + let uri = input + .parse::() + .map_err(|err| IpfsError::InvalidServerAddress { + input: input.to_owned(), + source: err.into(), + })?; + + let scheme = uri + .scheme() + // Default to HTTP for backward compatibility. + .unwrap_or(&Scheme::HTTP); + + let authority = uri + .authority() + .ok_or_else(|| IpfsError::InvalidServerAddress { + input: input.to_owned(), + source: anyhow!("missing authority"), + })?; + + let mut inner = format!("{scheme}://"); + + // In the case of IPFS gateways, depending on the configuration, path requests are + // sometimes redirected to the subdomain resolver. This is a problem for localhost because + // some operating systems do not allow subdomain DNS resolutions on localhost for security + // reasons. To avoid forcing users to always specify an IP address instead of localhost + // when they want to use a local IPFS gateway, we will naively try to do this for them. + if authority.host().to_lowercase() == "localhost" { + inner.push_str("127.0.0.1"); + + if let Some(port) = authority.port_u16() { + inner.push_str(&format!(":{port}")); + } + } else { + inner.push_str(authority.as_str()); + } + + inner.push_str(uri.path().trim_end_matches('/')); + inner.push('/'); + + Ok(Self { + inner: inner.into(), + }) + } + + pub fn local_gateway() -> Self { + Self::new("http://127.0.0.1:8080").unwrap() + } + + pub fn local_rpc_api() -> Self { + Self::new("http://127.0.0.1:5001").unwrap() + } +} + +impl std::str::FromStr for ServerAddress { + type Err = IpfsError; + + fn from_str(s: &str) -> Result { + Self::new(s) + } +} + +impl AsRef for ServerAddress { + fn as_ref(&self) -> &str { + &self.inner + } +} + +impl std::fmt::Display for ServerAddress { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.inner) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn fails_on_an_empty_address() { + let err = ServerAddress::new("").unwrap_err(); + + assert_eq!( + err.to_string(), + "'' is not a valid IPFS server address: address is empty", + ); + } + + #[test] + fn requires_an_authority() { + let err = ServerAddress::new("https://").unwrap_err(); + + assert_eq!( + err.to_string(), + "'https://' is not a valid IPFS server address: invalid format", + ); + } + + #[test] + fn accepts_a_valid_address() { + let addr = ServerAddress::new("https://example.com/").unwrap(); + + assert_eq!(addr.to_string(), "https://example.com/"); + } + + #[test] + fn defaults_to_http_scheme() { + let addr = ServerAddress::new("example.com").unwrap(); + + assert_eq!(addr.to_string(), "http://example.com/"); + } + + #[test] + fn accepts_a_valid_address_with_a_port() { + let addr = ServerAddress::new("https://example.com:8080/").unwrap(); + + assert_eq!(addr.to_string(), "https://example.com:8080/"); + } + + #[test] + fn rewrites_localhost_to_ipv4() { + let addr = ServerAddress::new("https://localhost/").unwrap(); + + assert_eq!(addr.to_string(), "https://127.0.0.1/"); + } + + #[test] + fn maintains_the_port_on_localhost_rewrite() { + let addr = ServerAddress::new("https://localhost:8080/").unwrap(); + + assert_eq!(addr.to_string(), "https://127.0.0.1:8080/"); + } + + #[test] + fn keeps_the_path_in_an_address() { + let addr = ServerAddress::new("https://example.com/ipfs/").unwrap(); + + assert_eq!(addr.to_string(), "https://example.com/ipfs/"); + } + + #[test] + fn removes_the_query_from_an_address() { + let addr = ServerAddress::new("https://example.com/?format=json").unwrap(); + + assert_eq!(addr.to_string(), "https://example.com/"); + } + + #[test] + fn adds_a_final_slash() { + let addr = ServerAddress::new("https://example.com").unwrap(); + + assert_eq!(addr.to_string(), "https://example.com/"); + + let addr = ServerAddress::new("https://example.com/ipfs").unwrap(); + + assert_eq!(addr.to_string(), "https://example.com/ipfs/"); + } + + #[test] + fn local_gateway_server_address_is_valid() { + let addr = ServerAddress::local_gateway(); + + assert_eq!(addr.to_string(), "http://127.0.0.1:8080/"); + } + + #[test] + fn local_rpc_api_server_address_is_valid() { + let addr = ServerAddress::local_rpc_api(); + + assert_eq!(addr.to_string(), "http://127.0.0.1:5001/"); + } +} diff --git a/graph/src/ipfs/test_utils.rs b/graph/src/ipfs/test_utils.rs new file mode 100644 index 00000000000..decd9724a78 --- /dev/null +++ b/graph/src/ipfs/test_utils.rs @@ -0,0 +1,76 @@ +use reqwest::multipart; +use serde::Deserialize; + +#[derive(Clone, Debug)] +pub struct IpfsAddFile { + path: String, + content: Vec, +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct IpfsAddResponse { + pub name: String, + pub hash: String, +} + +impl From> for IpfsAddFile { + fn from(content: Vec) -> Self { + Self { + path: Default::default(), + content: content.into(), + } + } +} + +impl From<(T, U)> for IpfsAddFile +where + T: Into, + U: Into>, +{ + fn from((path, content): (T, U)) -> Self { + Self { + path: path.into(), + content: content.into(), + } + } +} + +pub async fn add_files_to_local_ipfs_node_for_testing( + files: T, +) -> anyhow::Result> +where + T: IntoIterator, + U: Into, +{ + let mut form = multipart::Form::new(); + + for file in files.into_iter() { + let file = file.into(); + let part = multipart::Part::bytes(file.content).file_name(file.path); + + form = form.part("path", part); + } + + let resp = reqwest::Client::new() + .post("http://127.0.0.1:5001/api/v0/add") + .multipart(form) + .send() + .await? + .text() + .await?; + + let mut output = Vec::new(); + + for line in resp.lines() { + let line = line.trim(); + + if line.is_empty() { + continue; + } + + output.push(serde_json::from_str::(line)?); + } + + Ok(output) +} diff --git a/graph/src/ipfs_client.rs b/graph/src/ipfs_client.rs deleted file mode 100644 index 07221e6dd6e..00000000000 --- a/graph/src/ipfs_client.rs +++ /dev/null @@ -1,330 +0,0 @@ -use anyhow::anyhow; -use anyhow::Error; -use bytes::Bytes; -use bytes::BytesMut; -use cid::Cid; -use futures03::stream::TryStreamExt as _; -use futures03::Stream; -use http::header::CONTENT_LENGTH; -use http::Uri; -use reqwest::multipart; -use serde::Deserialize; -use std::fmt::Display; -use std::time::Duration; -use std::{str::FromStr, sync::Arc}; - -use crate::derive::CheapClone; - -#[derive(Debug, thiserror::Error)] -pub enum IpfsError { - #[error("Request error: {0}")] - Request(#[from] reqwest::Error), - #[error("IPFS file {0} is too large. It can be at most {1} bytes")] - FileTooLarge(String, usize), -} - -impl IpfsError { - pub fn is_timeout(&self) -> bool { - match self { - Self::Request(e) => e.is_timeout(), - _ => false, - } - } - - /// Is this error from an HTTP status code? - pub fn is_status(&self) -> bool { - match self { - Self::Request(e) => e.is_status(), - _ => false, - } - } - - pub fn status(&self) -> Option { - match self { - Self::Request(e) => e.status(), - _ => None, - } - } -} - -/// Represents a file on Ipfs. This file can be the CID or a path within a folder CID. -/// The path cannot have a prefix (ie CID/hello.json would be cid: CID path: "hello.json") -#[derive(Debug, Clone, Default, Eq, PartialEq, Hash)] -pub struct CidFile { - pub cid: Cid, - pub path: Option, -} - -impl Display for CidFile { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let str = match self.path { - Some(ref f) => format!("{}/{}", self.cid, f), - None => self.cid.to_string(), - }; - f.write_str(&str) - } -} - -impl CidFile { - pub fn to_bytes(&self) -> Vec { - self.to_string().as_bytes().to_vec() - } -} - -impl TryFrom for CidFile { - type Error = anyhow::Error; - - fn try_from(value: crate::data::store::scalar::Bytes) -> Result { - let str = String::from_utf8(value.to_vec())?; - - Self::from_str(&str) - } -} - -/// The string should not have a prefix and only one slash after the CID is removed, everything -/// else is considered a file path. If this is malformed, it will fail to find the file. -impl FromStr for CidFile { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - if s.is_empty() { - return Err(anyhow!("cid can't be empty")); - } - - let cid_str: String = s.chars().take_while(|c| *c != '/').collect(); - let cid = Cid::from_str(&cid_str)?; - - // if cid was the only content or if it's just slash terminated. - if cid_str.len() == s.len() || s.len() + 1 == cid_str.len() { - return Ok(CidFile { cid, path: None }); - } - - let file: String = s[cid_str.len() + 1..].to_string(); - let path = if file.is_empty() { None } else { Some(file) }; - - Ok(CidFile { cid, path }) - } -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct AddResponse { - pub name: String, - pub hash: String, - pub size: String, -} - -/// Reference type, clones will share the connection pool. -#[derive(Clone, CheapClone)] -pub struct IpfsClient { - base: Arc, - // reqwest::Client doesn't need to be `Arc` because it has one internally - // already. - client: reqwest::Client, -} - -impl IpfsClient { - pub fn new(base: &str) -> Result { - Ok(IpfsClient { - client: reqwest::Client::new(), - base: Arc::new(Uri::from_str(base)?), - }) - } - - pub fn localhost() -> Self { - IpfsClient { - client: reqwest::Client::new(), - base: Arc::new(Uri::from_str("http://localhost:5001").unwrap()), - } - } - - /// To check the existence of a cid, we do a cat of a single byte. - pub async fn exists(&self, cid: &str, timeout: Option) -> Result<(), IpfsError> { - self.call(self.cat_url("cat", cid, Some(1)), None, timeout) - .await?; - Ok(()) - } - - pub async fn cat_all( - &self, - cid: &str, - timeout: Option, - max_file_size: usize, - ) -> Result { - let byte_stream = self.cat_stream(cid, timeout).await?; - let bytes = byte_stream - .err_into() - .try_fold(BytesMut::new(), |mut acc, chunk| async move { - acc.extend_from_slice(&chunk); - - // Check size limit - if acc.len() > max_file_size { - return Err(IpfsError::FileTooLarge(cid.to_string(), max_file_size)); - } - - Ok(acc) - }) - .await?; - Ok(bytes.into()) - } - pub async fn cat_stream( - &self, - cid: &str, - timeout: Option, - ) -> Result> + 'static, reqwest::Error> { - Ok(self - .call(self.cat_url("cat", cid, None), None, timeout) - .await? - .bytes_stream()) - } - - pub async fn get_block(&self, cid: String) -> Result { - let form = multipart::Form::new().part("arg", multipart::Part::text(cid)); - self.call(format!("{}api/v0/block/get", self.base), Some(form), None) - .await? - .bytes() - .await - } - - pub async fn test(&self) -> Result<(), reqwest::Error> { - self.call(format!("{}api/v0/version", self.base), None, None) - .await - .map(|_| ()) - } - - pub async fn add(&self, data: Vec) -> Result { - let form = multipart::Form::new().part("path", multipart::Part::bytes(data)); - - self.call(format!("{}api/v0/add", self.base), Some(form), None) - .await? - .json() - .await - } - - fn cat_url(&self, route: &str, arg: &str, length: Option) -> String { - // URL security: We control the base and the route, user-supplied input goes only into the - // query parameters. - let mut url = format!("{}api/v0/{}?arg={}", self.base, route, arg); - if let Some(length) = length { - url.push_str(&format!("&length={}", length)); - } - url - } - - async fn call( - &self, - url: String, - form: Option, - timeout: Option, - ) -> Result { - let mut req = self.client.post(&url); - if let Some(form) = form { - req = req.multipart(form); - } else { - // Some servers require `content-length` even for an empty body. - req = req.header(CONTENT_LENGTH, 0); - } - - if let Some(timeout) = timeout { - req = req.timeout(timeout) - } - - req.send() - .await - .map(|res| res.error_for_status()) - .and_then(|x| x) - } -} - -#[cfg(test)] -mod test { - use std::str::FromStr; - - use anyhow::anyhow; - use cid::Cid; - - use crate::ipfs_client::CidFile; - - #[test] - fn test_cid_parsing() { - let cid_str = "bafyreibjo4xmgaevkgud7mbifn3dzp4v4lyaui4yvqp3f2bqwtxcjrdqg4"; - let cid = Cid::from_str(cid_str).unwrap(); - - struct Case<'a> { - name: &'a str, - input: String, - path: String, - expected: Result, - } - - let cases = vec![ - Case { - name: "correct no slashes, no file", - input: cid_str.to_string(), - path: cid_str.to_string(), - expected: Ok(CidFile { cid, path: None }), - }, - Case { - name: "correct with file path", - input: format!("{}/file.json", cid), - path: format!("{}/file.json", cid_str), - expected: Ok(CidFile { - cid, - path: Some("file.json".into()), - }), - }, - Case { - name: "correct cid with trailing slash", - input: format!("{}/", cid), - path: format!("{}", cid), - expected: Ok(CidFile { cid, path: None }), - }, - Case { - name: "incorrect, empty", - input: "".to_string(), - path: "".to_string(), - expected: Err(anyhow!("cid can't be empty")), - }, - Case { - name: "correct, two slahes", - input: format!("{}//", cid), - path: format!("{}//", cid), - expected: Ok(CidFile { - cid, - path: Some("/".into()), - }), - }, - Case { - name: "incorrect, leading slahes", - input: format!("/ipfs/{}/file.json", cid), - path: "".to_string(), - expected: Err(anyhow!("Input too short")), - }, - Case { - name: "correct syntax, invalid CID", - input: "notacid/file.json".to_string(), - path: "".to_string(), - expected: Err(anyhow!("Failed to parse multihash")), - }, - ]; - - for case in cases { - let f = CidFile::from_str(&case.input); - - match case.expected { - Ok(cid_file) => { - assert!(f.is_ok(), "case: {}", case.name); - let f = f.unwrap(); - assert_eq!(f, cid_file, "case: {}", case.name); - assert_eq!(f.to_string(), case.path, "case: {}", case.name); - } - Err(err) => assert_eq!( - f.unwrap_err().to_string(), - err.to_string(), - "case: {}", - case.name - ), - } - } - } -} diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 2c335c02df2..fe1b6949642 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -16,8 +16,6 @@ pub mod log; /// `CheapClone` trait. pub mod cheap_clone; -pub mod ipfs_client; - pub mod data_source; pub mod blockchain; @@ -37,6 +35,8 @@ pub mod schema; /// Helpers for parsing environment variables. pub mod env; +pub mod ipfs; + /// Wrapper for spawning tasks that abort on panic, which is our default. mod task_spawn; pub use task_spawn::{ diff --git a/node/src/chain.rs b/node/src/chain.rs index 3e87ff8295b..1be5761e77e 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -25,16 +25,13 @@ use graph::firehose::{ SubstreamsGenesisDecoder, }; use graph::futures03::future::try_join_all; -use graph::futures03::TryFutureExt; -use graph::ipfs_client::IpfsClient; use graph::itertools::Itertools; use graph::log::factory::LoggerFactory; use graph::prelude::anyhow; use graph::prelude::MetricsRegistry; -use graph::slog::{debug, error, info, o, warn, Logger}; +use graph::slog::{debug, info, o, warn, Logger}; use graph::tokio::time::timeout; use graph::url::Url; -use graph::util::security::SafeDisplay; use graph_chain_ethereum::{self as ethereum, Transport}; use graph_store_postgres::{BlockStore, ChainHeadUpdateListener}; use std::cmp::Ordering; @@ -54,73 +51,6 @@ pub enum ProviderNetworkStatus { }, } -pub fn create_ipfs_clients(logger: &Logger, ipfs_addresses: &Vec) -> Vec { - // Parse the IPFS URL from the `--ipfs` command line argument - let ipfs_addresses: Vec<_> = ipfs_addresses - .iter() - .map(|uri| { - if uri.starts_with("http://") || uri.starts_with("https://") { - String::from(uri) - } else { - format!("http://{}", uri) - } - }) - .collect(); - - ipfs_addresses - .into_iter() - .map(|ipfs_address| { - info!( - logger, - "Trying IPFS node at: {}", - SafeDisplay(&ipfs_address) - ); - - let ipfs_client = match IpfsClient::new(&ipfs_address) { - Ok(ipfs_client) => ipfs_client, - Err(e) => { - error!( - logger, - "Failed to create IPFS client for `{}`: {}", - SafeDisplay(&ipfs_address), - e - ); - panic!("Could not connect to IPFS"); - } - }; - - // Test the IPFS client by getting the version from the IPFS daemon - let ipfs_test = ipfs_client.cheap_clone(); - let ipfs_ok_logger = logger.clone(); - let ipfs_err_logger = logger.clone(); - let ipfs_address_for_ok = ipfs_address.clone(); - let ipfs_address_for_err = ipfs_address; - graph::spawn(async move { - ipfs_test - .test() - .map_err(move |e| { - error!( - ipfs_err_logger, - "Is there an IPFS node running at \"{}\"?", - SafeDisplay(ipfs_address_for_err), - ); - panic!("Failed to connect to IPFS: {}", e); - }) - .map_ok(move |_| { - info!( - ipfs_ok_logger, - "Successfully connected to IPFS node at: {}", - SafeDisplay(ipfs_address_for_ok) - ); - }) - .await - }); - - ipfs_client - }) - .collect() -} - pub fn create_substreams_networks( logger: Logger, config: &Config, diff --git a/node/src/main.rs b/node/src/main.rs index d67c38992ba..80622fdbf61 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -21,7 +21,6 @@ use graph_core::{ SubgraphRegistrar as IpfsSubgraphRegistrar, }; use graph_graphql::prelude::GraphQlRunner; -use graph_node::chain::create_ipfs_clients; use graph_node::config::Config; use graph_node::network_setup::Networks; use graph_node::opt; @@ -202,15 +201,17 @@ async fn main() { let logger_factory = LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone()); - // Try to create IPFS clients for each URL specified in `--ipfs` - let ipfs_clients: Vec<_> = create_ipfs_clients(&logger, &opt.ipfs); - let ipfs_client = ipfs_clients.first().cloned().expect("Missing IPFS client"); + let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &logger) + .await + .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}")); + let ipfs_service = ipfs_service( - ipfs_client, + ipfs_client.cheap_clone(), ENV_VARS.mappings.max_ipfs_file_bytes, ENV_VARS.mappings.ipfs_timeout, ENV_VARS.mappings.ipfs_request_limit, ); + let arweave_resolver = Arc::new(ArweaveClient::new( logger.cheap_clone(), opt.arweave @@ -229,7 +230,7 @@ async fn main() { // Convert the clients into a link resolver. Since we want to get past // possible temporary DNS failures, make the resolver retry - let link_resolver = Arc::new(IpfsResolver::new(ipfs_clients, env_vars.cheap_clone())); + let link_resolver = Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone())); let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone()); let endpoint_metrics = Arc::new(EndpointMetrics::new( diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index ac393c42c55..1a9e7d4353b 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use crate::chain::create_ipfs_clients; use crate::config::Config; use crate::manager::PanicSubscriptionManager; use crate::network_setup::Networks; @@ -59,14 +58,15 @@ pub async fn run( let logger_factory = LoggerFactory::new(logger.clone(), None, metrics_ctx.registry.clone()); // FIXME: Hard-coded IPFS config, take it from config file instead? - let ipfs_clients: Vec<_> = create_ipfs_clients(&logger, &ipfs_url); - let ipfs_client = ipfs_clients.first().cloned().expect("Missing IPFS client"); + let ipfs_client = graph::ipfs::new_ipfs_client(&ipfs_url, &logger).await?; + let ipfs_service = ipfs_service( - ipfs_client, + ipfs_client.cheap_clone(), env_vars.mappings.max_ipfs_file_bytes, env_vars.mappings.ipfs_timeout, env_vars.mappings.ipfs_request_limit, ); + let arweave_resolver = Arc::new(ArweaveClient::new( logger.cheap_clone(), arweave_url.parse().expect("invalid arweave url"), @@ -88,7 +88,7 @@ pub async fn run( // Convert the clients into a link resolver. Since we want to get past // possible temporary DNS failures, make the resolver retry - let link_resolver = Arc::new(IpfsResolver::new(ipfs_clients, env_vars.cheap_clone())); + let link_resolver = Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone())); let chain_head_update_listener = store_builder.chain_head_update_listener(); let network_store = store_builder.network_store(config.chain_ids()); diff --git a/node/src/opt.rs b/node/src/opt.rs index 2b127e34e29..e4dc44ba92a 100644 --- a/node/src/opt.rs +++ b/node/src/opt.rs @@ -106,7 +106,7 @@ pub struct Opt { long, value_name = "HOST:PORT", env = "IPFS", - help = "HTTP addresses of IPFS nodes" + help = "HTTP addresses of IPFS servers (RPC, Gateway)" )] pub ipfs: Vec, #[clap( diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index 46a17f54f22..2416a96a198 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -4,7 +4,8 @@ use graph::components::store::DeploymentLocator; use graph::data::subgraph::*; use graph::data_source; use graph::env::EnvVars; -use graph::ipfs_client::IpfsClient; +use graph::ipfs::IpfsRpcClient; +use graph::ipfs::ServerAddress; use graph::log; use graph::prelude::*; use graph_chain_ethereum::{ @@ -64,12 +65,16 @@ fn mock_host_exports( Arc::new(templates.iter().map(|t| t.into()).collect()), ); + let ipfs_client = IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &LOGGER) + .unwrap() + .into_boxed(); + HostExports::new( subgraph_id, network, ds_details, - Arc::new(graph::prelude::IpfsResolver::new( - vec![IpfsClient::localhost()], + Arc::new(IpfsResolver::new( + ipfs_client.into(), Arc::new(EnvVars::default()), )), ens_lookup, diff --git a/runtime/test/src/test.rs b/runtime/test/src/test.rs index 91895e07725..561820760d4 100644 --- a/runtime/test/src/test.rs +++ b/runtime/test/src/test.rs @@ -1,21 +1,21 @@ use graph::blockchain::BlockTime; use graph::components::metrics::gas::GasMetrics; +use graph::components::store::*; use graph::data::store::{scalar, Id, IdType}; use graph::data::subgraph::*; use graph::data::value::Word; +use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing; use graph::prelude::web3::types::U256; use graph::runtime::gas::GasCounter; use graph::runtime::{AscIndexId, AscType, HostExportError}; use graph::runtime::{AscPtr, ToAscObj}; use graph::schema::{EntityType, InputSchema}; -use graph::{components::store::*, ipfs_client::IpfsClient}; use graph::{entity, prelude::*}; use graph_chain_ethereum::DataSource; use graph_runtime_wasm::asc_abi::class::{Array, AscBigInt, AscEntity, AscString, Uint8Array}; use graph_runtime_wasm::{ host_exports, ExperimentalFeatures, MappingContext, ValidModule, WasmInstance, }; - use semver::Version; use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; @@ -418,8 +418,8 @@ async fn test_ipfs_cat(api_version: Version) { std::thread::spawn(move || { let _runtime_guard = runtime.enter(); - let ipfs = IpfsClient::localhost(); - let hash = graph::block_on(ipfs.add("42".into())).unwrap().hash; + let fut = add_files_to_local_ipfs_node_for_testing(["42".as_bytes().to_vec()]); + let hash = graph::block_on(fut).unwrap()[0].hash.to_owned(); let mut module = graph::block_on(test_module( "ipfsCat", @@ -455,8 +455,9 @@ async fn test_ipfs_block() { std::thread::spawn(move || { let _runtime_guard = runtime.enter(); - let ipfs = IpfsClient::localhost(); - let hash = graph::block_on(ipfs.add("42".into())).unwrap().hash; + let fut = add_files_to_local_ipfs_node_for_testing(["42".as_bytes().to_vec()]); + let hash = graph::block_on(fut).unwrap()[0].hash.to_owned(); + let mut module = graph::block_on(test_module( "ipfsBlock", mock_data_source( @@ -493,15 +494,16 @@ fn make_thing(id: &str, value: &str) -> (String, EntityModification) { const BAD_IPFS_HASH: &str = "bad-ipfs-hash"; async fn run_ipfs_map( - ipfs: IpfsClient, subgraph_id: &'static str, json_string: String, api_version: Version, -) -> Result, anyhow::Error> { +) -> Result, Error> { let hash = if json_string == BAD_IPFS_HASH { "Qm".to_string() } else { - ipfs.add(json_string.into()).await.unwrap().hash + add_files_to_local_ipfs_node_for_testing([json_string.as_bytes().to_vec()]).await?[0] + .hash + .to_owned() }; // Ipfs host functions use `block_on` which must be called from a sync context, @@ -548,14 +550,12 @@ async fn run_ipfs_map( } async fn test_ipfs_map(api_version: Version, json_error_msg: &str) { - let ipfs = IpfsClient::localhost(); let subgraph_id = "ipfsMap"; // Try it with two valid objects let (str1, thing1) = make_thing("one", "eins"); let (str2, thing2) = make_thing("two", "zwei"); let ops = run_ipfs_map( - ipfs.clone(), subgraph_id, format!("{}\n{}", str1, str2), api_version.clone(), @@ -567,14 +567,9 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) { // Valid JSON, but not what the callback expected; it will // fail on an assertion - let err = run_ipfs_map( - ipfs.clone(), - subgraph_id, - format!("{}\n[1,2]", str1), - api_version.clone(), - ) - .await - .unwrap_err(); + let err = run_ipfs_map(subgraph_id, format!("{}\n[1,2]", str1), api_version.clone()) + .await + .unwrap_err(); assert!( format!("{:#}", err).contains("JSON value is not an object."), "{:#}", @@ -582,32 +577,21 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) { ); // Malformed JSON - let err = run_ipfs_map( - ipfs.clone(), - subgraph_id, - format!("{}\n[", str1), - api_version.clone(), - ) - .await - .unwrap_err(); + let err = run_ipfs_map(subgraph_id, format!("{}\n[", str1), api_version.clone()) + .await + .unwrap_err(); assert!(format!("{err:?}").contains("EOF while parsing a list")); // Empty input - let ops = run_ipfs_map( - ipfs.clone(), - subgraph_id, - "".to_string(), - api_version.clone(), - ) - .await - .expect("call failed for emoty string"); + let ops = run_ipfs_map(subgraph_id, "".to_string(), api_version.clone()) + .await + .expect("call failed for emoty string"); assert_eq!(0, ops.len()); // Missing entry in the JSON object let errmsg = format!( "{:#}", run_ipfs_map( - ipfs.clone(), subgraph_id, "{\"value\": \"drei\"}".to_string(), api_version.clone(), @@ -618,15 +602,10 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) { assert!(errmsg.contains(json_error_msg)); // Bad IPFS hash. - let err = run_ipfs_map( - ipfs.clone(), - subgraph_id, - BAD_IPFS_HASH.to_string(), - api_version.clone(), - ) - .await - .unwrap_err(); - assert!(format!("{err:?}").contains("500 Internal Server Error")); + let err = run_ipfs_map(subgraph_id, BAD_IPFS_HASH.to_string(), api_version.clone()) + .await + .unwrap_err(); + assert!(format!("{err:?}").contains("invalid CID")); } #[tokio::test(flavor = "multi_thread")] diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index ebed1d3a115..b61e1f45e40 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -32,7 +32,7 @@ use graph::futures03::{Stream, StreamExt}; use graph::http_body_util::Full; use graph::hyper::body::Bytes; use graph::hyper::Request; -use graph::ipfs_client::IpfsClient; +use graph::ipfs::IpfsClient; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::serde_json::{self, json}; use graph::prelude::{ @@ -169,7 +169,7 @@ pub struct TestContext { pub link_resolver: Arc, pub arweave_resolver: Arc, pub env_vars: Arc, - pub ipfs: IpfsClient, + pub ipfs: Arc, graphql_runner: Arc, indexing_status_service: Arc>, } @@ -462,13 +462,21 @@ pub async fn setup( let static_filters = env_vars.experimental_static_filters; - let ipfs = IpfsClient::localhost(); + let ipfs_client: Arc = graph::ipfs::IpfsRpcClient::new_unchecked( + graph::ipfs::ServerAddress::local_rpc_api(), + &logger, + ) + .unwrap() + .into_boxed() + .into(); + let link_resolver = Arc::new(IpfsResolver::new( - vec![ipfs.cheap_clone()], + ipfs_client.cheap_clone(), Default::default(), )); + let ipfs_service = ipfs_service( - ipfs.cheap_clone(), + ipfs_client.cheap_clone(), env_vars.mappings.max_ipfs_file_bytes, env_vars.mappings.ipfs_timeout, env_vars.mappings.ipfs_request_limit, @@ -572,7 +580,7 @@ pub async fn setup( link_resolver, env_vars, indexing_status_service, - ipfs, + ipfs: ipfs_client, arweave_resolver, } } diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index 7da707ac7cd..caeb67e9adf 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -13,7 +13,8 @@ use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth}; use graph::data::value::Word; use graph::data_source::CausalityRegion; use graph::env::EnvVars; -use graph::ipfs_client::IpfsClient; +use graph::ipfs; +use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing; use graph::object; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::web3::types::Address; @@ -632,18 +633,18 @@ async fn file_data_sources() { let RunnerTestRecipe { stores, test_info } = RunnerTestRecipe::new("file-data-sourcess", "file-data-sources").await; - let ipfs = IpfsClient::new("http://localhost:5001").unwrap(); - - async fn add_content_to_ipfs(ipfs: &IpfsClient, content: &str) -> String { - let bytes = content.to_string().into_bytes(); - let resp = ipfs.add(bytes).await.unwrap(); - resp.hash + async fn add_content_to_ipfs(content: &str) -> String { + add_files_to_local_ipfs_node_for_testing([content.as_bytes().to_vec()]) + .await + .unwrap()[0] + .hash + .to_owned() } - let hash_1 = add_content_to_ipfs(&ipfs, "EXAMPLE_1").await; - let hash_2 = add_content_to_ipfs(&ipfs, "EXAMPLE_2").await; - let hash_3 = add_content_to_ipfs(&ipfs, "EXAMPLE_3").await; - let hash_4 = add_content_to_ipfs(&ipfs, "EXAMPLE_4").await; + let hash_1 = add_content_to_ipfs("EXAMPLE_1").await; + let hash_2 = add_content_to_ipfs("EXAMPLE_2").await; + let hash_3 = add_content_to_ipfs("EXAMPLE_3").await; + let hash_4 = add_content_to_ipfs("EXAMPLE_4").await; //concatenate hash2 and hash3 let hash_2_comma_3 = format!("{},{}", hash_2, hash_3); @@ -828,7 +829,7 @@ async fn file_data_sources() { let mut blocks = blocks.clone(); blocks.retain(|block| block.block.number() <= 4); - let hash_5 = add_content_to_ipfs(&ipfs, "EXAMPLE_5").await; + let hash_5 = add_content_to_ipfs("EXAMPLE_5").await; let mut block_5 = empty_block(test_ptr(4), test_ptr(5)); push_test_command(&mut block_5, "CREATE_FOO", &hash_5); @@ -1277,8 +1278,7 @@ async fn build_subgraph_with_yarn_cmd_and_arg( arg: Option<&str>, ) -> DeploymentHash { // Test that IPFS is up. - IpfsClient::localhost() - .test() + ipfs::IpfsRpcClient::new(ipfs::ServerAddress::local_rpc_api(), &graph::log::discard()) .await .expect("Could not connect to IPFS, make sure it's running at port 5001");